diff --git a/src/keri/app/habbing.py b/src/keri/app/habbing.py index f6aa72751..6f5afee7f 100644 --- a/src/keri/app/habbing.py +++ b/src/keri/app/habbing.py @@ -293,8 +293,8 @@ def setup(self, *, seed=None, aeid=None, bran=None, pidx=None, algo=None, raise ValueError(f"Bran (passcode seed material) too short.") bran = coring.MtrDex.Salt_128 + 'A' + bran[:21] # qb64 salt for seed signer = core.Salter(qb64=bran).signer(transferable=False, - tier=tier, - temp=temp) + tier=tier, + temp=temp) seed = signer.qb64 if not aeid: # aeid must not be empty event on initial creation aeid = signer.verfer.qb64 # lest it remove encryption diff --git a/src/keri/core/kraming.py b/src/keri/core/kraming.py new file mode 100644 index 000000000..bf4a0f9a7 --- /dev/null +++ b/src/keri/core/kraming.py @@ -0,0 +1,231 @@ +from keri import kering +from keri.help import helping + +from typing import NamedTuple +from keri import help +from hio.base import Doer +import json + +logger = help.ogler.getLogger() + +# TODO: Implement message type as part of TimelinessCache module for more granular control +class MessageType(NamedTuple): + """Named tuple for KERI message types, not yet used.""" + QRY: str = "qry" + RPY: str = "rpy" + PRO: str = "pro" + BAR: str = "bar" + EXN: str = "exn" + + +MESSAGE_TYPES = MessageType() + +class TimelinessCache: + """TimelinessCache is responsible for preventing replay attacks in KERI/KRAM by ensuring + messages are timely and processed in a strictly monotonically ordered fashion. + + It maintains: + 1. A Lagging Window Size Table - to determine validity windows for different message types + 2. A Replay Cache Table - to store timestamps of previously validated messages + """ + + def __init__(self, db, defaultWindowSize=3.0, defaultDriftSkew=1.0): + """Initialize the TimelinessCache. + + Parameters: + db: Database instance that contains the IoSetSuber at db.time + defaultWindowSize (float): Default window size in seconds + defaultDriftSkew (float): Default drift skew in seconds + """ + self.defaultWindowSize = defaultWindowSize + self.defaultDriftSkew = defaultDriftSkew + + self.db = db + + def setWindowParameters(self, aid, windowSize=None, driftSkew=None, messageType=None): + """Set window parameters for given autonomic identifier and message type. + """ + # TODO: Implement message type as part of the window parameters + windowSize = windowSize or self.defaultWindowSize + driftSkew = driftSkew or self.defaultDriftSkew + + # Serialize the tuple as JSON bytes for storage + windowTuple = (windowSize, driftSkew) + serialized = json.dumps(windowTuple).encode('utf-8') + + self.db.kram.pin(aid, [serialized]) + + + def getWindowParameters(self, aid, messageType=None): + """Get window parameters for given autonomic identifier and message type. + Falls back to default values if no entry exists for the aid. + + Parameters: + aid (str): autonomic identifier + messageType (str | None): message type identifier. None for now, but will + be used to determine the window size for a given message type + + Returns: + tuple: (windowSize, driftSkew) as floats in seconds + """ + # TODO: Implement message type as part of the window parameters + try: + # Try to get the stored parameters + storedData = self.db.kram.getLast(aid) + if storedData is not None: + # Deserialize from JSON bytes + windowTuple = json.loads(storedData.decode('utf-8')) + return windowTuple[0], windowTuple[1] + except Exception: + pass + + # Fallback to defaults + return self.defaultWindowSize, self.defaultDriftSkew + + + def _constructCacheKey(self, serder): + """Construct the key for the Replay Cache Table. + + Parameters: + serder: The SerderKERI instance containing the message + + Returns: + str: The key for the Replay Cache Table + """ + # TODO: Implement message type as part of the key (serder.ilk) + sad = serder.sad + sourceAid = sad.get("i", "") + + return sourceAid + + def _getCachedTimestamp(self, key): + """Get the cached timestamp for a key from the Replay Cache Table. + + Parameters: + key (str): The cache key + + Returns: + float | None: The cached timestamp in seconds or None if not found + """ + try: + storedTimestamp = self.db.time.getLast(key) + if storedTimestamp is not None: + return float(storedTimestamp) + except Exception as e: + print(e) + pass + return None + + def _storeTimestamp(self, key, timestamp): + """Store a timestamp in the Replay Cache Table. + + Parameters: + key (str): The cache key + timestamp (float): The timestamp to store in seconds + """ + timestampStr = str(timestamp) + + self.db.time.pin(key, [timestampStr]) + + def checkMessageTimeliness(self, serder): + """Check if a message is timely and not a replay. + + Parameters: + serder: The Serder instance containing the message + + Returns: + tuple: (isValid, reason) where: + isValid (bool): True if the message is timely and not a replay + reason (str): A description of why the message was accepted or rejected + """ + if not serder.verify(): + return False, "Invalid message structure" + + sad = serder.sad + + sourceAid = sad.get("i", None) + messageType = serder.ilk or None + timestamp = sad.get("dt", None) + + if not all([sourceAid, messageType, timestamp]): + return False, "Missing required message fields" + + windowSize, driftSkew = self.getWindowParameters(sourceAid, messageType) + + # Convert both timestamps to seconds since epoch for comparison + currentTime = helping.fromIso8601(helping.nowIso8601()).timestamp() + + messageTime = helping.fromIso8601(timestamp).timestamp() + + if (messageTime < currentTime - driftSkew - windowSize or + messageTime > currentTime + driftSkew): + raise kering.KramError(f"Message is out of time window {serder.pretty()}") + + cacheKey = self._constructCacheKey(serder) + + cachedTimestamp = self._getCachedTimestamp(cacheKey) + + if cachedTimestamp is None: + self._storeTimestamp(cacheKey, messageTime) + # Message accepted, new entry + return True + + if messageTime > cachedTimestamp: + self._storeTimestamp(cacheKey, messageTime) + # Message accepted, updated cached entry + return True + + if messageTime == cachedTimestamp: + raise kering.KramError(f"Message replay detected {serder.pretty()}") + + raise kering.KramError(f"Message is out of order {serder.pretty()}") + + def pruneCache(self): + """Prune stale entries from the Replay Cache Table. + + Returns: + int: The number of pruned entries + """ + prunedCount = 0 + currentTime = helping.fromIso8601(helping.nowIso8601()).timestamp() + + for key, timestampStr in self.db.time.getItemIter(): + try: + + timestamp = float(timestampStr) + + windowSize, driftSkew = self.getWindowParameters(key) + + if timestamp < currentTime - driftSkew - windowSize: + self.db.time.rem(key) + prunedCount += 1 + + except Exception: + continue + + return prunedCount + + def processKrms(self): + for said, serder in self.db.krms.getFullItemIter(): + try: + # assumes serder is a SerderKERI, more processing may be needed + if self.checkMessageTimeliness(serder): + # TODO: Implement escrowing functionality + self.db.krms.rem(said) + logger.info(f"Message accepted: {serder.pretty()}") + except kering.KramError as e: + logger.error(f"Invalid message: {e}") + self.pruneCache() + +class KramDoer(Doer): + """KramDoer is a Doer that manages the KRAM database.""" + def __init__(self, db): + self.db = db + + self.tc = TimelinessCache(self.db) + + super(KramDoer, self).__init__() + + def recur(self, tyme): + # TODO: Implement KRAM escrowing functionality + self.tc.processKrms() \ No newline at end of file diff --git a/src/keri/core/parsing.py b/src/keri/core/parsing.py index 65d39c03e..0c71d8ade 100644 --- a/src/keri/core/parsing.py +++ b/src/keri/core/parsing.py @@ -11,6 +11,7 @@ from base64 import urlsafe_b64decode as decodeB64 from base64 import urlsafe_b64encode as encodeB64 +from .kraming import TimelinessCache from .. import kering from ..kering import (Colds, sniff, Vrsn_1_0, Vrsn_2_0, ShortageError, ColdStartError) @@ -132,7 +133,7 @@ class Parser: def __init__(self, ims=None, framed=True, piped=False, kvy=None, tvy=None, exc=None, rvy=None, vry=None, local=False, - version=Vrsn_2_0): + krm=None, version=Vrsn_2_0): """ Initialize instance: @@ -166,6 +167,12 @@ def __init__(self, ims=None, framed=True, piped=False, kvy=None, self.version = version # provided version may be earlier than supported version # version sets .methods, .codes, .sucodes, and .mucodes + # default timeliness cache / KRAM behavior + self.krm = krm + if not krm and self.kvy and self.kvy.db: + self.krm = TimelinessCache(self.kvy.db) + + @property def genus(self): @@ -1196,6 +1203,7 @@ def msgParsator(self, ims=None, framed=True, piped=False, elif ilk in (Ilks.qry,): # query message # ToDo neigher kvy.processQuery nor tvy.processQuery actually verify + self.krm.checkMessageTimeliness(serder) if exts['ssgs']: # use last one if more than one pre, sigers = exts['ssgs'][-1] if exts['ssgs'] else (None, None) diff --git a/src/keri/db/basing.py b/src/keri/db/basing.py index 0188fd417..220d9916a 100644 --- a/src/keri/db/basing.py +++ b/src/keri/db/basing.py @@ -1304,6 +1304,20 @@ def reopen(self, **kwa): # TODO: clean self.maids = subing.CesrIoSetSuber(db=self, subkey="maids.", klas=coring.Prefixer) + # timeliness cache of identifiers (soon to also include message types) + self.time = subing.IoSetSuber(db=self, + subkey='time.', + sep=">") + + # Window sizes and drift skews for specific identifiers (soon to also include message types) + self.kram = subing.IoSetSuber(db=self, + subkey='kram.', + sep=">", + ) + + # KRAM escrow database, not yet implemented + self.krms = subing.IoSetSuber(db=self, subkey='krms.', sep=">") + self.reload() return self.env diff --git a/src/keri/kering.py b/src/keri/kering.py index ff1a7cb2c..68268d8ba 100644 --- a/src/keri/kering.py +++ b/src/keri/kering.py @@ -1041,3 +1041,9 @@ class QueryNotFoundError(KeriError): raise QueryNotFoundError("error message") """ +class KramError(KeriError): + """ + Timestamp indicating replay attack or other KRAM related error + Usage: + raise KramError("error message") + """ diff --git a/tests/conftest.py b/tests/conftest.py index 4013464ae..4f8f85932 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -82,6 +82,36 @@ def mockRandomSalt(self): def seeder(): return DbSeed +@pytest.fixture +def create_test_serder(): + """Fixture that provides a helper function to create test serders""" + + def _create_test_serder(sourceAid, ilk, timestamp=None, route=None, qBlock=None, routeParams=None): + """Helper to create a test serder with specified parameters""" + from keri.core import serdering + from keri.help import helping + + if timestamp is None: + timestamp = helping.nowIso8601() + + sad = { + "v": "KERI10JSON00011c_", + "t": ilk, + "i": sourceAid, + "dt": timestamp, + } + + if route: + if routeParams: + route += "?" + "&".join([f"{k}={v}" for k, v in routeParams.items()]) + sad["r"] = route + + if qBlock: + sad["q"] = qBlock + + return serdering.SerderKERI(sad=sad, makify=True) + + return _create_test_serder class DbSeed: @staticmethod diff --git a/tests/core/test_kraming.py b/tests/core/test_kraming.py new file mode 100644 index 000000000..80fe1a735 --- /dev/null +++ b/tests/core/test_kraming.py @@ -0,0 +1,56 @@ +import pytest +from keri.app import habbing +from keri.core import kraming, serdering +from keri.help import helping +from keri import kering + + +def test_timeliness(monkeypatch, create_test_serder): + + def mockNowIso8601(): + return "2021-06-27T21:26:21.233257+00:00" + + monkeypatch.setattr(helping, "nowIso8601", mockNowIso8601) + + with habbing.openHab(name="kramTest", base="test", salt=b'0123456789abcdeg') as (hby, hab): + + tc = kraming.TimelinessCache(db=hby.db, defaultDriftSkew=1.0, defaultWindowSize=1.0) + + assert tc.defaultWindowSize == 1.0 + assert tc.defaultDriftSkew == 1.0 + + # Set window parameters for the test AIDs + tc.setWindowParameters(hab.pre, windowSize=1.0, driftSkew=1.0) + + + current_time = helping.nowIso8601() + + offerSerder = create_test_serder( + sourceAid=hab.pre, + ilk="exn", + timestamp=current_time, + route="/credential/offer", + ) + + # Cache first entry + isValid = tc.checkMessageTimeliness(offerSerder) + assert isValid + + def mockNowIso8601Later(): + return "2021-06-27T21:26:24.233258+00:00" + + monkeypatch.setattr(helping, "nowIso8601", mockNowIso8601Later) + + offerSerder2 = create_test_serder(sourceAid="BGKVzj4ve0VSd8z_AmvhLg4lqcC_9WYX90k03q-R_Ydo", + ilk="exn", timestamp=mockNowIso8601Later(), route="/credential/offer") + + # Cache new entry + isValid = tc.checkMessageTimeliness(offerSerder2) + assert isValid + + # Attempt to cache first entry again - this should now raise ValidationError + with pytest.raises(kering.KramError): + tc.checkMessageTimeliness(offerSerder) + + # Prune only the entry with a time outside the window + assert tc.pruneCache() == 1 \ No newline at end of file