diff --git a/nats/js/object_store.py b/nats/js/object_store.py index 441e72d3..caa47710 100644 --- a/nats/js/object_store.py +++ b/nats/js/object_store.py @@ -495,8 +495,12 @@ async def watch_updates(msg): await watcher._updates.put(None) deliver_policy = None - if not include_history: + if include_history: deliver_policy = api.DeliverPolicy.LAST_PER_SUBJECT + else: + # Aligning with nats.js on this one, nats.go uses ALL if history is not included + # But if history is not desired the watch should only be giving notifications on new entries + deliver_policy = api.DeliverPolicy.NEW watcher._sub = await self._js.subscribe( all_meta, @@ -554,18 +558,37 @@ async def list( """ list will list all the objects in this store. """ - watcher = await self.watch(ignore_deletes=ignore_deletes) + all_meta = OBJ_ALL_META_PRE_TEMPLATE.format(bucket=self._name) entries = [] - async for entry in watcher: - # None entry is used to signal that there is no more info. - if not entry: - break - entries.append(entry) + try: + # Use a consumer to get all messages + sub = await self._js.subscribe( + all_meta, + ordered_consumer=True, + deliver_policy=api.DeliverPolicy.LAST_PER_SUBJECT, + ) - await watcher.stop() + while True: + try: + msg = await asyncio.wait_for(sub.next_msg(), timeout=1.0) + try: + info = api.ObjectInfo.from_response( + json.loads(msg.data) + ) + if (not ignore_deletes) or (not info.deleted): + entries.append(info) + except Exception: + # Skip invalid messages + continue + except asyncio.TimeoutError: + # No more messages available + break + except Exception as e: + raise e - if not entries: - raise NotFoundError + await sub.unsubscribe() + except Exception as e: + raise e return entries diff --git a/tests/test_js.py b/tests/test_js.py index 85f8f556..b0b5b99d 100644 --- a/tests/test_js.py +++ b/tests/test_js.py @@ -3912,6 +3912,75 @@ async def error_handler(e): await nc.close() + @async_test + async def test_object_watch_include_history(self): + errors = [] + + async def error_handler(e): + print("Error:", e, type(e)) + errors.append(e) + + nc = await nats.connect(error_cb=error_handler) + js = nc.jetstream() + + obs = await js.create_object_store( + "TEST_FILES", + config=nats.js.api.ObjectStoreConfig(description="multi_files"), + ) + + # Put objects before starting watcher + await obs.put("A", b"A") + await obs.put("B", b"B") + await obs.put("C", b"C") + + # ------------------------------------ + # Case 1: Watcher with include_history=True + # ------------------------------------ + watcher = await obs.watch(include_history=True) + + # Should receive historical updates immediately + e = await watcher.updates() + assert e.name == "A" + assert e.bucket == "TEST_FILES" + + e = await watcher.updates() + assert e.name == "B" + + e = await watcher.updates() + assert e.name == "C" + + e = await watcher.updates() + assert e is None + + # No new updates yet, expect timeout + with pytest.raises(asyncio.TimeoutError): + await watcher.updates(timeout=1) + + # ------------------------------------ + # Case 2: Watcher with include_history=False + # ------------------------------------ + watcher_no_history = await obs.watch(include_history=False) + + # Should receive no updates immediately + with pytest.raises(asyncio.TimeoutError): + await watcher_no_history.updates(timeout=1) + + # Add a new object after starting the watcher + await obs.put("D", b"D") + + # Now the watcher should see this update + e = await watcher_no_history.updates() + assert e.name == "D" + + e = await watcher_no_history.updates() + assert e is None + + # No further updates expected + with pytest.raises(asyncio.TimeoutError): + await watcher_no_history.updates(timeout=1) + + await nc.close() + @async_test async def test_object_list(self): errors = []