From b54dcf3329b8b4287d718739bec3f9f9fe6620f3 Mon Sep 17 00:00:00 2001 From: arilieb Date: Mon, 14 Jul 2025 14:33:01 -0700 Subject: [PATCH 1/6] work on kraming. --- src/keri/app/habbing.py | 524 ------------------------------------- src/keri/core/kraming.py | 194 ++++++++++++++ src/keri/db/basing.py | 5 + tests/core/test_kraming.py | 84 ++++++ 4 files changed, 283 insertions(+), 524 deletions(-) create mode 100644 src/keri/core/kraming.py create mode 100644 tests/core/test_kraming.py diff --git a/src/keri/app/habbing.py b/src/keri/app/habbing.py index f6aa72751..91c606f2a 100644 --- a/src/keri/app/habbing.py +++ b/src/keri/app/habbing.py @@ -912,68 +912,10 @@ def exit(self): class BaseHab: - """ - Hab class provides a given idetnifier controller's local resource environment - i.e. hab or habitat. Includes dependency injection of database, keystore, - configuration file as well as Kevery and key store Manager.. - - Attributes: (Injected) - ks (keeping.Keeper): lmdb key store - db (basing.Baser): lmdb data base for KEL etc - cf (configing.Configer): config file instance - mgr (keeping.Manager): creates and rotates keys in key store - rtr (routing.Router): routes reply 'rpy' messages - rvy (routing.Revery): factory that processes reply 'rpy' messages - kvy (eventing.Kevery): factory for local processing of local event msgs - psr (parsing.Parser): parses local messages for .kvy .rvy - - - Attributes: - name (str): alias of controller - pre (str): qb64 prefix of own local controller or None if new - temp (bool): True means testing: - use weak level when salty algo for stretching in key creation - for incept and rotate of keys for this hab.pre - inited (bool): True means fully initialized wrt databases. - False means not yet fully initialized - delpre (str | None): delegator prefix if any else None - - Properties: - kever (Kever): instance of key state of local controller - kevers (dict): of eventing.Kever instances from KELs in local db - keyed by qb64 prefix. Read through cache of of kevers of states for - KELs in db.states - iserder (serdering.SerderKERI): own inception event - prefixes (OrderedSet): local prefixes for .db - accepted (bool): True means accepted into local KEL. - False otherwise - - """ def __init__(self, ks, db, cf, mgr, rtr, rvy, kvy, psr, *, name='test', ns=None, pre=None, temp=False): - """ - Initialize instance. - - Injected Parameters: (injected dependencies) - ks (keeping.Keeper): lmdb key store - db (basing.Baser): lmdb data base for KEL etc - cf (configing.Configer): config file instance - mgr (keeping.Manager): creates and rotates keys in key store - rtr (routing.Router): routes reply 'rpy' messages - rvy (routing.Revery): factory that processes reply 'rpy' messages - kvy (eventing.Kevery): factory for local processing of local event msgs - psr (parsing.Parser): parses local messages for .kvy .rvy - - - Parameters: - name (str): alias name for local controller of habitat - pre (str | None): qb64 identifier prefix of own local controller else None - temp (bool): True means testing: - use weak level when salty algo for stretching in key creation - for incept and rotate of keys for this hab.pre - """ self.db = db # injected self.ks = ks # injected self.cf = cf # injected @@ -992,34 +934,7 @@ def __init__(self, ks, db, cf, mgr, rtr, rvy, kvy, psr, *, self.delpre = None # assigned laster if delegated def make(self, DnD, code, data, delpre, estOnly, isith, verfers, nsith, digers, toad, wits): - """ - Creates Serder of inception event for provided parameters. 
- Assumes injected dependencies were already setup. - Parameters: - isith (int | str | list | None): incepting signing threshold as - int, str hex, or list weighted if any, otherwise compute - default from verfers - code (str): prefix derivation code default Blake3 - nsith (int, str, list | None ): next signing threshold as int, - str hex or list weighted if any, otherwise compute default from - digers - verfers (list[Verfer]): Verfer instances for initial signing keys - digers (list[Diger] | None) Diger instances for next key digests - toad (int |str| None): int or str hex of witness threshold if - specified else compute default based on number of wits (backers) - wits (list | None): qb64 prefixes of witnesses if any - delpre (str | None): qb64 of delegator identifier prefix if any - estOnly (bool | None): True means add trait eventing.TraitDex.EstOnly - which means only establishment events allowed in KEL for this Hab - False (default) means allows non-est events and no trait is added. - DnD (bool): True means add trait of eventing.TraitDex.DnD which - means do not allow delegated identifiers from this identifier - False (default) means do allow and no trait is added. - - data (list | None): seal dicts - - """ icount = len(verfers) ncount = len(digers) if digers is not None else 0 if isith is None: # compute default @@ -1068,45 +983,6 @@ def save(self, habord): val=self.pre) def reconfigure(self): - """Apply configuration from config file managed by .cf. to this Hab. - Assumes that .pre and signing keys have been setup in order to create - own endpoint auth when provided in .cf. - - config file json or hjon - - { - "dt": "2021-01-01T00:00:00.000000+00:00", - "nel": - { - "dt": "2021-01-01T00:00:00.000000+00:00", - "curls": - [ - "tcp://localhost:5621/" - ] - }, - "iurls": - [ - "tcp://localhost:5620/?role=peer&name=tam" - ], - "durls": - [ - "http://127.0.0.1:7723/oobi/EBNaNu-M9P5cgrnfl2Fvymy4E_jvxxyjb70PRtiANlJy", - "http://127.0.0.1:7723/oobi/EMhvwOlyEJ9kN4PrwCpr9Jsv7TxPhiYveZ0oP3lJzdEi", - ], - "wurls": - [ - "http://127.0.0.1:5644/.well-known/keri/oobi/EBNaNu-M9P5cgrnfl2Fvymy4E_jvxxyjb70PRtiANlJy?name=Root" - ] - } - - - - Config file is meant to be read only at init not changed by app at - run time. Any dynamic app changes must go in database not config file - that way we don't have to worry about multiple writers of cf. - Use config file to preload database not as a database. Config file may - have named sections for Habery or individual Habs as needed. - """ conf = self.cf.get() if self.name not in conf: @@ -1132,9 +1008,6 @@ def reconfigure(self): @property def iserder(self): - """ - Return serder of inception event - """ if (dig := self.db.getKeLast(eventing.snKey(pre=self.pre, sn=0))) is None: raise kering.ConfigurationError("Missing inception event in KEL for " "Habitat pre={}.".format(self.pre)) @@ -1145,9 +1018,6 @@ def iserder(self): @property def kevers(self): - """ - Returns .db.kevers - """ return self.db.kevers @property @@ -1156,44 +1026,17 @@ def accepted(self): @property def kever(self): - """ - Returns kever for its .pre - """ return self.kevers[self.pre] if self.accepted else None @property def prefixes(self): - """ - Returns .db.prefixes - """ return self.db.prefixes def incept(self, **kwa): - """Alias for .make """ self.make(**kwa) def rotate(self, *, verfers=None, digers=None, isith=None, nsith=None, toad=None, cuts=None, adds=None, data=None): - """ - Perform rotation operation. Register rotation in database. 
- Returns: bytearrayrotation message with attached signatures. - - Parameters: - verfers (list | None): Verfer instances of public keys qb64 - digers (list | None): Diger instances of public next key digests qb64 - isith (int |str | None): current signing threshold as int or str hex - or list of str weights - default is prior next sith - nsith (int |str | None): next, next signing threshold as int - or str hex or list of str weights - default is based on isith when None - toad (int | str | None): hex of witness threshold after cuts and adds - cuts (list | None): of qb64 pre of witnesses to be removed from witness list - adds (list | None): of qb64 pre of witnesses to be added to witness list - data (list | None): of dicts of committed data such as seals - - - """ # recall that kever.pre == self.pre kever = self.kever # before rotation kever is prior next @@ -1265,10 +1108,6 @@ def rotate(self, *, verfers=None, digers=None, isith=None, nsith=None, toad=None return msg def interact(self, *, data=None): - """ - Perform interaction operation. Register interaction in database. - Returns: bytearray interaction message with attached signatures. - """ kever = self.kever serder = eventing.interact(pre=kever.prefixer.qb64, dig=kever.serder.said, @@ -1291,26 +1130,6 @@ def interact(self, *, data=None): def sign(self, ser, verfers=None, indexed=True, indices=None, ondices=None, **kwa): - """Sign given serialization ser using appropriate keys. - Use provided verfers or .kever.verfers to lookup keys to sign. - - Parameters: - ser (bytes): serialization to sign - verfers (list[Verfer] | None): Verfer instances to get pub verifier - keys to lookup private siging keys. - verfers None means use .kever.verfers. Assumes that when group - and verfers is not None then provided verfers must be .kever.verfers - indexed (bool): When not mhab then - True means use use indexed signatures and return - list of Siger instances. - False means do not use indexed signatures and return - list of Cigar instances - indices (list[int] | None): indices (offsets) - when indexed == True. See Manager.sign - ondices (list[int | None] | None): other indices (offsets) - when indexed is True. See Manager.sign - - """ if verfers is None: verfers = self.kever.verfers # when group these provide group signing keys @@ -1322,18 +1141,6 @@ def sign(self, ser, verfers=None, indexed=True, indices=None, ondices=None, **kw def decrypt(self, ser, verfers=None, **kwa): - """Decrypt given serialization ser using appropriate keys. - Use provided verfers or .kever.verfers to lookup keys to sign. - - Parameters: - ser (str | bytes | bytearray | memoryview): serialization to decrypt - - verfers (list[Verfer] | None): Verfer instances to get pub verifier - keys to lookup and convert to private decryption keys. - verfers None means use .kever.verfers. 
Assumes that when group - and verfers is not None then provided verfers must be .kever.verfers - - """ if verfers is None: verfers = self.kever.verfers # when group these provide group signing keys @@ -1343,18 +1150,6 @@ def decrypt(self, ser, verfers=None, **kwa): def query(self, pre, src, query=None, **kwa): - """ Create, sign and return a `qry` message against the attester for the prefix - - Parameters: - pre (str): qb64 identifier prefix being queried for - src (str): qb64 identifier prefix of attester being queried - query (dict): addition query modifiers to include in `q` - **kwa (dict): keyword arguments passed to eventing.query - - Returns: - bytearray: signed query event - - """ query = query if query is not None else dict() query['i'] = pre @@ -1363,19 +1158,7 @@ def query(self, pre, src, query=None, **kwa): return self.endorse(serder, last=True) def endorse(self, serder, last=False, pipelined=True): - """ - Returns msg with own endorsement of msg from serder with attached signature - groups based on own pre transferable or non-transferable. - Parameters: - serder (Serder): instance of msg - last (bool): True means use SealLast. False means use SealEvent - query messages use SealLast - pipelined (bool): True means use pipelining attachment code - - Useful for endorsing message when provided via serder such as state, - reply, query or similar. - """ if self.kever.prefixer.transferable: # create SealEvent or SealLast for endorser's est evt whose keys are # used to sign @@ -1414,12 +1197,6 @@ def exchange(self, route, modifiers=None, embeds=None, save=False): - """ - Returns signed exn, message of serder with count code and receipt - couples (pre+cig) - Builds msg and then processes it into own db to validate - """ - # sign serder event serder, end = exchanging.exchange(route=route, payload=payload, @@ -1445,11 +1222,6 @@ def exchange(self, route, return msg def receipt(self, serder): - """ - Returns own receipt, rct, message of serder with count code and receipt - couples (pre+cig) - Builds msg and then processes it into own db to validate - """ ked = serder.ked reserder = eventing.receipt(pre=ked["i"], sn=int(ked["s"], 16), @@ -1473,17 +1245,6 @@ def receipt(self, serder): def witness(self, serder): - """ - Returns own receipt, rct, message of serder with count code and witness - indexed receipt signatures if key state of serder.pre shows that own pre - is a current witness of event in serder - - ToDo XXXX add parameter to force check that serder as been accepted - as valid. Otherwise must assume that before calling this that serder - being witnessed has been accepted as valid event into this hab - controller's KEL - - """ if self.kever.prefixer.transferable: # not non-transferable prefix raise ValueError("Attempt to create witness receipt with" " transferable pre={}.".format(self.pre)) @@ -1515,16 +1276,6 @@ def witness(self, serder): def replay(self, pre=None, fn=0): - """ - Returns replay of FEL first seen event log for pre starting from fn - Default pre is own .pre - - Parameters: - pre is qb64 str or bytes of identifier prefix. 
- default is own .pre - fn is int first seen ordering number - - """ if not pre: pre = self.pre @@ -1539,27 +1290,12 @@ def replay(self, pre=None, fn=0): return msgs def replayAll(self): - """ - Returns replay of FEL first seen event log for all pre starting at key - - Parameters: - key (bytes): fnKey(pre, fn) - - """ msgs = bytearray() for msg in self.db.cloneAllPreIter(): msgs.extend(msg) return msgs def makeOtherEvent(self, pre, sn): - """ - Returns: messagized bytearray message with attached signatures of - own event at sequence number sn from retrieving event at sn - and associated signatures from database. - - Parameters: - sn is int sequence number of event - """ if pre not in self.kevers: return None @@ -1578,103 +1314,33 @@ def makeOtherEvent(self, pre, sn): return msg def fetchEnd(self, cid: str, role: str, eid: str): - """ - Returns: - endpoint (basing.EndpointRecord): instance or None - """ return self.db.ends.get(keys=(cid, role, eid)) def fetchLoc(self, eid: str, scheme: str = kering.Schemes.http): - """ - Returns: - location (basing.LocationRecord): instance or None - """ return self.db.locs.get(keys=(eid, scheme)) def fetchEndAllowed(self, cid: str, role: str, eid: str): - """ - Returns: - allowed (bool): True if eid is allowed as endpoint provider for cid - in role. False otherwise. - Parameters: - cid (str): identifier prefix qb64 of controller authZ endpoint provided - eid in role - role (str): endpoint role such as (controller, witness, watcher, etc) - eid (str): identifier prefix qb64 of endpoint provider in role - """ end = self.db.ends.get(keys=(cid, role, eid)) return end.allowed if end else None def fetchEndEnabled(self, cid: str, role: str, eid: str): - """ - Returns: - allowed (bool): True if eid is allowed as endpoint provider for cid - in role. False otherwise. - Parameters: - cid (str): identifier prefix qb64 of controller authZ endpoint provided - eid in role - role (str): endpoint role such as (controller, witness, watcher, etc) - eid (str): identifier prefix qb64 of endpoint provider in role - """ end = self.db.ends.get(keys=(cid, role, eid)) return end.enabled if end else None def fetchEndAuthzed(self, cid: str, role: str, eid: str): - """ - Returns: - allowed (bool): True if eid is allowed as endpoint provider for cid - in role. False otherwise. - Parameters: - cid (str): identifier prefix qb64 of controller authZ endpoint provided - eid in role - role (str): endpoint role such as (controller, witness, watcher, etc) - eid (str): identifier prefix qb64 of endpoint provider in role - """ end = self.db.ends.get(keys=(cid, role, eid)) return (end.enabled or end.allowed) if end else None def fetchUrl(self, eid: str, scheme: str = kering.Schemes.http): - """ - Returns: - url (str): for endpoint provider given by eid - empty string when url is nullified - None when no location record - """ loc = self.db.locs.get(keys=(eid, scheme)) return loc.url if loc else loc def fetchUrls(self, eid: str, scheme: str = ""): - """ - Returns: - surls (hicting.Mict): urls keyed by scheme for given eid. Assumes that - user independently verifies that the eid is allowed for a - given cid and role. 
If url is empty then does not return - - Parameters: - eid (str): identifier prefix qb64 of endpoint provider - scheme (str): url scheme - """ return hicting.Mict([(keys[1], loc.url) for keys, loc in self.db.locs.getItemIter(keys=(eid, scheme)) if loc.url]) def fetchRoleUrls(self, cid: str, *, role: str = "", scheme: str = "", eids=None, enabled: bool = True, allowed: bool = True): - """ - Returns: - rurls (hicting.Mict): of nested dicts. The top level dict rurls is keyed by - role for a given cid. Each value in rurls is eurls dict - keyed by the eid of authorized endpoint provider and - each value in eurls is a surls dict keyed by scheme - - Parameters: - cid (str): identifier prefix qb64 of controller authZ endpoint provided - eid in role - role (str): endpoint role such as (controller, witness, watcher, etc) - scheme (str): url scheme - eids (list): when provided restrict returns to only eids in eids - enabled (bool): True means fetch any allowed witnesses as well - allowed (bool): True means fetech any enabled witnesses as well - """ if eids is None: eids = [] @@ -1700,24 +1366,6 @@ def fetchRoleUrls(self, cid: str, *, role: str = "", scheme: str = "", def fetchWitnessUrls(self, cid: str, scheme: str = "", eids=None, enabled: bool = True, allowed: bool = True): - """ - Fetch witness urls for witnesses of cid at latest key state or enabled or - allowed witnesses if not a witness at latest key state. - - Returns: - rurls (hicting.Mict): of nested dicts. The top level dict rurls is keyed by - role for a given cid. Each value in rurls is eurls dict - dict keyed by the eid of authorized endpoint provider and - each value in eurls is a surls dict keyed by scheme - - Parameters: - cid (str): identifier prefix qb64 of controller authZ endpoint provided - eid is witness - scheme (str): url scheme - eids (list): when provided restrict returns to only eids in eids - enabled (bool): True means fetch any allowed witnesses as well - allowed (bool): True means fetech any enabled witnesses as well - """ return (self.fetchRoleUrls(cid=cid, role=kering.Roles.witness, scheme=scheme, @@ -1726,15 +1374,6 @@ def fetchWitnessUrls(self, cid: str, scheme: str = "", eids=None, allowed=allowed)) def endsFor(self, pre): - """ Load Authroized endpoints for provided AID - - Args: - pre (str): qb64 aid for which to load ends - - Returns: - dict: nest dict of Roles -> eid -> Schemes -> endpoints - - """ ends = dict() for (_, erole, eid), end in self.db.ends.getItemIter(keys=(pre,)): @@ -1764,34 +1403,9 @@ def endsFor(self, pre): return ends def reply(self, **kwa): - """ - Returns: - msg (bytearray): reply message - - Parameters: - route is route path string that indicates data flow handler (behavior) - to processs the reply - data is list of dicts of comitted data such as seals - dts is date-time-stamp of message at time or creation - version is Version instance - kind is serialization kind - """ return self.endorse(eventing.reply(**kwa)) def makeEndRole(self, eid, role=kering.Roles.controller, allow=True, stamp=None): - """ - Returns: - msg (bytearray): reply message allowing/disallowing endpoint provider - eid in role - - Parameters: - eid (str): qb64 of endpoint provider to be authorized - role (str): authorized role for eid - allow (bool): True means add eid at role as authorized - False means cut eid at role as unauthorized - stamp (str): date-time-stamp RFC-3339 profile of iso8601 datetime. - None means use now. 
- """ data = dict(cid=self.pre, role=role, eid=eid) route = "/end/role/add" if allow else "/end/role/cut" return self.reply(route=route, data=data, stamp=stamp) @@ -1828,43 +1442,11 @@ def loadEndRole(self, cid, eid, role=kering.Roles.controller): return msgs def makeLocScheme(self, url, eid=None, scheme="http", stamp=None): - """ - Returns: - msg (bytearray): reply message of own url service endpoint at scheme - - Parameters: - url (str): url of endpoint, may have scheme missing or not - If url is empty then nullifies location - eid (str): qb64 of endpoint provider to be authorized - scheme (str): url scheme must matche scheme in url if any - stamp (str): date-time-stamp RFC-3339 profile of iso8601 datetime. - None means use now. - - """ eid = eid if eid is not None else self.pre data = dict(eid=eid, scheme=scheme, url=url) return self.reply(route="/loc/scheme", data=data, stamp=stamp) def replyLocScheme(self, eid, scheme=""): - """ - Returns a reply message stream composed of entries authed by the given - eid from the appropriate reply database including associated attachments - in order to disseminate (percolate) BADA reply data authentication proofs. - - Currently uses promiscuous model for permitting endpoint discovery. - Future is to use identity constraint graph to constrain discovery - of whom by whom. - - eid and not scheme then: - loc url for all schemes at eid - - eid and scheme then: - loc url for scheme at eid - - Parameters: - eid (str): endpoint provider id - scheme (str): url scheme - """ msgs = bytearray() urls = self.fetchUrls(eid=eid, scheme=scheme) @@ -1904,40 +1486,6 @@ def loadLocScheme(self, eid, scheme=None): return msgs def replyEndRole(self, cid, role=None, eids=None, scheme=""): - - """ - Returns a reply message stream composed of entries authed by the given - cid from the appropriate reply database including associated attachments - in order to disseminate (percolate) BADA reply data authentication proofs. - - Currently uses promiscuous model for permitting endpoint discovery. - Future is to use identity constraint graph to constrain discovery - of whom by whom. - - cid and not role and not scheme then: - end authz for all eids in all roles and loc url for all schemes at each eid - if eids then only eids in eids else all eids - - cid and not role and scheme then: - end authz for all eid in all roles and loc url for scheme at each eid - if eids then only eids in eids else all eids - - cid and role and not scheme then: - end authz for all eid in role and loc url for all schemes at each eid - if eids then only eids in eids else all eids - - cid and role and scheme then: - end authz for all eid in role and loc url for scheme at each eid - if eids then only eids in eids else all eids - - - Parameters: - cid (str): identifier prefix qb64 of controller authZ endpoint provided - eid is witness - role (str): authorized role for eid - eids (list): when provided restrict returns to only eids in eids - scheme (str): url scheme - """ msgs = bytearray() if eids is None: @@ -1970,39 +1518,9 @@ def replyEndRole(self, cid, role=None, eids=None, scheme=""): return msgs def replyToOobi(self, aid, role, eids=None): - """ - Returns a reply message stream composed of entries authed by the given - aid from the appropriate reply database including associated attachments - in order to disseminate (percolate) BADA reply data authentication proofs. - - Currently uses promiscuous model for permitting oobi initiated endpoint - discovery. 
Future is to use identity constraint graph to constrain - discovery of whom by whom. - - This method is entry point for initiating replies generated by - .replyEndRole and/or .replyLocScheme - - Parameters: - aid (str): qb64 of identifier in oobi, may be cid or eid - role (str): authorized role for eid - eids (list): when provided restrict returns to only eids in eids - - """ - # default logic is that if self.pre is witness of aid and has a loc url - # for self then reply with loc scheme for all witnesses even if self - # not permiteed in .habs.oobis return self.replyEndRole(cid=aid, role=role, eids=eids) def getOwnEvent(self, sn, allowPartiallySigned=False): - """ - Returns: message Serder and controller signatures of - own event at sequence number sn from retrieving event at sn - and associated signatures from database. - - Parameters: - sn (int): is int sequence number of event - allowPartiallySigned(bool): True means attempt to load from partial signed escrow - """ key = dbing.snKey(self.pre, sn) dig = self.db.getKeLast(key) if dig is None and allowPartiallySigned: @@ -2025,16 +1543,6 @@ def getOwnEvent(self, sn, allowPartiallySigned=False): return serder, sigs, couple def makeOwnEvent(self, sn, allowPartiallySigned=False): - """ - Returns: messagized bytearray message with attached signatures of - own event at sequence number sn from retrieving event at sn - and associated signatures from database. - - Parameters: - sn(int): is int sequence number of event - allowPartiallySigned(bool): True means attempt to load from partial signed escrow - - """ msg = bytearray() serder, sigs, couple = self.getOwnEvent(sn=sn, allowPartiallySigned=allowPartiallySigned) @@ -2052,33 +1560,15 @@ def makeOwnEvent(self, sn, allowPartiallySigned=False): return msg def makeOwnInception(self, allowPartiallySigned=False): - """ - Returns: messagized bytearray message with attached signatures of - own inception event by retrieving event and signatures - from database. - """ return self.makeOwnEvent(sn=0, allowPartiallySigned=allowPartiallySigned) def processCues(self, cues): - """ - Returns bytearray of messages as a result of processing all cues - - Parameters: - cues is deque of cues - """ msgs = bytearray() # outgoing messages for msg in self.processCuesIter(cues): msgs.extend(msg) return msgs def processCuesIter(self, cues): - """ - Iterate through cues and yields one or more msgs for each cue. 
- - Parameters: - cues is deque of cues - - """ while cues: # iteratively process each cue in cues msgs = bytearray() cue = cues.pull() # cues.popleft() @@ -2121,20 +1611,6 @@ def processCuesIter(self, cues): msg = self.reply(data=data, route=route) yield msg - # ToDo XXXX cue for kin = "query" various types of queries - # (query witness, query delegation etc) - # ToDo XXXX cue for kin = "notice" new event - # ToDo XXXX cue for kin = "witness" to create witness receipt own is witness - # ToDo XXXX cue for kin = "noticeBadCloneFN" - # ToDo XXXX cue for kin = "approveDelegation" own is delegator - - # ToDo XXXX cue for kin = "keyStateSaved" - # ToDo XXXX cue for kin = "psUnescrow" - # ToDo XXXX cue for kin = "stream" - # ToDo XXXX cue for kin = "invalid" - # ToDo XXXX cue for kin=""remoteMemberedSig"" - - def witnesser(self): return True diff --git a/src/keri/core/kraming.py b/src/keri/core/kraming.py new file mode 100644 index 000000000..7d788b2ce --- /dev/null +++ b/src/keri/core/kraming.py @@ -0,0 +1,194 @@ +import time + +from keri.help import helping + +from typing import Tuple, Optional, NamedTuple + +from hio.base import Doer + +from keri.core.serdering import Serder +from keri.kering import Ilks + + +class MessageType(NamedTuple): + """Named tuple for KERI message types, not yet used.""" + QRY: str = "qry" + RPY: str = "rpy" + PRO: str = "pro" + BAR: str = "bar" + EXN: str = "exn" + + +MESSAGE_TYPES = MessageType() + +class TimelinessCache: + """TimelinessCache is responsible for preventing replay attacks in KERI/KRAM by ensuring + messages are timely and processed in a strictly monotonically ordered fashion. + + It maintains: + 1. A Lagging Window Size Table - to determine validity windows for different message types + 2. A Replay Cache Table - to store timestamps of previously validated messages + """ + + def __init__(self, db, defaultWindowSize=300_000_000, defaultDriftSkew=60_000_000): + """Initialize the TimelinessCache. + + Parameters: + db: Database instance that contains the IoSetSuber at db.time + defaultWindowSize (int): Default window size in microseconds + defaultDriftSkew (int): Default drift skew in microseconds + """ + self.defaultWindowSize = defaultWindowSize + self.defaultDriftSkew = defaultDriftSkew + + # Will eventually be used with MessageType named tuple to set the window size for a given message type + self._windowParamsTable = {} + + # Database access + self.db = db + + def getWindowParameters(self, aid, messageType=None): + """Get window parameters for given autonomic identifier and message type. + Eventually this method will reference a table of window parameters. + + Parameters: + aid (str): autonomic identifier + messageType (str | None): message type identifier. None for now, but will + be used to determine the window size for a given message type + + Returns: + tuple: (windowSize, driftSkew) as integers in microseconds + """ + return self.defaultWindowSize, self.defaultDriftSkew + + def _constructCacheKey(self, serder): + """Construct the key for the Replay Cache Table. + + Parameters: + serder: The SerderKERI instance containing the message + + Returns: + str: The key for the Replay Cache Table + """ + sad = serder.sad + sourceAid = sad.get("i", "") # 'i' for identifier in KERI messages + # messageType = serder.ilk # Use serder.ilk for message type + + # return (sourceAid, messageType) + return sourceAid + + def _getCachedTimestamp(self, key): + """Get the cached timestamp for a key from the Replay Cache Table. 
+ + Parameters: + key (str): The cache key + + Returns: + int | None: The cached timestamp in microseconds or None if not found + """ + try: + storedTimestamp = self.db.time.getLast(key) + if storedTimestamp is not None: + return int(storedTimestamp) + except Exception as e: + print(e) + pass + return None + + def _storeTimestamp(self, key, timestamp): + """Store a timestamp in the Replay Cache Table. + + Parameters: + key (str): The cache key + timestamp (int): The timestamp to store in microseconds + """ + timestampStr = str(timestamp) + + # QUESTION: Do we need to cache the whole request + self.db.time.pin(key, [timestampStr]) + + def checkMessageTimeliness(self, serder): + """Check if a message is timely and not a replay. + + Parameters: + serder: The Serder instance containing the message + + Returns: + tuple: (isValid, reason) where: + isValid (bool): True if the message is timely and not a replay + reason (str): A description of why the message was accepted or rejected + """ + if not serder.verify(): + return False, "Invalid message structure" + + sad = serder.sad + + sourceAid = sad.get("i", None) + messageType = serder.ilk or None + timestamp = sad.get("dt", None) + + if not all([sourceAid, messageType, timestamp]): + return False, "Missing required message fields" + + windowSize, driftSkew = self.getWindowParameters(sourceAid, messageType) + + # Convert both timestamps to microseconds since epoch for comparison + currentTime = helping.fromIso8601(helping.nowIso8601()) + currentTimeMicros = int(currentTime.timestamp() * 1_000_000) + + messageTime = helping.fromIso8601(timestamp) + messageTimeMicros = int(messageTime.timestamp() * 1_000_000) + + if (messageTimeMicros < currentTimeMicros - driftSkew - windowSize or + messageTimeMicros > currentTimeMicros + driftSkew): + return False, "Message timestamp outside lagging window" + + cacheKey = self._constructCacheKey(serder) + + cachedTimestamp = self._getCachedTimestamp(cacheKey) + + if cachedTimestamp is None: + self._storeTimestamp(cacheKey, messageTimeMicros) + return True, "Message accepted, new entry" + + if messageTimeMicros > cachedTimestamp: + self._storeTimestamp(cacheKey, messageTimeMicros) + return True, "Message accepted, timestamp updated" + + if messageTimeMicros == cachedTimestamp: + # QUESTION: Should we be accepting messages here? + return False, "" + + return False, "Message dropped, older than cached timestamp (replay/out-of-order)" + + def pruneCache(self): + """Prune stale entries from the Replay Cache Table. 
+ + Returns: + int: The number of pruned entries + """ + prunedCount = 0 + currentTime = int(helping.fromIso8601(helping.nowIso8601()).timestamp()) * 1_000_000 + + for key, timestampStr in self.db.time.getItemIter(): + try: + + timestamp = int(timestampStr) + + windowSize, driftSkew = self.getWindowParameters(key) + + if timestamp < currentTime - driftSkew - windowSize: + self.db.time.rem(key) + prunedCount += 1 + + except Exception: + continue + + return prunedCount + +# class TimelinessEscrowDoer(Doer): + +# if __name__ == "__main__": +# +# from keri.core.serdering import Serder +# pass \ No newline at end of file diff --git a/src/keri/db/basing.py b/src/keri/db/basing.py index 0188fd417..6695e68d7 100644 --- a/src/keri/db/basing.py +++ b/src/keri/db/basing.py @@ -1304,6 +1304,11 @@ def reopen(self, **kwa): # TODO: clean self.maids = subing.CesrIoSetSuber(db=self, subkey="maids.", klas=coring.Prefixer) + self.time = subing.IoSetSuber(db=self, + subkey='kram.', + schema=OobiRecord, + sep=">") + self.reload() return self.env diff --git a/tests/core/test_kraming.py b/tests/core/test_kraming.py new file mode 100644 index 000000000..fa9bd4476 --- /dev/null +++ b/tests/core/test_kraming.py @@ -0,0 +1,84 @@ +from keri.app import habbing +from keri.core import kraming, serdering +from keri.help import helping + + +def test_timeliness(monkeypatch): + + def mockNowIso8601(): + return "2021-06-27T21:26:21.233257+00:00" + + monkeypatch.setattr(helping, "nowIso8601", mockNowIso8601) + + with habbing.openHab(name="kramTest", base="test", salt=b'0123456789abcdeg') as (hby, hab): + + tc = kraming.TimelinessCache(db=hby.db, defaultDriftSkew=1_000_000, defaultWindowSize=1_000_000) + + assert tc.defaultWindowSize == 1_000_000 + assert tc.defaultDriftSkew == 1_000_000 + + def create_test_serder(ilk, timestamp=None, route=None, sourceAid=hab.pre, + message_id="EckOnHB11J4H9q16I3tN8DdpNXnCiP5QJQ7yvkWqTDdA", qBlock=None, routeParams=None): + """Helper to create a test serder with specified parameters""" + if timestamp is None: + timestamp = helping.nowIso8601() + + sad = { + "v": "KERI10JSON00011c_", + "t": ilk, + "i": sourceAid, + "dt": timestamp, + } + + if route: + if routeParams: + route += "?" 
+ "&".join([f"{k}={v}" for k, v in routeParams.items()]) + sad["r"] = route + + if qBlock: + sad["q"] = qBlock + + return serdering.SerderKERI(sad=sad, makify=True) + + current_time = helping.nowIso8601() + + routeParams = {"transaction_type": "credential"} + + offerQBlock = { + "issuer": f"did:keri:{hab.pre}", + "output_descriptors": ["EckOnHB11J4H9q16I3tN8DdpNXnCiP5QJQ7yvkWqTDdA"], + "format": {"cesr": {"proof_type": ["Ed25519Signature2018"]}} + } + + offerSerder = create_test_serder( + ilk="exn", + timestamp=current_time, + route="/credential/offer", + routeParams=routeParams, + qBlock=offerQBlock + ) + + # Cache first entry + isValid, reason = tc.checkMessageTimeliness(offerSerder) + assert isValid, f"Credential offer message should be valid, got: {reason}" + assert reason == "Message accepted, new entry" + + def mockNowIso8601Later(): + return "2021-06-27T21:26:24.233257+00:00" + + monkeypatch.setattr(helping, "nowIso8601", mockNowIso8601Later) + + offerSerder2 = create_test_serder(ilk="exn", timestamp=mockNowIso8601Later(), route="/credential/offer", + sourceAid="BGKVzj4ve0VSd8z_AmvhLg4lqcC_9WYX90k03q-R_Ydo") + + # Cache new entry + isValid, reason = tc.checkMessageTimeliness(offerSerder2) + assert isValid, f"Credential offer message should be valid, got: {reason}" + assert reason == "Message accepted, new entry" + + # Attempt to cache first entry again + isValid, reason = tc.checkMessageTimeliness(offerSerder) + assert not isValid, f"Credential offer message should be invalid, got: {reason}" + + # Prune only the entry with a time outside the window + assert tc.pruneCache() == 1 \ No newline at end of file From 99c2b85d173964909a184a17177b2da1294177e9 Mon Sep 17 00:00:00 2001 From: arilieb Date: Tue, 29 Jul 2025 07:38:04 -0700 Subject: [PATCH 2/6] Checkpoint for kraming. --- src/keri/core/kraming.py | 85 ++++++++++++++++++++++++++++++---------- 1 file changed, 64 insertions(+), 21 deletions(-) diff --git a/src/keri/core/kraming.py b/src/keri/core/kraming.py index 7d788b2ce..08946a176 100644 --- a/src/keri/core/kraming.py +++ b/src/keri/core/kraming.py @@ -1,14 +1,16 @@ -import time +from keri import kering from keri.help import helping -from typing import Tuple, Optional, NamedTuple - +from typing import NamedTuple +from keri import help from hio.base import Doer - +import json +from hio.help import decking from keri.core.serdering import Serder from keri.kering import Ilks +logger = help.ogler.getLogger() class MessageType(NamedTuple): """Named tuple for KERI message types, not yet used.""" @@ -29,8 +31,8 @@ class TimelinessCache: 1. A Lagging Window Size Table - to determine validity windows for different message types 2. A Replay Cache Table - to store timestamps of previously validated messages """ - - def __init__(self, db, defaultWindowSize=300_000_000, defaultDriftSkew=60_000_000): + #change to float time, 3 seconds, 1 second + def __init__(self, db, defaultWindowSize=300_000_000, defaultDriftSkew=60_000_000, cues=None, msgs=None): """Initialize the TimelinessCache. 
Parameters: @@ -41,15 +43,24 @@ def __init__(self, db, defaultWindowSize=300_000_000, defaultDriftSkew=60_000_00 self.defaultWindowSize = defaultWindowSize self.defaultDriftSkew = defaultDriftSkew - # Will eventually be used with MessageType named tuple to set the window size for a given message type - self._windowParamsTable = {} - - # Database access self.db = db + def setWindowParameters(self, aid, windowSize=None, driftSkew=None, messageType=None): + """Set window parameters for given autonomic identifier and message type. + """ + windowSize = windowSize or self.defaultWindowSize + driftSkew = driftSkew or self.defaultDriftSkew + + # Serialize the tuple as JSON bytes for storage + windowTuple = (windowSize, driftSkew) + serialized = json.dumps(windowTuple).encode('utf-8') + + self.db.kram.pin(aid, [serialized]) + + def getWindowParameters(self, aid, messageType=None): """Get window parameters for given autonomic identifier and message type. - Eventually this method will reference a table of window parameters. + Falls back to default values if no entry exists for the aid. Parameters: aid (str): autonomic identifier @@ -59,8 +70,20 @@ def getWindowParameters(self, aid, messageType=None): Returns: tuple: (windowSize, driftSkew) as integers in microseconds """ + try: + # Try to get the stored parameters + storedData = self.db.kram.getLast(aid) + if storedData is not None: + # Deserialize from JSON bytes + windowTuple = json.loads(storedData.decode('utf-8')) + return windowTuple[0], windowTuple[1] + except Exception: + pass + + # Fallback to defaults return self.defaultWindowSize, self.defaultDriftSkew + def _constructCacheKey(self, serder): """Construct the key for the Replay Cache Table. @@ -104,7 +127,6 @@ def _storeTimestamp(self, key, timestamp): """ timestampStr = str(timestamp) - # QUESTION: Do we need to cache the whole request self.db.time.pin(key, [timestampStr]) def checkMessageTimeliness(self, serder): @@ -141,7 +163,7 @@ def checkMessageTimeliness(self, serder): if (messageTimeMicros < currentTimeMicros - driftSkew - windowSize or messageTimeMicros > currentTimeMicros + driftSkew): - return False, "Message timestamp outside lagging window" + raise kering.ValidationError(f"Message is out of time window {serder.pretty()}") cacheKey = self._constructCacheKey(serder) @@ -149,17 +171,19 @@ def checkMessageTimeliness(self, serder): if cachedTimestamp is None: self._storeTimestamp(cacheKey, messageTimeMicros) - return True, "Message accepted, new entry" + # Message accepted, new entry + return True if messageTimeMicros > cachedTimestamp: self._storeTimestamp(cacheKey, messageTimeMicros) - return True, "Message accepted, timestamp updated" + # Message accepted, updated cached entry + return True if messageTimeMicros == cachedTimestamp: # QUESTION: Should we be accepting messages here? return False, "" - return False, "Message dropped, older than cached timestamp (replay/out-of-order)" + raise kering.ValidationError(f"Message is out of order {serder.pretty()}") def pruneCache(self): """Prune stale entries from the Replay Cache Table. 
@@ -186,9 +210,28 @@ def pruneCache(self): return prunedCount -# class TimelinessEscrowDoer(Doer): + def processKrms(self): + for said, serder in self.db.krms.getFullItemIter(): + try: + # assumes serder is a SerderKERI, more processing may be needed or addition of klas to db.krms + if self.checkMessageTimeliness(serder): + self.db.exns.pin(said, serder) + self.db.krms.rem(said) + except kering.ValidationError as e: + logger.error(f"Invalid message: {e}") + self.pruneCache() + +class KramDoer(Doer): + """KramDoer is a Doer that manages the KRAM database.""" + def __init__(self, db): + self.db = db + + self.tc = TimelinessCache(self.db) + + super(KramDoer, self).__init__() + + def recur(self, tyme): + self.tc.processKrms() + + -# if __name__ == "__main__": -# -# from keri.core.serdering import Serder -# pass \ No newline at end of file From d6fdd538e8526863bdb6b246e70b4d1f364c012d Mon Sep 17 00:00:00 2001 From: arilieb Date: Tue, 29 Jul 2025 13:42:32 -0700 Subject: [PATCH 3/6] Kram module, kram tests, and point solution kram implementation for query messages(non-trans, does not need escrow). --- src/keri/core/kraming.py | 66 +++++++++++++++++--------------------- src/keri/core/parsing.py | 10 +++++- src/keri/db/basing.py | 13 ++++++-- tests/core/test_kraming.py | 44 +++++++++++-------------- 4 files changed, 68 insertions(+), 65 deletions(-) diff --git a/src/keri/core/kraming.py b/src/keri/core/kraming.py index 08946a176..5d238837e 100644 --- a/src/keri/core/kraming.py +++ b/src/keri/core/kraming.py @@ -1,4 +1,3 @@ - from keri import kering from keri.help import helping @@ -6,12 +5,10 @@ from keri import help from hio.base import Doer import json -from hio.help import decking -from keri.core.serdering import Serder -from keri.kering import Ilks logger = help.ogler.getLogger() +# TODO: Implement message type as part of TimelinessCache module for more granular control class MessageType(NamedTuple): """Named tuple for KERI message types, not yet used.""" QRY: str = "qry" @@ -31,14 +28,14 @@ class TimelinessCache: 1. A Lagging Window Size Table - to determine validity windows for different message types 2. A Replay Cache Table - to store timestamps of previously validated messages """ - #change to float time, 3 seconds, 1 second - def __init__(self, db, defaultWindowSize=300_000_000, defaultDriftSkew=60_000_000, cues=None, msgs=None): + + def __init__(self, db, defaultWindowSize=3.0, defaultDriftSkew=1.0): """Initialize the TimelinessCache. Parameters: db: Database instance that contains the IoSetSuber at db.time - defaultWindowSize (int): Default window size in microseconds - defaultDriftSkew (int): Default drift skew in microseconds + defaultWindowSize (float): Default window size in seconds + defaultDriftSkew (float): Default drift skew in seconds """ self.defaultWindowSize = defaultWindowSize self.defaultDriftSkew = defaultDriftSkew @@ -48,6 +45,7 @@ def __init__(self, db, defaultWindowSize=300_000_000, defaultDriftSkew=60_000_00 def setWindowParameters(self, aid, windowSize=None, driftSkew=None, messageType=None): """Set window parameters for given autonomic identifier and message type. 
""" + # TODO: Implement message type as part of the window parameters windowSize = windowSize or self.defaultWindowSize driftSkew = driftSkew or self.defaultDriftSkew @@ -68,8 +66,9 @@ def getWindowParameters(self, aid, messageType=None): be used to determine the window size for a given message type Returns: - tuple: (windowSize, driftSkew) as integers in microseconds + tuple: (windowSize, driftSkew) as floats in seconds """ + # TODO: Implement message type as part of the window parameters try: # Try to get the stored parameters storedData = self.db.kram.getLast(aid) @@ -93,11 +92,10 @@ def _constructCacheKey(self, serder): Returns: str: The key for the Replay Cache Table """ + # TODO: Implement message type as part of the key (serder.ilk) sad = serder.sad - sourceAid = sad.get("i", "") # 'i' for identifier in KERI messages - # messageType = serder.ilk # Use serder.ilk for message type + sourceAid = sad.get("i", "") - # return (sourceAid, messageType) return sourceAid def _getCachedTimestamp(self, key): @@ -107,12 +105,12 @@ def _getCachedTimestamp(self, key): key (str): The cache key Returns: - int | None: The cached timestamp in microseconds or None if not found + float | None: The cached timestamp in seconds or None if not found """ try: storedTimestamp = self.db.time.getLast(key) if storedTimestamp is not None: - return int(storedTimestamp) + return float(storedTimestamp) except Exception as e: print(e) pass @@ -123,7 +121,7 @@ def _storeTimestamp(self, key, timestamp): Parameters: key (str): The cache key - timestamp (int): The timestamp to store in microseconds + timestamp (float): The timestamp to store in seconds """ timestampStr = str(timestamp) @@ -154,15 +152,13 @@ def checkMessageTimeliness(self, serder): windowSize, driftSkew = self.getWindowParameters(sourceAid, messageType) - # Convert both timestamps to microseconds since epoch for comparison - currentTime = helping.fromIso8601(helping.nowIso8601()) - currentTimeMicros = int(currentTime.timestamp() * 1_000_000) + # Convert both timestamps to seconds since epoch for comparison + currentTime = helping.fromIso8601(helping.nowIso8601()).timestamp() - messageTime = helping.fromIso8601(timestamp) - messageTimeMicros = int(messageTime.timestamp() * 1_000_000) + messageTime = helping.fromIso8601(timestamp).timestamp() - if (messageTimeMicros < currentTimeMicros - driftSkew - windowSize or - messageTimeMicros > currentTimeMicros + driftSkew): + if (messageTime < currentTime - driftSkew - windowSize or + messageTime > currentTime + driftSkew): raise kering.ValidationError(f"Message is out of time window {serder.pretty()}") cacheKey = self._constructCacheKey(serder) @@ -170,18 +166,17 @@ def checkMessageTimeliness(self, serder): cachedTimestamp = self._getCachedTimestamp(cacheKey) if cachedTimestamp is None: - self._storeTimestamp(cacheKey, messageTimeMicros) + self._storeTimestamp(cacheKey, messageTime) # Message accepted, new entry return True - if messageTimeMicros > cachedTimestamp: - self._storeTimestamp(cacheKey, messageTimeMicros) + if messageTime > cachedTimestamp: + self._storeTimestamp(cacheKey, messageTime) # Message accepted, updated cached entry return True - if messageTimeMicros == cachedTimestamp: - # QUESTION: Should we be accepting messages here? 
- return False, "" + if messageTime == cachedTimestamp: + raise kering.ValidationError(f"Message replay detected {serder.pretty()}") raise kering.ValidationError(f"Message is out of order {serder.pretty()}") @@ -192,12 +187,12 @@ def pruneCache(self): int: The number of pruned entries """ prunedCount = 0 - currentTime = int(helping.fromIso8601(helping.nowIso8601()).timestamp()) * 1_000_000 + currentTime = helping.fromIso8601(helping.nowIso8601()).timestamp() for key, timestampStr in self.db.time.getItemIter(): try: - timestamp = int(timestampStr) + timestamp = float(timestampStr) windowSize, driftSkew = self.getWindowParameters(key) @@ -213,10 +208,11 @@ def pruneCache(self): def processKrms(self): for said, serder in self.db.krms.getFullItemIter(): try: - # assumes serder is a SerderKERI, more processing may be needed or addition of klas to db.krms + # assumes serder is a SerderKERI, more processing may be needed if self.checkMessageTimeliness(serder): - self.db.exns.pin(said, serder) + # TODO: Implement escrowing functionality self.db.krms.rem(said) + logger.info(f"Message accepted: {serder.pretty()}") except kering.ValidationError as e: logger.error(f"Invalid message: {e}") self.pruneCache() @@ -231,7 +227,5 @@ def __init__(self, db): super(KramDoer, self).__init__() def recur(self, tyme): - self.tc.processKrms() - - - + # TODO: Implement KRAM escrowing functionality + self.tc.processKrms() \ No newline at end of file diff --git a/src/keri/core/parsing.py b/src/keri/core/parsing.py index 65d39c03e..0c71d8ade 100644 --- a/src/keri/core/parsing.py +++ b/src/keri/core/parsing.py @@ -11,6 +11,7 @@ from base64 import urlsafe_b64decode as decodeB64 from base64 import urlsafe_b64encode as encodeB64 +from .kraming import TimelinessCache from .. import kering from ..kering import (Colds, sniff, Vrsn_1_0, Vrsn_2_0, ShortageError, ColdStartError) @@ -132,7 +133,7 @@ class Parser: def __init__(self, ims=None, framed=True, piped=False, kvy=None, tvy=None, exc=None, rvy=None, vry=None, local=False, - version=Vrsn_2_0): + krm=None, version=Vrsn_2_0): """ Initialize instance: @@ -166,6 +167,12 @@ def __init__(self, ims=None, framed=True, piped=False, kvy=None, self.version = version # provided version may be earlier than supported version # version sets .methods, .codes, .sucodes, and .mucodes + # default timeliness cache / KRAM behavior + self.krm = krm + if not krm and self.kvy and self.kvy.db: + self.krm = TimelinessCache(self.kvy.db) + + @property def genus(self): @@ -1196,6 +1203,7 @@ def msgParsator(self, ims=None, framed=True, piped=False, elif ilk in (Ilks.qry,): # query message # ToDo neigher kvy.processQuery nor tvy.processQuery actually verify + self.krm.checkMessageTimeliness(serder) if exts['ssgs']: # use last one if more than one pre, sigers = exts['ssgs'][-1] if exts['ssgs'] else (None, None) diff --git a/src/keri/db/basing.py b/src/keri/db/basing.py index 6695e68d7..1f92111ac 100644 --- a/src/keri/db/basing.py +++ b/src/keri/db/basing.py @@ -1304,11 +1304,20 @@ def reopen(self, **kwa): # TODO: clean self.maids = subing.CesrIoSetSuber(db=self, subkey="maids.", klas=coring.Prefixer) + # timeliness cache of identifiers (soon to also include message types) self.time = subing.IoSetSuber(db=self, - subkey='kram.', - schema=OobiRecord, + subkey='time.', sep=">") + # Window sizes and drift skews for specific identifiers (soon to also include message types) + self.kram = subing.IoSetSuber(db=self, + subkey='kram.', + sep=">", + ) + + # Timeliness cache of identifiers (soon to also include 
message types) + self.krms = subing.IoSetSuber(db=self, subkey='krms.', sep=">") + self.reload() return self.env diff --git a/tests/core/test_kraming.py b/tests/core/test_kraming.py index fa9bd4476..ddc039908 100644 --- a/tests/core/test_kraming.py +++ b/tests/core/test_kraming.py @@ -1,6 +1,8 @@ +import pytest from keri.app import habbing from keri.core import kraming, serdering from keri.help import helping +from keri import kering def test_timeliness(monkeypatch): @@ -12,13 +14,15 @@ def mockNowIso8601(): with habbing.openHab(name="kramTest", base="test", salt=b'0123456789abcdeg') as (hby, hab): - tc = kraming.TimelinessCache(db=hby.db, defaultDriftSkew=1_000_000, defaultWindowSize=1_000_000) + tc = kraming.TimelinessCache(db=hby.db, defaultDriftSkew=1.0, defaultWindowSize=1.0) - assert tc.defaultWindowSize == 1_000_000 - assert tc.defaultDriftSkew == 1_000_000 + assert tc.defaultWindowSize == 1.0 + assert tc.defaultDriftSkew == 1.0 - def create_test_serder(ilk, timestamp=None, route=None, sourceAid=hab.pre, - message_id="EckOnHB11J4H9q16I3tN8DdpNXnCiP5QJQ7yvkWqTDdA", qBlock=None, routeParams=None): + # Set window parameters for the test AIDs + tc.setWindowParameters(hab.pre, windowSize=1.0, driftSkew=1.0) + + def create_test_serder(ilk, timestamp=None, route=None, sourceAid=hab.pre, qBlock=None, routeParams=None): """Helper to create a test serder with specified parameters""" if timestamp is None: timestamp = helping.nowIso8601() @@ -42,43 +46,31 @@ def create_test_serder(ilk, timestamp=None, route=None, sourceAid=hab.pre, current_time = helping.nowIso8601() - routeParams = {"transaction_type": "credential"} - - offerQBlock = { - "issuer": f"did:keri:{hab.pre}", - "output_descriptors": ["EckOnHB11J4H9q16I3tN8DdpNXnCiP5QJQ7yvkWqTDdA"], - "format": {"cesr": {"proof_type": ["Ed25519Signature2018"]}} - } - offerSerder = create_test_serder( ilk="exn", timestamp=current_time, route="/credential/offer", - routeParams=routeParams, - qBlock=offerQBlock ) # Cache first entry - isValid, reason = tc.checkMessageTimeliness(offerSerder) - assert isValid, f"Credential offer message should be valid, got: {reason}" - assert reason == "Message accepted, new entry" + isValid = tc.checkMessageTimeliness(offerSerder) + assert isValid def mockNowIso8601Later(): - return "2021-06-27T21:26:24.233257+00:00" + return "2021-06-27T21:26:23.233258+00:00" monkeypatch.setattr(helping, "nowIso8601", mockNowIso8601Later) offerSerder2 = create_test_serder(ilk="exn", timestamp=mockNowIso8601Later(), route="/credential/offer", - sourceAid="BGKVzj4ve0VSd8z_AmvhLg4lqcC_9WYX90k03q-R_Ydo") + sourceAid="BGKVzj4ve0VSd8z_AmvhLg4lqcC_9WYX90k03q-R_Ydo") # Cache new entry - isValid, reason = tc.checkMessageTimeliness(offerSerder2) - assert isValid, f"Credential offer message should be valid, got: {reason}" - assert reason == "Message accepted, new entry" + isValid = tc.checkMessageTimeliness(offerSerder2) + assert isValid - # Attempt to cache first entry again - isValid, reason = tc.checkMessageTimeliness(offerSerder) - assert not isValid, f"Credential offer message should be invalid, got: {reason}" + # Attempt to cache first entry again - this should now raise ValidationError + with pytest.raises(kering.ValidationError): + tc.checkMessageTimeliness(offerSerder) # Prune only the entry with a time outside the window assert tc.pruneCache() == 1 \ No newline at end of file From 86f31aa6957f77a36284b2852fbbf9c7fd1b9a89 Mon Sep 17 00:00:00 2001 From: arilieb Date: Tue, 29 Jul 2025 13:51:08 -0700 Subject: [PATCH 4/6] Added comments back 
to habbing.py. --- src/keri/app/habbing.py | 528 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 526 insertions(+), 2 deletions(-) diff --git a/src/keri/app/habbing.py b/src/keri/app/habbing.py index 91c606f2a..6f5afee7f 100644 --- a/src/keri/app/habbing.py +++ b/src/keri/app/habbing.py @@ -293,8 +293,8 @@ def setup(self, *, seed=None, aeid=None, bran=None, pidx=None, algo=None, raise ValueError(f"Bran (passcode seed material) too short.") bran = coring.MtrDex.Salt_128 + 'A' + bran[:21] # qb64 salt for seed signer = core.Salter(qb64=bran).signer(transferable=False, - tier=tier, - temp=temp) + tier=tier, + temp=temp) seed = signer.qb64 if not aeid: # aeid must not be empty event on initial creation aeid = signer.verfer.qb64 # lest it remove encryption @@ -912,10 +912,68 @@ def exit(self): class BaseHab: + """ + Hab class provides a given idetnifier controller's local resource environment + i.e. hab or habitat. Includes dependency injection of database, keystore, + configuration file as well as Kevery and key store Manager.. + + Attributes: (Injected) + ks (keeping.Keeper): lmdb key store + db (basing.Baser): lmdb data base for KEL etc + cf (configing.Configer): config file instance + mgr (keeping.Manager): creates and rotates keys in key store + rtr (routing.Router): routes reply 'rpy' messages + rvy (routing.Revery): factory that processes reply 'rpy' messages + kvy (eventing.Kevery): factory for local processing of local event msgs + psr (parsing.Parser): parses local messages for .kvy .rvy + + + Attributes: + name (str): alias of controller + pre (str): qb64 prefix of own local controller or None if new + temp (bool): True means testing: + use weak level when salty algo for stretching in key creation + for incept and rotate of keys for this hab.pre + inited (bool): True means fully initialized wrt databases. + False means not yet fully initialized + delpre (str | None): delegator prefix if any else None + + Properties: + kever (Kever): instance of key state of local controller + kevers (dict): of eventing.Kever instances from KELs in local db + keyed by qb64 prefix. Read through cache of of kevers of states for + KELs in db.states + iserder (serdering.SerderKERI): own inception event + prefixes (OrderedSet): local prefixes for .db + accepted (bool): True means accepted into local KEL. + False otherwise + + """ def __init__(self, ks, db, cf, mgr, rtr, rvy, kvy, psr, *, name='test', ns=None, pre=None, temp=False): + """ + Initialize instance. 
+ + Injected Parameters: (injected dependencies) + ks (keeping.Keeper): lmdb key store + db (basing.Baser): lmdb data base for KEL etc + cf (configing.Configer): config file instance + mgr (keeping.Manager): creates and rotates keys in key store + rtr (routing.Router): routes reply 'rpy' messages + rvy (routing.Revery): factory that processes reply 'rpy' messages + kvy (eventing.Kevery): factory for local processing of local event msgs + psr (parsing.Parser): parses local messages for .kvy .rvy + + + Parameters: + name (str): alias name for local controller of habitat + pre (str | None): qb64 identifier prefix of own local controller else None + temp (bool): True means testing: + use weak level when salty algo for stretching in key creation + for incept and rotate of keys for this hab.pre + """ self.db = db # injected self.ks = ks # injected self.cf = cf # injected @@ -934,7 +992,34 @@ def __init__(self, ks, db, cf, mgr, rtr, rvy, kvy, psr, *, self.delpre = None # assigned laster if delegated def make(self, DnD, code, data, delpre, estOnly, isith, verfers, nsith, digers, toad, wits): + """ + Creates Serder of inception event for provided parameters. + Assumes injected dependencies were already setup. + Parameters: + isith (int | str | list | None): incepting signing threshold as + int, str hex, or list weighted if any, otherwise compute + default from verfers + code (str): prefix derivation code default Blake3 + nsith (int, str, list | None ): next signing threshold as int, + str hex or list weighted if any, otherwise compute default from + digers + verfers (list[Verfer]): Verfer instances for initial signing keys + digers (list[Diger] | None) Diger instances for next key digests + toad (int |str| None): int or str hex of witness threshold if + specified else compute default based on number of wits (backers) + wits (list | None): qb64 prefixes of witnesses if any + delpre (str | None): qb64 of delegator identifier prefix if any + estOnly (bool | None): True means add trait eventing.TraitDex.EstOnly + which means only establishment events allowed in KEL for this Hab + False (default) means allows non-est events and no trait is added. + DnD (bool): True means add trait of eventing.TraitDex.DnD which + means do not allow delegated identifiers from this identifier + False (default) means do allow and no trait is added. + + data (list | None): seal dicts + + """ icount = len(verfers) ncount = len(digers) if digers is not None else 0 if isith is None: # compute default @@ -983,6 +1068,45 @@ def save(self, habord): val=self.pre) def reconfigure(self): + """Apply configuration from config file managed by .cf. to this Hab. + Assumes that .pre and signing keys have been setup in order to create + own endpoint auth when provided in .cf. + + config file json or hjon + + { + "dt": "2021-01-01T00:00:00.000000+00:00", + "nel": + { + "dt": "2021-01-01T00:00:00.000000+00:00", + "curls": + [ + "tcp://localhost:5621/" + ] + }, + "iurls": + [ + "tcp://localhost:5620/?role=peer&name=tam" + ], + "durls": + [ + "http://127.0.0.1:7723/oobi/EBNaNu-M9P5cgrnfl2Fvymy4E_jvxxyjb70PRtiANlJy", + "http://127.0.0.1:7723/oobi/EMhvwOlyEJ9kN4PrwCpr9Jsv7TxPhiYveZ0oP3lJzdEi", + ], + "wurls": + [ + "http://127.0.0.1:5644/.well-known/keri/oobi/EBNaNu-M9P5cgrnfl2Fvymy4E_jvxxyjb70PRtiANlJy?name=Root" + ] + } + + + + Config file is meant to be read only at init not changed by app at + run time. Any dynamic app changes must go in database not config file + that way we don't have to worry about multiple writers of cf. 
+ Use config file to preload database not as a database. Config file may + have named sections for Habery or individual Habs as needed. + """ conf = self.cf.get() if self.name not in conf: @@ -1008,6 +1132,9 @@ def reconfigure(self): @property def iserder(self): + """ + Return serder of inception event + """ if (dig := self.db.getKeLast(eventing.snKey(pre=self.pre, sn=0))) is None: raise kering.ConfigurationError("Missing inception event in KEL for " "Habitat pre={}.".format(self.pre)) @@ -1018,6 +1145,9 @@ def iserder(self): @property def kevers(self): + """ + Returns .db.kevers + """ return self.db.kevers @property @@ -1026,17 +1156,44 @@ def accepted(self): @property def kever(self): + """ + Returns kever for its .pre + """ return self.kevers[self.pre] if self.accepted else None @property def prefixes(self): + """ + Returns .db.prefixes + """ return self.db.prefixes def incept(self, **kwa): + """Alias for .make """ self.make(**kwa) def rotate(self, *, verfers=None, digers=None, isith=None, nsith=None, toad=None, cuts=None, adds=None, data=None): + """ + Perform rotation operation. Register rotation in database. + Returns: bytearrayrotation message with attached signatures. + + Parameters: + verfers (list | None): Verfer instances of public keys qb64 + digers (list | None): Diger instances of public next key digests qb64 + isith (int |str | None): current signing threshold as int or str hex + or list of str weights + default is prior next sith + nsith (int |str | None): next, next signing threshold as int + or str hex or list of str weights + default is based on isith when None + toad (int | str | None): hex of witness threshold after cuts and adds + cuts (list | None): of qb64 pre of witnesses to be removed from witness list + adds (list | None): of qb64 pre of witnesses to be added to witness list + data (list | None): of dicts of committed data such as seals + + + """ # recall that kever.pre == self.pre kever = self.kever # before rotation kever is prior next @@ -1108,6 +1265,10 @@ def rotate(self, *, verfers=None, digers=None, isith=None, nsith=None, toad=None return msg def interact(self, *, data=None): + """ + Perform interaction operation. Register interaction in database. + Returns: bytearray interaction message with attached signatures. + """ kever = self.kever serder = eventing.interact(pre=kever.prefixer.qb64, dig=kever.serder.said, @@ -1130,6 +1291,26 @@ def interact(self, *, data=None): def sign(self, ser, verfers=None, indexed=True, indices=None, ondices=None, **kwa): + """Sign given serialization ser using appropriate keys. + Use provided verfers or .kever.verfers to lookup keys to sign. + + Parameters: + ser (bytes): serialization to sign + verfers (list[Verfer] | None): Verfer instances to get pub verifier + keys to lookup private siging keys. + verfers None means use .kever.verfers. Assumes that when group + and verfers is not None then provided verfers must be .kever.verfers + indexed (bool): When not mhab then + True means use use indexed signatures and return + list of Siger instances. + False means do not use indexed signatures and return + list of Cigar instances + indices (list[int] | None): indices (offsets) + when indexed == True. See Manager.sign + ondices (list[int | None] | None): other indices (offsets) + when indexed is True. 
See Manager.sign + + """ if verfers is None: verfers = self.kever.verfers # when group these provide group signing keys @@ -1141,6 +1322,18 @@ def sign(self, ser, verfers=None, indexed=True, indices=None, ondices=None, **kw def decrypt(self, ser, verfers=None, **kwa): + """Decrypt given serialization ser using appropriate keys. + Use provided verfers or .kever.verfers to lookup keys to sign. + + Parameters: + ser (str | bytes | bytearray | memoryview): serialization to decrypt + + verfers (list[Verfer] | None): Verfer instances to get pub verifier + keys to lookup and convert to private decryption keys. + verfers None means use .kever.verfers. Assumes that when group + and verfers is not None then provided verfers must be .kever.verfers + + """ if verfers is None: verfers = self.kever.verfers # when group these provide group signing keys @@ -1150,6 +1343,18 @@ def decrypt(self, ser, verfers=None, **kwa): def query(self, pre, src, query=None, **kwa): + """ Create, sign and return a `qry` message against the attester for the prefix + + Parameters: + pre (str): qb64 identifier prefix being queried for + src (str): qb64 identifier prefix of attester being queried + query (dict): addition query modifiers to include in `q` + **kwa (dict): keyword arguments passed to eventing.query + + Returns: + bytearray: signed query event + + """ query = query if query is not None else dict() query['i'] = pre @@ -1158,7 +1363,19 @@ def query(self, pre, src, query=None, **kwa): return self.endorse(serder, last=True) def endorse(self, serder, last=False, pipelined=True): + """ + Returns msg with own endorsement of msg from serder with attached signature + groups based on own pre transferable or non-transferable. + Parameters: + serder (Serder): instance of msg + last (bool): True means use SealLast. False means use SealEvent + query messages use SealLast + pipelined (bool): True means use pipelining attachment code + + Useful for endorsing message when provided via serder such as state, + reply, query or similar. + """ if self.kever.prefixer.transferable: # create SealEvent or SealLast for endorser's est evt whose keys are # used to sign @@ -1197,6 +1414,12 @@ def exchange(self, route, modifiers=None, embeds=None, save=False): + """ + Returns signed exn, message of serder with count code and receipt + couples (pre+cig) + Builds msg and then processes it into own db to validate + """ + # sign serder event serder, end = exchanging.exchange(route=route, payload=payload, @@ -1222,6 +1445,11 @@ def exchange(self, route, return msg def receipt(self, serder): + """ + Returns own receipt, rct, message of serder with count code and receipt + couples (pre+cig) + Builds msg and then processes it into own db to validate + """ ked = serder.ked reserder = eventing.receipt(pre=ked["i"], sn=int(ked["s"], 16), @@ -1245,6 +1473,17 @@ def receipt(self, serder): def witness(self, serder): + """ + Returns own receipt, rct, message of serder with count code and witness + indexed receipt signatures if key state of serder.pre shows that own pre + is a current witness of event in serder + + ToDo XXXX add parameter to force check that serder as been accepted + as valid. 
Otherwise must assume that before calling this that serder + being witnessed has been accepted as valid event into this hab + controller's KEL + + """ if self.kever.prefixer.transferable: # not non-transferable prefix raise ValueError("Attempt to create witness receipt with" " transferable pre={}.".format(self.pre)) @@ -1276,6 +1515,16 @@ def witness(self, serder): def replay(self, pre=None, fn=0): + """ + Returns replay of FEL first seen event log for pre starting from fn + Default pre is own .pre + + Parameters: + pre is qb64 str or bytes of identifier prefix. + default is own .pre + fn is int first seen ordering number + + """ if not pre: pre = self.pre @@ -1290,12 +1539,27 @@ def replay(self, pre=None, fn=0): return msgs def replayAll(self): + """ + Returns replay of FEL first seen event log for all pre starting at key + + Parameters: + key (bytes): fnKey(pre, fn) + + """ msgs = bytearray() for msg in self.db.cloneAllPreIter(): msgs.extend(msg) return msgs def makeOtherEvent(self, pre, sn): + """ + Returns: messagized bytearray message with attached signatures of + own event at sequence number sn from retrieving event at sn + and associated signatures from database. + + Parameters: + sn is int sequence number of event + """ if pre not in self.kevers: return None @@ -1314,33 +1578,103 @@ def makeOtherEvent(self, pre, sn): return msg def fetchEnd(self, cid: str, role: str, eid: str): + """ + Returns: + endpoint (basing.EndpointRecord): instance or None + """ return self.db.ends.get(keys=(cid, role, eid)) def fetchLoc(self, eid: str, scheme: str = kering.Schemes.http): + """ + Returns: + location (basing.LocationRecord): instance or None + """ return self.db.locs.get(keys=(eid, scheme)) def fetchEndAllowed(self, cid: str, role: str, eid: str): + """ + Returns: + allowed (bool): True if eid is allowed as endpoint provider for cid + in role. False otherwise. + Parameters: + cid (str): identifier prefix qb64 of controller authZ endpoint provided + eid in role + role (str): endpoint role such as (controller, witness, watcher, etc) + eid (str): identifier prefix qb64 of endpoint provider in role + """ end = self.db.ends.get(keys=(cid, role, eid)) return end.allowed if end else None def fetchEndEnabled(self, cid: str, role: str, eid: str): + """ + Returns: + allowed (bool): True if eid is allowed as endpoint provider for cid + in role. False otherwise. + Parameters: + cid (str): identifier prefix qb64 of controller authZ endpoint provided + eid in role + role (str): endpoint role such as (controller, witness, watcher, etc) + eid (str): identifier prefix qb64 of endpoint provider in role + """ end = self.db.ends.get(keys=(cid, role, eid)) return end.enabled if end else None def fetchEndAuthzed(self, cid: str, role: str, eid: str): + """ + Returns: + allowed (bool): True if eid is allowed as endpoint provider for cid + in role. False otherwise. 
+ Parameters: + cid (str): identifier prefix qb64 of controller authZ endpoint provided + eid in role + role (str): endpoint role such as (controller, witness, watcher, etc) + eid (str): identifier prefix qb64 of endpoint provider in role + """ end = self.db.ends.get(keys=(cid, role, eid)) return (end.enabled or end.allowed) if end else None def fetchUrl(self, eid: str, scheme: str = kering.Schemes.http): + """ + Returns: + url (str): for endpoint provider given by eid + empty string when url is nullified + None when no location record + """ loc = self.db.locs.get(keys=(eid, scheme)) return loc.url if loc else loc def fetchUrls(self, eid: str, scheme: str = ""): + """ + Returns: + surls (hicting.Mict): urls keyed by scheme for given eid. Assumes that + user independently verifies that the eid is allowed for a + given cid and role. If url is empty then does not return + + Parameters: + eid (str): identifier prefix qb64 of endpoint provider + scheme (str): url scheme + """ return hicting.Mict([(keys[1], loc.url) for keys, loc in self.db.locs.getItemIter(keys=(eid, scheme)) if loc.url]) def fetchRoleUrls(self, cid: str, *, role: str = "", scheme: str = "", eids=None, enabled: bool = True, allowed: bool = True): + """ + Returns: + rurls (hicting.Mict): of nested dicts. The top level dict rurls is keyed by + role for a given cid. Each value in rurls is eurls dict + keyed by the eid of authorized endpoint provider and + each value in eurls is a surls dict keyed by scheme + + Parameters: + cid (str): identifier prefix qb64 of controller authZ endpoint provided + eid in role + role (str): endpoint role such as (controller, witness, watcher, etc) + scheme (str): url scheme + eids (list): when provided restrict returns to only eids in eids + enabled (bool): True means fetch any allowed witnesses as well + allowed (bool): True means fetech any enabled witnesses as well + """ if eids is None: eids = [] @@ -1366,6 +1700,24 @@ def fetchRoleUrls(self, cid: str, *, role: str = "", scheme: str = "", def fetchWitnessUrls(self, cid: str, scheme: str = "", eids=None, enabled: bool = True, allowed: bool = True): + """ + Fetch witness urls for witnesses of cid at latest key state or enabled or + allowed witnesses if not a witness at latest key state. + + Returns: + rurls (hicting.Mict): of nested dicts. The top level dict rurls is keyed by + role for a given cid. 
Each value in rurls is eurls dict + dict keyed by the eid of authorized endpoint provider and + each value in eurls is a surls dict keyed by scheme + + Parameters: + cid (str): identifier prefix qb64 of controller authZ endpoint provided + eid is witness + scheme (str): url scheme + eids (list): when provided restrict returns to only eids in eids + enabled (bool): True means fetch any allowed witnesses as well + allowed (bool): True means fetech any enabled witnesses as well + """ return (self.fetchRoleUrls(cid=cid, role=kering.Roles.witness, scheme=scheme, @@ -1374,6 +1726,15 @@ def fetchWitnessUrls(self, cid: str, scheme: str = "", eids=None, allowed=allowed)) def endsFor(self, pre): + """ Load Authroized endpoints for provided AID + + Args: + pre (str): qb64 aid for which to load ends + + Returns: + dict: nest dict of Roles -> eid -> Schemes -> endpoints + + """ ends = dict() for (_, erole, eid), end in self.db.ends.getItemIter(keys=(pre,)): @@ -1403,9 +1764,34 @@ def endsFor(self, pre): return ends def reply(self, **kwa): + """ + Returns: + msg (bytearray): reply message + + Parameters: + route is route path string that indicates data flow handler (behavior) + to processs the reply + data is list of dicts of comitted data such as seals + dts is date-time-stamp of message at time or creation + version is Version instance + kind is serialization kind + """ return self.endorse(eventing.reply(**kwa)) def makeEndRole(self, eid, role=kering.Roles.controller, allow=True, stamp=None): + """ + Returns: + msg (bytearray): reply message allowing/disallowing endpoint provider + eid in role + + Parameters: + eid (str): qb64 of endpoint provider to be authorized + role (str): authorized role for eid + allow (bool): True means add eid at role as authorized + False means cut eid at role as unauthorized + stamp (str): date-time-stamp RFC-3339 profile of iso8601 datetime. + None means use now. + """ data = dict(cid=self.pre, role=role, eid=eid) route = "/end/role/add" if allow else "/end/role/cut" return self.reply(route=route, data=data, stamp=stamp) @@ -1442,11 +1828,43 @@ def loadEndRole(self, cid, eid, role=kering.Roles.controller): return msgs def makeLocScheme(self, url, eid=None, scheme="http", stamp=None): + """ + Returns: + msg (bytearray): reply message of own url service endpoint at scheme + + Parameters: + url (str): url of endpoint, may have scheme missing or not + If url is empty then nullifies location + eid (str): qb64 of endpoint provider to be authorized + scheme (str): url scheme must matche scheme in url if any + stamp (str): date-time-stamp RFC-3339 profile of iso8601 datetime. + None means use now. + + """ eid = eid if eid is not None else self.pre data = dict(eid=eid, scheme=scheme, url=url) return self.reply(route="/loc/scheme", data=data, stamp=stamp) def replyLocScheme(self, eid, scheme=""): + """ + Returns a reply message stream composed of entries authed by the given + eid from the appropriate reply database including associated attachments + in order to disseminate (percolate) BADA reply data authentication proofs. + + Currently uses promiscuous model for permitting endpoint discovery. + Future is to use identity constraint graph to constrain discovery + of whom by whom. 
+ + eid and not scheme then: + loc url for all schemes at eid + + eid and scheme then: + loc url for scheme at eid + + Parameters: + eid (str): endpoint provider id + scheme (str): url scheme + """ msgs = bytearray() urls = self.fetchUrls(eid=eid, scheme=scheme) @@ -1486,6 +1904,40 @@ def loadLocScheme(self, eid, scheme=None): return msgs def replyEndRole(self, cid, role=None, eids=None, scheme=""): + + """ + Returns a reply message stream composed of entries authed by the given + cid from the appropriate reply database including associated attachments + in order to disseminate (percolate) BADA reply data authentication proofs. + + Currently uses promiscuous model for permitting endpoint discovery. + Future is to use identity constraint graph to constrain discovery + of whom by whom. + + cid and not role and not scheme then: + end authz for all eids in all roles and loc url for all schemes at each eid + if eids then only eids in eids else all eids + + cid and not role and scheme then: + end authz for all eid in all roles and loc url for scheme at each eid + if eids then only eids in eids else all eids + + cid and role and not scheme then: + end authz for all eid in role and loc url for all schemes at each eid + if eids then only eids in eids else all eids + + cid and role and scheme then: + end authz for all eid in role and loc url for scheme at each eid + if eids then only eids in eids else all eids + + + Parameters: + cid (str): identifier prefix qb64 of controller authZ endpoint provided + eid is witness + role (str): authorized role for eid + eids (list): when provided restrict returns to only eids in eids + scheme (str): url scheme + """ msgs = bytearray() if eids is None: @@ -1518,9 +1970,39 @@ def replyEndRole(self, cid, role=None, eids=None, scheme=""): return msgs def replyToOobi(self, aid, role, eids=None): + """ + Returns a reply message stream composed of entries authed by the given + aid from the appropriate reply database including associated attachments + in order to disseminate (percolate) BADA reply data authentication proofs. + + Currently uses promiscuous model for permitting oobi initiated endpoint + discovery. Future is to use identity constraint graph to constrain + discovery of whom by whom. + + This method is entry point for initiating replies generated by + .replyEndRole and/or .replyLocScheme + + Parameters: + aid (str): qb64 of identifier in oobi, may be cid or eid + role (str): authorized role for eid + eids (list): when provided restrict returns to only eids in eids + + """ + # default logic is that if self.pre is witness of aid and has a loc url + # for self then reply with loc scheme for all witnesses even if self + # not permiteed in .habs.oobis return self.replyEndRole(cid=aid, role=role, eids=eids) def getOwnEvent(self, sn, allowPartiallySigned=False): + """ + Returns: message Serder and controller signatures of + own event at sequence number sn from retrieving event at sn + and associated signatures from database. 
+ + Parameters: + sn (int): is int sequence number of event + allowPartiallySigned(bool): True means attempt to load from partial signed escrow + """ key = dbing.snKey(self.pre, sn) dig = self.db.getKeLast(key) if dig is None and allowPartiallySigned: @@ -1543,6 +2025,16 @@ def getOwnEvent(self, sn, allowPartiallySigned=False): return serder, sigs, couple def makeOwnEvent(self, sn, allowPartiallySigned=False): + """ + Returns: messagized bytearray message with attached signatures of + own event at sequence number sn from retrieving event at sn + and associated signatures from database. + + Parameters: + sn(int): is int sequence number of event + allowPartiallySigned(bool): True means attempt to load from partial signed escrow + + """ msg = bytearray() serder, sigs, couple = self.getOwnEvent(sn=sn, allowPartiallySigned=allowPartiallySigned) @@ -1560,15 +2052,33 @@ def makeOwnEvent(self, sn, allowPartiallySigned=False): return msg def makeOwnInception(self, allowPartiallySigned=False): + """ + Returns: messagized bytearray message with attached signatures of + own inception event by retrieving event and signatures + from database. + """ return self.makeOwnEvent(sn=0, allowPartiallySigned=allowPartiallySigned) def processCues(self, cues): + """ + Returns bytearray of messages as a result of processing all cues + + Parameters: + cues is deque of cues + """ msgs = bytearray() # outgoing messages for msg in self.processCuesIter(cues): msgs.extend(msg) return msgs def processCuesIter(self, cues): + """ + Iterate through cues and yields one or more msgs for each cue. + + Parameters: + cues is deque of cues + + """ while cues: # iteratively process each cue in cues msgs = bytearray() cue = cues.pull() # cues.popleft() @@ -1611,6 +2121,20 @@ def processCuesIter(self, cues): msg = self.reply(data=data, route=route) yield msg + # ToDo XXXX cue for kin = "query" various types of queries + # (query witness, query delegation etc) + # ToDo XXXX cue for kin = "notice" new event + # ToDo XXXX cue for kin = "witness" to create witness receipt own is witness + # ToDo XXXX cue for kin = "noticeBadCloneFN" + # ToDo XXXX cue for kin = "approveDelegation" own is delegator + + # ToDo XXXX cue for kin = "keyStateSaved" + # ToDo XXXX cue for kin = "psUnescrow" + # ToDo XXXX cue for kin = "stream" + # ToDo XXXX cue for kin = "invalid" + # ToDo XXXX cue for kin=""remoteMemberedSig"" + + def witnesser(self): return True From 80821e68301ab44bc1584cc839caf95544c1e8da Mon Sep 17 00:00:00 2001 From: arilieb Date: Tue, 19 Aug 2025 08:03:05 -0700 Subject: [PATCH 5/6] Added KramError type and lifted test method out to fixture. 
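Since KramError is defined as a subclass of KeriError (see the kering.py hunk below), existing handlers that catch the base class keep working while new code can match KRAM failures specifically. A minimal illustration of that design choice, using stand-in classes rather than the keri.kering module itself:

class KeriError(Exception):
    """Stand-in for the keri.kering base error."""

class KramError(KeriError):
    """Stand-in for the new timeliness/replay error added by this patch."""

try:
    raise KramError("message replay detected")
except KeriError as ex:
    # A broad KeriError handler still catches the more specific KramError.
    assert isinstance(ex, KramError)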
--- src/keri/core/kraming.py | 8 ++++---- src/keri/db/basing.py | 2 +- src/keri/kering.py | 6 ++++++ tests/conftest.py | 30 ++++++++++++++++++++++++++++++ tests/core/test_kraming.py | 28 ++++------------------------ 5 files changed, 45 insertions(+), 29 deletions(-) diff --git a/src/keri/core/kraming.py b/src/keri/core/kraming.py index 5d238837e..bf4a0f9a7 100644 --- a/src/keri/core/kraming.py +++ b/src/keri/core/kraming.py @@ -159,7 +159,7 @@ def checkMessageTimeliness(self, serder): if (messageTime < currentTime - driftSkew - windowSize or messageTime > currentTime + driftSkew): - raise kering.ValidationError(f"Message is out of time window {serder.pretty()}") + raise kering.KramError(f"Message is out of time window {serder.pretty()}") cacheKey = self._constructCacheKey(serder) @@ -176,9 +176,9 @@ def checkMessageTimeliness(self, serder): return True if messageTime == cachedTimestamp: - raise kering.ValidationError(f"Message replay detected {serder.pretty()}") + raise kering.KramError(f"Message replay detected {serder.pretty()}") - raise kering.ValidationError(f"Message is out of order {serder.pretty()}") + raise kering.KramError(f"Message is out of order {serder.pretty()}") def pruneCache(self): """Prune stale entries from the Replay Cache Table. @@ -213,7 +213,7 @@ def processKrms(self): # TODO: Implement escrowing functionality self.db.krms.rem(said) logger.info(f"Message accepted: {serder.pretty()}") - except kering.ValidationError as e: + except kering.KramError as e: logger.error(f"Invalid message: {e}") self.pruneCache() diff --git a/src/keri/db/basing.py b/src/keri/db/basing.py index 1f92111ac..220d9916a 100644 --- a/src/keri/db/basing.py +++ b/src/keri/db/basing.py @@ -1315,7 +1315,7 @@ def reopen(self, **kwa): sep=">", ) - # Timeliness cache of identifiers (soon to also include message types) + # KRAM escrow database, not yet implemented self.krms = subing.IoSetSuber(db=self, subkey='krms.', sep=">") self.reload() diff --git a/src/keri/kering.py b/src/keri/kering.py index ff1a7cb2c..68268d8ba 100644 --- a/src/keri/kering.py +++ b/src/keri/kering.py @@ -1041,3 +1041,9 @@ class QueryNotFoundError(KeriError): raise QueryNotFoundError("error message") """ +class KramError(KeriError): + """ + Timestamp indicating replay attack or other KRAM related error + Usage: + raise KramError("error message") + """ diff --git a/tests/conftest.py b/tests/conftest.py index 4013464ae..4f8f85932 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -82,6 +82,36 @@ def mockRandomSalt(self): def seeder(): return DbSeed +@pytest.fixture +def create_test_serder(): + """Fixture that provides a helper function to create test serders""" + + def _create_test_serder(sourceAid, ilk, timestamp=None, route=None, qBlock=None, routeParams=None): + """Helper to create a test serder with specified parameters""" + from keri.core import serdering + from keri.help import helping + + if timestamp is None: + timestamp = helping.nowIso8601() + + sad = { + "v": "KERI10JSON00011c_", + "t": ilk, + "i": sourceAid, + "dt": timestamp, + } + + if route: + if routeParams: + route += "?" 
+ "&".join([f"{k}={v}" for k, v in routeParams.items()]) + sad["r"] = route + + if qBlock: + sad["q"] = qBlock + + return serdering.SerderKERI(sad=sad, makify=True) + + return _create_test_serder class DbSeed: @staticmethod diff --git a/tests/core/test_kraming.py b/tests/core/test_kraming.py index ddc039908..072cf5d37 100644 --- a/tests/core/test_kraming.py +++ b/tests/core/test_kraming.py @@ -5,7 +5,7 @@ from keri import kering -def test_timeliness(monkeypatch): +def test_timeliness(monkeypatch, create_test_serder): def mockNowIso8601(): return "2021-06-27T21:26:21.233257+00:00" @@ -22,31 +22,11 @@ def mockNowIso8601(): # Set window parameters for the test AIDs tc.setWindowParameters(hab.pre, windowSize=1.0, driftSkew=1.0) - def create_test_serder(ilk, timestamp=None, route=None, sourceAid=hab.pre, qBlock=None, routeParams=None): - """Helper to create a test serder with specified parameters""" - if timestamp is None: - timestamp = helping.nowIso8601() - - sad = { - "v": "KERI10JSON00011c_", - "t": ilk, - "i": sourceAid, - "dt": timestamp, - } - - if route: - if routeParams: - route += "?" + "&".join([f"{k}={v}" for k, v in routeParams.items()]) - sad["r"] = route - - if qBlock: - sad["q"] = qBlock - - return serdering.SerderKERI(sad=sad, makify=True) current_time = helping.nowIso8601() offerSerder = create_test_serder( + sourceAid=hab.pre, ilk="exn", timestamp=current_time, route="/credential/offer", @@ -61,8 +41,8 @@ def mockNowIso8601Later(): monkeypatch.setattr(helping, "nowIso8601", mockNowIso8601Later) - offerSerder2 = create_test_serder(ilk="exn", timestamp=mockNowIso8601Later(), route="/credential/offer", - sourceAid="BGKVzj4ve0VSd8z_AmvhLg4lqcC_9WYX90k03q-R_Ydo") + offerSerder2 = create_test_serder(sourceAid="BGKVzj4ve0VSd8z_AmvhLg4lqcC_9WYX90k03q-R_Ydo", + ilk="exn", timestamp=mockNowIso8601Later(), route="/credential/offer") # Cache new entry isValid = tc.checkMessageTimeliness(offerSerder2) From a7ef337f675d1dc35896840a24222febdfddaee9 Mon Sep 17 00:00:00 2001 From: arilieb Date: Tue, 19 Aug 2025 09:00:48 -0700 Subject: [PATCH 6/6] Fixed exception type expectation in test_kraming.py. --- tests/core/test_kraming.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/core/test_kraming.py b/tests/core/test_kraming.py index 072cf5d37..80fe1a735 100644 --- a/tests/core/test_kraming.py +++ b/tests/core/test_kraming.py @@ -37,7 +37,7 @@ def mockNowIso8601(): assert isValid def mockNowIso8601Later(): - return "2021-06-27T21:26:23.233258+00:00" + return "2021-06-27T21:26:24.233258+00:00" monkeypatch.setattr(helping, "nowIso8601", mockNowIso8601Later) @@ -49,7 +49,7 @@ def mockNowIso8601Later(): assert isValid # Attempt to cache first entry again - this should now raise ValidationError - with pytest.raises(kering.ValidationError): + with pytest.raises(kering.KramError): tc.checkMessageTimeliness(offerSerder) # Prune only the entry with a time outside the window