Skip to content

Commit

Permalink
Update sequence number querying for logs to allow the specification o…
Browse files Browse the repository at this point in the history
…f fn for first seen ordinal as the first event to start streaming back. This is an optimization to allow witnesses and watchers to only stream the events needed and not the entire KEL every time a new event occurs. (WebOfTrust#848)

Signed-off-by: pfeairheller <[email protected]>
  • Loading branch information
pfeairheller authored Aug 20, 2024
1 parent 970d2fb commit a69e7c4
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 8 deletions.
5 changes: 3 additions & 2 deletions src/keri/app/agenting.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ def msgDo(self, tymth=None, tock=1.0, **opts):

yield self.tock

def query(self, pre, r="logs", sn='0', src=None, hab=None, anchor=None, wits=None, **kwa):
def query(self, pre, r="logs", sn='0', fn='0', src=None, hab=None, anchor=None, wits=None, **kwa):
""" Create, sign and return a `qry` message against the attester for the prefix
Parameters:
Expand All @@ -534,14 +534,15 @@ def query(self, pre, r="logs", sn='0', src=None, hab=None, anchor=None, wits=Non
pre (str): qb64 identifier prefix being queried for
r (str): query route
sn (str): optional specific hex str of sequence number to query for
fn (str): optional specific hex str of sequence number to start with
anchor (Seal): anchored Seal to search for
wits (list) witnesses to query
Returns:
bytearray: signed query event
"""
qry = dict(s=sn)
qry = dict(s=sn, fn=fn)
if anchor is not None:
qry["a"] = anchor

Expand Down
8 changes: 6 additions & 2 deletions src/keri/app/querying.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,17 @@ def recur(self, tyme, deeds=None):

class SeqNoQuerier(doing.DoDoer):

def __init__(self, hby, hab, pre, sn, wits=None, **opts):
def __init__(self, hby, hab, pre, sn, fn=None, wits=None, **opts):
self.hby = hby
self.hab = hab
self.pre = pre
self.sn = sn
self.fn = fn if fn is not None else 0
self.witq = agenting.WitnessInquisitor(hby=self.hby)
self.witq.query(src=self.hab.pre, pre=self.pre, sn="{:x}".format(self.sn), wits=wits)
self.witq.query(src=self.hab.pre, pre=self.pre,
sn="{:x}".format(self.sn),
fn="{:x}".format(self.fn),
wits=wits)
super(SeqNoQuerier, self).__init__(doers=[self.witq], **opts)

def recur(self, tyme, deeds=None):
Expand Down
2 changes: 1 addition & 1 deletion src/keri/app/watching.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def adjudicate(self, watched, toad=None):
for watcher in watchers:
saider = self.hab.db.knas.get(keys=(watched, watcher))
if saider is None:
print(f"No key state from watcher {watcher} for {watched}")
logger.info(f"No key state from watcher {watcher} for {watched}")
continue

ksn = self.hab.db.ksns.get(keys=(saider.qb64,))
Expand Down
3 changes: 2 additions & 1 deletion src/keri/core/eventing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4595,6 +4595,7 @@ def processQuery(self, serder, source=None, sigers=None, cigars=None):
src = qry["src"]
anchor = qry["a"] if "a" in qry else None
sn = int(qry["s"], 16) if "s" in qry else None
fn = int(qry["fn"], 16) if "fn" in qry else 0

if pre not in self.kevers:
self.escrowQueryNotFoundEvent(serder=serder, prefixer=source, sigers=sigers, cigars=cigars)
Expand All @@ -4612,7 +4613,7 @@ def processQuery(self, serder, source=None, sigers=None, cigars=None):
raise QueryNotFoundError("Query not found error={}.".format(ked))

msgs = list() # outgoing messages
for msg in self.db.clonePreIter(pre=pre, fn=0):
for msg in self.db.clonePreIter(pre=pre, fn=fn):
msgs.append(msg)

if kever.delpre:
Expand Down
11 changes: 9 additions & 2 deletions tests/app/test_querying.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def test_querying():
assert msg["src"] == inqHab.pre
assert msg["pre"] == subHab.pre
assert msg["r"] == "ksn"
assert msg["q"] == {'s': '0'}
assert msg["q"] == {'fn': '0', 's': '0'}
assert msg["wits"] is None

doist.recur(deeds=deeds)
Expand Down Expand Up @@ -108,6 +108,12 @@ def test_querying():
doist.recur(deeds=deeds)
assert len(sdoer.witq.msgs) == 1

sdoer = SeqNoQuerier(hby=hby, hab=inqHab, pre=subHab.pre, fn=2, sn=4)
assert len(sdoer.witq.msgs) == 1
msg = sdoer.witq.msgs.pull()
query = msg['q']
assert query == {'fn': '2', 's': '4'}

# Test with originally unknown AID
sdoer = SeqNoQuerier(hby=hby, hab=inqHab, pre="ExxCHAI9bkl50F5SCKl2AWQbFGKeJtz0uxM2diTMxMQA", sn=1)
assert len(sdoer.witq.msgs) == 1
Expand All @@ -131,7 +137,8 @@ def test_querying():
assert len(sdoer.witq.msgs) == 1

# Test with originally unknown AID
adoer = AnchorQuerier(hby=hby, hab=inqHab, pre="ExxCHAI9bkl50F5SCKl2AWQbFGKeJtz0uxM2diTMxMQA", anchor={'s': '5'})
adoer = AnchorQuerier(hby=hby, hab=inqHab, pre="ExxCHAI9bkl50F5SCKl2AWQbFGKeJtz0uxM2diTMxMQA",
anchor={'s': '5'})
assert len(adoer.witq.msgs) == 1

tock = 0.03125
Expand Down
6 changes: 6 additions & 0 deletions tests/core/test_replay.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ def test_replay():
b'OOJVAAAjEg-3N4cNT_yot5wWlcKaz-1xPAgteGCsYZhq9dax3sQPD5HFI7M13Bhp'
b'kRttBEq92pAaIG')

assert debHab.kever.sn == 6
msgs = next(debHab.db.clonePreIter(debHab.pre, fn=4))
serder = serdering.SerderKERI(raw=msgs)
assert serder.ilk == kering.Ilks.ixn
assert serder.sn == 4

# Play debMsgs to Cam
# create non-local kevery for Cam to process msgs from Deb
camKevery = eventing.Kevery(db=camHab.db,
Expand Down

0 comments on commit a69e7c4

Please sign in to comment.