KRAM Module and Point Implementation for QRY messages #1070
base: main
Changes from all commits: b54dcf3, 99c2b85, d6fdd53, 86f31aa, 80821e6, a7ef337
@@ -0,0 +1,231 @@ (new file)

```python
from keri import kering
from keri.help import helping

from typing import NamedTuple
from keri import help
from hio.base import Doer
import json


logger = help.ogler.getLogger()


# TODO: Implement message type as part of TimelinessCache module for more granular control
class MessageType(NamedTuple):
    """Named tuple for KERI message types, not yet used."""
    QRY: str = "qry"
    RPY: str = "rpy"
    PRO: str = "pro"
    BAR: str = "bar"
    EXN: str = "exn"


MESSAGE_TYPES = MessageType()


class TimelinessCache:
    """TimelinessCache is responsible for preventing replay attacks in KERI/KRAM by ensuring
    messages are timely and processed in a strictly monotonically ordered fashion.

    It maintains:
    1. A Lagging Window Size Table - to determine validity windows for different message types
    2. A Replay Cache Table - to store timestamps of previously validated messages
    """

    def __init__(self, db, defaultWindowSize=3.0, defaultDriftSkew=1.0):
        """Initialize the TimelinessCache.

        Parameters:
            db: Database instance that contains the IoSetSuber at db.time
            defaultWindowSize (float): Default window size in seconds
            defaultDriftSkew (float): Default drift skew in seconds
        """
        self.defaultWindowSize = defaultWindowSize
        self.defaultDriftSkew = defaultDriftSkew

        self.db = db

    def setWindowParameters(self, aid, windowSize=None, driftSkew=None, messageType=None):
        """Set window parameters for given autonomic identifier and message type."""
        # TODO: Implement message type as part of the window parameters
        windowSize = windowSize or self.defaultWindowSize
        driftSkew = driftSkew or self.defaultDriftSkew

        # Serialize the tuple as JSON bytes for storage
        windowTuple = (windowSize, driftSkew)
        serialized = json.dumps(windowTuple).encode('utf-8')

        self.db.kram.pin(aid, [serialized])
```
Contributor: If you …

Author: Yes! that is the plan, currently untested.
```python
    def getWindowParameters(self, aid, messageType=None):
        """Get window parameters for given autonomic identifier and message type.
        Falls back to default values if no entry exists for the aid.

        Parameters:
            aid (str): autonomic identifier
            messageType (str | None): message type identifier. None for now, but will
                be used to determine the window size for a given message type

        Returns:
            tuple: (windowSize, driftSkew) as floats in seconds
        """
        # TODO: Implement message type as part of the window parameters
```
Contributor: You could accomplish this with a key of …

Author: That's actually how I've done it in my working copy! Haven't written tests for this feature yet.
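For reference, a minimal sketch of the keyed-by-message-type idea discussed in this thread (the author notes a working-copy version already exists, so treat this as an illustration, not the PR's code). It assumes the underlying Suber accepts tuple keys; everything else reuses names already in the diff (`db.kram`, `pin`, `getLast`, the JSON encoding):

```python
import json

# Sketch only: candidate variants of setWindowParameters/getWindowParameters on
# TimelinessCache, keying the window table by (aid, messageType) with an
# aid-only fallback. Not the PR's tested implementation.

def setWindowParameters(self, aid, windowSize=None, driftSkew=None, messageType=None):
    windowSize = windowSize or self.defaultWindowSize
    driftSkew = driftSkew or self.defaultDriftSkew
    keys = (aid, messageType) if messageType else (aid,)
    serialized = json.dumps((windowSize, driftSkew)).encode("utf-8")
    self.db.kram.pin(keys, [serialized])

def getWindowParameters(self, aid, messageType=None):
    # try the (aid, messageType) entry first, then fall back to the aid-only entry
    candidates = ((aid, messageType), (aid,)) if messageType else ((aid,),)
    for keys in candidates:
        stored = self.db.kram.getLast(keys)
        if stored is not None:
            windowSize, driftSkew = json.loads(stored.decode("utf-8"))
            return windowSize, driftSkew
    return self.defaultWindowSize, self.defaultDriftSkew
```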
```python
        try:
            # Try to get the stored parameters
            storedData = self.db.kram.getLast(aid)
            if storedData is not None:
                # Deserialize from JSON bytes
                windowTuple = json.loads(storedData.decode('utf-8'))
                return windowTuple[0], windowTuple[1]
        except Exception:
            pass

        # Fallback to defaults
        return self.defaultWindowSize, self.defaultDriftSkew

    def _constructCacheKey(self, serder):
        """Construct the key for the Replay Cache Table.

        Parameters:
            serder: The SerderKERI instance containing the message

        Returns:
            str: The key for the Replay Cache Table
        """
        # TODO: Implement message type as part of the key (serder.ilk)
        sad = serder.sad
        sourceAid = sad.get("i", "")
```
Contributor: If you do …
```python
        return sourceAid

    def _getCachedTimestamp(self, key):
        """Get the cached timestamp for a key from the Replay Cache Table.

        Parameters:
            key (str): The cache key

        Returns:
            float | None: The cached timestamp in seconds or None if not found
        """
        try:
            storedTimestamp = self.db.time.getLast(key)
            if storedTimestamp is not None:
                return float(storedTimestamp)
        except Exception as e:
            print(e)
        return None

    def _storeTimestamp(self, key, timestamp):
        """Store a timestamp in the Replay Cache Table.

        Parameters:
            key (str): The cache key
            timestamp (float): The timestamp to store in seconds
        """
        timestampStr = str(timestamp)

        self.db.time.pin(key, [timestampStr])

    def checkMessageTimeliness(self, serder):
        """Check if a message is timely and not a replay.

        Parameters:
            serder: The Serder instance containing the message

        Returns:
            tuple: (isValid, reason) where:
                isValid (bool): True if the message is timely and not a replay
                reason (str): A description of why the message was accepted or rejected
        """
        if not serder.verify():
            return False, "Invalid message structure"

        sad = serder.sad

        sourceAid = sad.get("i", None)
        messageType = serder.ilk or None
        timestamp = sad.get("dt", None)

        if not all([sourceAid, messageType, timestamp]):
            return False, "Missing required message fields"

        windowSize, driftSkew = self.getWindowParameters(sourceAid, messageType)

        # Convert both timestamps to seconds since epoch for comparison
        currentTime = helping.fromIso8601(helping.nowIso8601()).timestamp()

        messageTime = helping.fromIso8601(timestamp).timestamp()

        if (messageTime < currentTime - driftSkew - windowSize or
                messageTime > currentTime + driftSkew):
            raise kering.KramError(f"Message is out of time window {serder.pretty()}")

        cacheKey = self._constructCacheKey(serder)

        cachedTimestamp = self._getCachedTimestamp(cacheKey)

        if cachedTimestamp is None:
            self._storeTimestamp(cacheKey, messageTime)
            return True, "Message accepted, new entry"

        if messageTime > cachedTimestamp:
            self._storeTimestamp(cacheKey, messageTime)
            return True, "Message accepted, updated cached entry"

        if messageTime == cachedTimestamp:
```
Contributor: Is this too strict? As in, if any two messages have the same timestamp, should they really be considered a replay? It would seem comparing the message digests in addition to the timestamp is also necessary to determine something is a replay.

Author: My understanding is if they have the same timestamp they should be treated as a replay by the time they are validated by the kraming module. The KRAM white paper describes the cache as monotonically ordered, which seems to imply that this is the appropriate level of strictness. Open to corrections/discussion here.
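To make the reviewer's alternative concrete, here is a hedged sketch of a digest-aware policy: an equal timestamp only counts as a replay when the SAID also matches, otherwise the message is rejected as out of order. The helper and its arguments are hypothetical (the PR's cache stores only timestamps, not SAIDs); `kering.KramError` is taken from the PR itself.

```python
from keri import kering


def classifyAgainstCache(messageTime, messageSaid, cachedTime, cachedSaid):
    """Hypothetical digest-aware check, not the PR's behavior.

    Assumes the replay cache keeps (timestamp, said) pairs instead of bare
    timestamps. Returns a reason string when the message should be accepted,
    raises kering.KramError otherwise.
    """
    if cachedTime is None:
        return "accepted, new entry"
    if messageTime > cachedTime:
        return "accepted, newer than cached entry"
    if messageTime == cachedTime and messageSaid == cachedSaid:
        # same instant and same message digest: a true replay
        raise kering.KramError("Message replay detected")
    # older, or same instant but a different message: not a replay, just unordered
    raise kering.KramError("Message is out of order")
```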
| raise kering.KramError(f"Message replay detected {serder.pretty()}") | ||
|
|
||
| raise kering.KramError(f"Message is out of order {serder.pretty()}") | ||
|
|
||
| def pruneCache(self): | ||
| """Prune stale entries from the Replay Cache Table. | ||
|
|
||
| Returns: | ||
| int: The number of pruned entries | ||
| """ | ||
| prunedCount = 0 | ||
| currentTime = helping.fromIso8601(helping.nowIso8601()).timestamp() | ||
|
|
||
| for key, timestampStr in self.db.time.getItemIter(): | ||
| try: | ||
|
|
||
| timestamp = float(timestampStr) | ||
|
|
||
| windowSize, driftSkew = self.getWindowParameters(key) | ||
|
|
||
| if timestamp < currentTime - driftSkew - windowSize: | ||
| self.db.time.rem(key) | ||
| prunedCount += 1 | ||
|
|
||
| except Exception: | ||
| continue | ||
|
|
||
| return prunedCount | ||
|
|
||
| def processKrms(self): | ||
| for said, serder in self.db.krms.getFullItemIter(): | ||
| try: | ||
| # assumes serder is a SerderKERI, more processing may be needed | ||
| if self.checkMessageTimeliness(serder): | ||
| # TODO: Implement escrowing functionality | ||
| self.db.krms.rem(said) | ||
| logger.info(f"Message accepted: {serder.pretty()}") | ||
| except kering.KramError as e: | ||
| logger.error(f"Invalid message: {e}") | ||
| self.pruneCache() | ||
|
Contributor: Should this … Another way of saying it is, should prune cache only occur when there are items in the …

Author: Implemented it this way because it seemed like unnecessary overhead to always be pruning the database. Open to discussing potential tradeoffs.
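One way to act on this thread without pruning on every pass would be to rate-limit the prune, for example with a small gate like the sketch below (illustrative names only; nothing like this exists in the PR).

```python
import time


class PruneGate:
    """Illustrative helper, not part of the PR: lets processKrms call
    pruneCache unconditionally while the actual scan runs at most once
    per `interval` seconds."""

    def __init__(self, interval=30.0):
        self.interval = interval   # minimum seconds between prunes
        self.last = 0.0            # monotonic time of the last prune

    def due(self):
        now = time.monotonic()
        if now - self.last >= self.interval:
            self.last = now
            return True
        return False


# usage sketch inside TimelinessCache.processKrms:
#     if self.pruneGate.due():
#         self.pruneCache()
```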
```python
class KramDoer(Doer):
    """KramDoer is a Doer that manages the KRAM database."""

    def __init__(self, db):
        self.db = db

        self.tc = TimelinessCache(self.db)

        super(KramDoer, self).__init__()

    def recur(self, tyme):
        # TODO: Implement KRAM escrowing functionality
        self.tc.processKrms()
```
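For context, a sketch of how the new KramDoer might be scheduled on a hio Doist. The import path and the `db` value are assumptions: the relative import in the parser change below suggests the module sits next to parsing (e.g. under keri.core), and `db` stands in for a Baser-like database that exposes the .krms, .kram and .time sub-databases this diff relies on.

```python
from hio.base import doing

# assumed import path; the diff itself does not name the file
from keri.core.kraming import KramDoer

# `db` must provide .krms, .kram and .time (e.g. the habitat's Baser once the
# PR's database changes are in place); it is not constructed here.
kramDoer = KramDoer(db=db)

# schedule alongside the application's other doers
doist = doing.Doist(tock=0.03125, real=True)
doist.do(doers=[kramDoer])
```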
Second file (Parser):

```diff
@@ -11,6 +11,7 @@
 from base64 import urlsafe_b64decode as decodeB64
 from base64 import urlsafe_b64encode as encodeB64

+from .kraming import TimelinessCache
 from .. import kering
 from ..kering import (Colds, sniff, Vrsn_1_0, Vrsn_2_0,
                       ShortageError, ColdStartError)
```
@@ -132,7 +133,7 @@ class Parser: | |
|
|
||
| def __init__(self, ims=None, framed=True, piped=False, kvy=None, | ||
| tvy=None, exc=None, rvy=None, vry=None, local=False, | ||
| version=Vrsn_2_0): | ||
| krm=None, version=Vrsn_2_0): | ||
|
Contributor: Does it make sense to integrate this at the parser level? While consistent with how we have coupled many other concerns in the Parsing class, should we be separating out processing from applying message policies like this? I wonder if we are putting too many responsibilities in the parser, and whether it would make more sense to have the parser just output parsed, hydrated (instantiated as classes) CESR primitives that then go into a message-processing pipeline to route them appropriately. This is definitely doable within the parser, yet it seems like it should happen after something is returned from the parser.

Author: The full implementation plan is to have a KRAM escrow, since point KRAM only works for non-trans AIDs. Open to discussion; just worth considering that the current parsing integration will probably be replaced with something more robust and non-interdependent.
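As a strawman for the pipeline idea raised in this thread, the sketch below shows a post-parser stage that applies the timeliness policy to query messages before handing the hydrated serder to whatever routes it next. All class and parameter names are invented for illustration; nothing like this exists in keripy today.

```python
class QryPolicyStage:
    """Strawman post-parser stage: apply KRAM timeliness to 'qry' messages
    after parsing, before routing. Names are illustrative, not keripy API."""

    def __init__(self, krm, handler):
        self.krm = krm          # TimelinessCache instance from this PR
        self.handler = handler  # downstream callable (e.g. kvy/tvy routing)

    def handle(self, serder):
        if serder.ilk == "qry":  # Ilks.qry
            # raises kering.KramError for untimely, replayed, or out-of-order queries
            self.krm.checkMessageTimeliness(serder)
        self.handler(serder)
```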
| """ | ||
| Initialize instance: | ||
|
|
||
|
|
@@ -166,6 +167,12 @@ def __init__(self, ims=None, framed=True, piped=False, kvy=None, | |
| self.version = version # provided version may be earlier than supported version | ||
| # version sets .methods, .codes, .sucodes, and .mucodes | ||
|
|
||
| # default timeliness cache / KRAM behavior | ||
| self.krm = krm | ||
| if not krm and self.kvy and self.kvy.db: | ||
| self.krm = TimelinessCache(self.kvy.db) | ||
|
|
||
|
|
||
|
|
||
| @property | ||
| def genus(self): | ||
|
|
@@ -1196,6 +1203,7 @@ def msgParsator(self, ims=None, framed=True, piped=False, | |
|
|
||
| elif ilk in (Ilks.qry,): # query message | ||
| # ToDo neigher kvy.processQuery nor tvy.processQuery actually verify | ||
| self.krm.checkMessageTimeliness(serder) | ||
| if exts['ssgs']: | ||
| # use last one if more than one | ||
| pre, sigers = exts['ssgs'][-1] if exts['ssgs'] else (None, None) | ||
|
|
||
Contributor: Should we be serializing as JSON or as a set of CESR-encoded floats?

Author: I am not opposed to changing it to fit whatever the best practice is; there are other places in the codebase where we serialize this way when not handling KERI event data.
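To make the trade-off concrete, here is a small round-trip comparison between the JSON encoding the PR uses and a fixed-width binary packing of the same two floats. The struct variant is not CESR; it is shown only as a size/shape reference, since a real CESR encoding would use keripy primitives that this sketch does not attempt.

```python
import json
import struct

window = (3.0, 1.0)   # (windowSize, driftSkew) in seconds

# what the PR stores today: JSON bytes
asJson = json.dumps(window).encode("utf-8")        # b'[3.0, 1.0]', 10 bytes
assert tuple(json.loads(asJson.decode("utf-8"))) == window

# a fixed-width alternative (plain IEEE-754 doubles, NOT CESR) for comparison
packed = struct.pack(">dd", *window)               # always 16 bytes
assert struct.unpack(">dd", packed) == window
```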