Skip to content

Commit 6c2d13d

Browse files
authored
Merge pull request #236 topic reader: add flush method
2 parents cc3bfe1 + a857c5a commit 6c2d13d

File tree

6 files changed

+63
-17
lines changed

6 files changed

+63
-17
lines changed

ydb/_topic_reader/datatypes.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ def add_waiter(self, end_offset: int) -> "PartitionSession.CommitAckWaiter":
9696
return waiter
9797

9898
# fast way
99-
if len(self._ack_waiters) > 0 and self._ack_waiters[-1].end_offset < end_offset:
99+
if self._ack_waiters and self._ack_waiters[-1].end_offset < end_offset:
100100
self._ack_waiters.append(waiter)
101101
else:
102102
bisect.insort(self._ack_waiters, waiter)
@@ -106,25 +106,23 @@ def add_waiter(self, end_offset: int) -> "PartitionSession.CommitAckWaiter":
106106
def _create_future(self) -> asyncio.Future:
107107
if self._loop:
108108
return self._loop.create_future()
109-
else:
110-
return asyncio.Future()
109+
return asyncio.Future()
111110

112111
def ack_notify(self, offset: int):
113112
self._ensure_not_closed()
114113

115114
self.committed_offset = offset
116115

117-
if len(self._ack_waiters) == 0:
116+
if not self._ack_waiters:
118117
# todo log warning
119118
# must be never receive ack for not sended request
120119
return
121120

122-
while len(self._ack_waiters) > 0:
123-
if self._ack_waiters[0].end_offset <= offset:
124-
waiter = self._ack_waiters.popleft()
125-
waiter._finish_ok()
126-
else:
121+
while self._ack_waiters:
122+
if self._ack_waiters[0].end_offset > offset:
127123
break
124+
waiter = self._ack_waiters.popleft()
125+
waiter._finish_ok()
128126

129127
def close(self):
130128
if self.closed:

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
7575

7676
def __del__(self):
7777
if not self._closed:
78-
self._loop.create_task(self.close(), name="close reader")
78+
self._loop.create_task(self.close(flush=False), name="close reader")
7979

8080
async def receive_batch(
8181
self,
@@ -120,12 +120,12 @@ async def commit_with_ack(
120120
waiter = self._reconnector.commit(batch)
121121
await waiter.future
122122

123-
async def close(self):
123+
async def close(self, flush: bool = True):
124124
if self._closed:
125125
raise TopicReaderClosedError()
126126

127127
self._closed = True
128-
await self._reconnector.close()
128+
await self._reconnector.close(flush)
129129

130130

131131
class ReaderReconnector:
@@ -199,14 +199,20 @@ def commit(
199199
) -> datatypes.PartitionSession.CommitAckWaiter:
200200
return self._stream_reader.commit(batch)
201201

202-
async def close(self):
202+
async def close(self, flush: bool):
203203
if self._stream_reader:
204+
if flush:
205+
await self.flush()
204206
await self._stream_reader.close()
205207
for task in self._background_tasks:
206208
task.cancel()
207209

208210
await asyncio.wait(self._background_tasks)
209211

212+
async def flush(self):
213+
if self._stream_reader:
214+
await self._stream_reader.flush()
215+
210216
def _set_first_error(self, err: issues.Error):
211217
try:
212218
self._first_error.set_result(err)
@@ -641,6 +647,16 @@ def _get_first_error(self) -> Optional[YdbError]:
641647
if self._first_error.done():
642648
return self._first_error.result()
643649

650+
async def flush(self):
651+
if self._closed:
652+
raise RuntimeError("Flush on closed Stream")
653+
654+
futures = []
655+
for session in self._partition_sessions.values():
656+
futures.extend(w.future for w in session._ack_waiters)
657+
658+
await asyncio.gather(*futures)
659+
644660
async def close(self):
645661
if self._closed:
646662
return

ydb/_topic_reader/topic_reader_asyncio_test.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,35 @@ async def test_close_ack_waiters_when_close_stream_reader(
389389
with pytest.raises(topic_reader_asyncio.TopicReaderCommitToExpiredPartition):
390390
waiter.future.result()
391391

392+
async def test_flush(
393+
self, stream, stream_reader_started: ReaderStream, partition_session
394+
):
395+
offset = self.partition_session_committed_offset + 1
396+
waiter = partition_session.add_waiter(offset)
397+
398+
with pytest.raises(WaitConditionError):
399+
await wait_for_fast(stream_reader_started.flush(), timeout=0.1)
400+
401+
stream.from_server.put_nowait(
402+
StreamReadMessage.FromServer(
403+
server_status=ServerStatus(ydb_status_codes_pb2.StatusIds.SUCCESS, []),
404+
server_message=StreamReadMessage.CommitOffsetResponse(
405+
partitions_committed_offsets=[
406+
StreamReadMessage.CommitOffsetResponse.PartitionCommittedOffset(
407+
partition_session_id=partition_session.id,
408+
committed_offset=offset,
409+
),
410+
]
411+
),
412+
)
413+
)
414+
415+
await stream_reader_started.flush()
416+
# don't raises
417+
assert waiter.future.result() is None
418+
419+
await wait_for_fast(stream_reader_started.close())
420+
392421
async def test_commit_ranges_for_received_messages(
393422
self, stream, stream_reader_started: ReaderStream, partition_session
394423
):

ydb/_topic_reader/topic_reader_sync.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ async def create_reader():
5050
).result()
5151

5252
def __del__(self):
53-
self.close()
53+
self.close(flush=False)
5454

5555
def __enter__(self):
5656
return self
@@ -152,13 +152,13 @@ def async_commit_with_ack(
152152
self._async_reader.commit_with_ack(mess)
153153
)
154154

155-
def close(self, *, timeout: TimeoutType = None):
155+
def close(self, *, flush: bool = True, timeout: TimeoutType = None):
156156
if self._closed:
157157
return
158158

159159
self._closed = True
160160

161-
self._caller.safe_call_with_result(self._async_reader.close(), timeout)
161+
self._caller.safe_call_with_result(self._async_reader.close(flush), timeout)
162162

163163
def _check_closed(self):
164164
if self._closed:

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def __del__(self):
6565
if self._closed or self._loop.is_closed():
6666
return
6767

68-
self._loop.call_soon(self.close)
68+
self._loop.call_soon(self.close, False)
6969

7070
async def close(self, *, flush: bool = True):
7171
if self._closed:

ydb/_topic_writer/topic_writer_sync.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ def __enter__(self):
5656
def __exit__(self, exc_type, exc_val, exc_tb):
5757
self.close()
5858

59+
def __del__(self):
60+
self.close(flush=False)
61+
5962
def close(self, *, flush: bool = True, timeout: TimeoutType = None):
6063
if self._closed:
6164
return

0 commit comments

Comments
 (0)