Skip to content

Commit 65d96b1

Browse files
authored
Merge pull request #346 from ydb-platform/topic-api-public
topic reader: add to public TopicReaderBatch, wait_message
2 parents e193b0e + 627e1fd commit 65d96b1

File tree

6 files changed

+16
-14
lines changed

6 files changed

+16
-14
lines changed

Diff for: CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Add to public topic reader api: TopicReaderBatch, wait_message
2+
13
## 3.3.7 ##
24
* Added copy of locals() dicts at internals
35

Diff for: tests/topics/test_topic_reader.py

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ async def test_link_to_client(self, driver, topic_path, topic_consumer):
2222

2323
async def test_read_message(self, driver, topic_with_messages, topic_consumer):
2424
reader = driver.topic_client.reader(topic_with_messages, topic_consumer)
25+
await reader.wait_message()
2526
msg = await reader.receive_message()
2627

2728
assert msg is not None

Diff for: ydb/_topic_reader/datatypes.py

+1-12
Original file line numberDiff line numberDiff line change
@@ -71,20 +71,14 @@ class PartitionSession:
7171
_ack_waiters: Deque["PartitionSession.CommitAckWaiter"] = field(init=False, default_factory=lambda: deque())
7272

7373
_state_changed: asyncio.Event = field(init=False, default_factory=lambda: asyncio.Event(), compare=False)
74-
_loop: Optional[asyncio.AbstractEventLoop] = field(init=False) # may be None in tests
7574

7675
def __post_init__(self):
7776
self._next_message_start_commit_offset = self.committed_offset
7877

79-
try:
80-
self._loop = asyncio.get_running_loop()
81-
except RuntimeError:
82-
self._loop = None
83-
8478
def add_waiter(self, end_offset: int) -> "PartitionSession.CommitAckWaiter":
8579
self._ensure_not_closed()
8680

87-
waiter = PartitionSession.CommitAckWaiter(end_offset, self._create_future())
81+
waiter = PartitionSession.CommitAckWaiter(end_offset, asyncio.Future())
8882
if end_offset <= self.committed_offset:
8983
waiter._finish_ok()
9084
return waiter
@@ -97,11 +91,6 @@ def add_waiter(self, end_offset: int) -> "PartitionSession.CommitAckWaiter":
9791

9892
return waiter
9993

100-
def _create_future(self) -> asyncio.Future:
101-
if self._loop:
102-
return self._loop.create_future()
103-
return asyncio.Future()
104-
10594
def ack_notify(self, offset: int):
10695
self._ensure_not_closed()
10796

Diff for: ydb/_topic_reader/topic_reader_asyncio.py

+6
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,12 @@ def __del__(self):
8888
if not self._closed:
8989
self._loop.create_task(self.close(flush=False), name="close reader")
9090

91+
async def wait_message(self):
92+
"""
93+
Wait at least one message from reader.
94+
"""
95+
await self._reconnector.wait_message()
96+
9197
async def receive_batch(
9298
self,
9399
) -> typing.Union[datatypes.PublicBatch, None]:

Diff for: ydb/_topic_reader/topic_reader_sync.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def async_wait_message(self) -> concurrent.futures.Future:
8484
"""
8585
self._check_closed()
8686

87-
return self._caller.unsafe_call_with_future(self._async_reader._reconnector.wait_message())
87+
return self._caller.unsafe_call_with_future(self._async_reader.wait_message())
8888

8989
def receive_batch(
9090
self,

Diff for: ydb/topic.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
"TopicMeteringMode",
1212
"TopicReader",
1313
"TopicReaderAsyncIO",
14+
"TopicReaderBatch",
1415
"TopicReaderMessage",
1516
"TopicReaderSelector",
1617
"TopicReaderSettings",
@@ -33,7 +34,10 @@
3334

3435
from . import driver
3536

36-
from ._topic_reader.datatypes import PublicMessage as TopicReaderMessage
37+
from ._topic_reader.datatypes import (
38+
PublicBatch as TopicReaderBatch,
39+
PublicMessage as TopicReaderMessage,
40+
)
3741

3842
from ._topic_reader.topic_reader import (
3943
PublicReaderSettings as TopicReaderSettings,

0 commit comments

Comments
 (0)