Skip to content

Commit 3edc9b8

Browse files
committed
Fix object store watch to use NEW rather than ALL
1 parent 03ae02b commit 3edc9b8

File tree

2 files changed

+102
-10
lines changed

2 files changed

+102
-10
lines changed

nats/js/object_store.py

+33-10
Original file line numberDiff line numberDiff line change
@@ -495,8 +495,12 @@ async def watch_updates(msg):
495495
await watcher._updates.put(None)
496496

497497
deliver_policy = None
498-
if not include_history:
498+
if include_history:
499499
deliver_policy = api.DeliverPolicy.LAST_PER_SUBJECT
500+
else:
501+
# Aligning with nats.js on this one, nats.go uses ALL if history is not included
502+
# But if history is not desired the watch should only be giving notifications on new entries
503+
deliver_policy = api.DeliverPolicy.NEW
500504

501505
watcher._sub = await self._js.subscribe(
502506
all_meta,
@@ -554,18 +558,37 @@ async def list(
554558
"""
555559
list will list all the objects in this store.
556560
"""
557-
watcher = await self.watch(ignore_deletes=ignore_deletes)
561+
all_meta = OBJ_ALL_META_PRE_TEMPLATE.format(bucket=self._name)
558562
entries = []
559563

560-
async for entry in watcher:
561-
# None entry is used to signal that there is no more info.
562-
if not entry:
563-
break
564-
entries.append(entry)
564+
try:
565+
# Use a consumer to get all messages
566+
sub = await self._js.subscribe(
567+
all_meta,
568+
ordered_consumer=True,
569+
deliver_policy=api.DeliverPolicy.LAST_PER_SUBJECT,
570+
)
565571

566-
await watcher.stop()
572+
while True:
573+
try:
574+
msg = await asyncio.wait_for(sub.next_msg(), timeout=1.0)
575+
try:
576+
info = api.ObjectInfo.from_response(
577+
json.loads(msg.data)
578+
)
579+
if (not ignore_deletes) or (not info.deleted):
580+
entries.append(info)
581+
except Exception:
582+
# Skip invalid messages
583+
continue
584+
except asyncio.TimeoutError:
585+
# No more messages available
586+
break
587+
except Exception as e:
588+
raise e
567589

568-
if not entries:
569-
raise NotFoundError
590+
await sub.unsubscribe()
591+
except Exception as e:
592+
raise e
570593

571594
return entries

tests/test_js.py

+69
Original file line numberDiff line numberDiff line change
@@ -3912,6 +3912,75 @@ async def error_handler(e):
39123912

39133913
await nc.close()
39143914

3915+
@async_test
3916+
async def test_object_watch_include_history(self):
3917+
errors = []
3918+
3919+
async def error_handler(e):
3920+
print("Error:", e, type(e))
3921+
errors.append(e)
3922+
3923+
nc = await nats.connect(error_cb=error_handler)
3924+
js = nc.jetstream()
3925+
3926+
obs = await js.create_object_store(
3927+
"TEST_FILES",
3928+
config=nats.js.api.ObjectStoreConfig(description="multi_files"),
3929+
)
3930+
3931+
# Put objects before starting watcher
3932+
await obs.put("A", b"A")
3933+
await obs.put("B", b"B")
3934+
await obs.put("C", b"C")
3935+
3936+
# ------------------------------------
3937+
# Case 1: Watcher with include_history=True
3938+
# ------------------------------------
3939+
watcher = await obs.watch(include_history=True)
3940+
3941+
# Should receive historical updates immediately
3942+
e = await watcher.updates()
3943+
assert e.name == "A"
3944+
assert e.bucket == "TEST_FILES"
3945+
3946+
e = await watcher.updates()
3947+
assert e.name == "B"
3948+
3949+
e = await watcher.updates()
3950+
assert e.name == "C"
3951+
3952+
e = await watcher.updates()
3953+
assert e is None
3954+
3955+
# No new updates yet, expect timeout
3956+
with pytest.raises(asyncio.TimeoutError):
3957+
await watcher.updates(timeout=1)
3958+
3959+
# ------------------------------------
3960+
# Case 2: Watcher with include_history=False
3961+
# ------------------------------------
3962+
watcher_no_history = await obs.watch(include_history=False)
3963+
3964+
# Should receive no updates immediately
3965+
with pytest.raises(asyncio.TimeoutError):
3966+
await watcher_no_history.updates(timeout=1)
3967+
3968+
# Add a new object after starting the watcher
3969+
await obs.put("D", b"D")
3970+
3971+
# Now the watcher should see this update
3972+
e = await watcher_no_history.updates()
3973+
assert e.name == "D"
3974+
3975+
e = await watcher_no_history.updates()
3976+
assert e is None
3977+
3978+
# No further updates expected
3979+
with pytest.raises(asyncio.TimeoutError):
3980+
await watcher_no_history.updates(timeout=1)
3981+
3982+
await nc.close()
3983+
39153984
@async_test
39163985
async def test_object_list(self):
39173986
errors = []

0 commit comments

Comments
 (0)