Skip to content

Commit 90c96f6

Browse files
authored
Merge pull request #248 sync-from-v3
2 parents 66f6485 + 6c2d13d commit 90c96f6

File tree

7 files changed

+103
-42
lines changed

7 files changed

+103
-42
lines changed

ydb/_topic_reader/datatypes.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ def add_waiter(self, end_offset: int) -> "PartitionSession.CommitAckWaiter":
9696
return waiter
9797

9898
# fast way
99-
if len(self._ack_waiters) > 0 and self._ack_waiters[-1].end_offset < end_offset:
99+
if self._ack_waiters and self._ack_waiters[-1].end_offset < end_offset:
100100
self._ack_waiters.append(waiter)
101101
else:
102102
bisect.insort(self._ack_waiters, waiter)
@@ -106,25 +106,23 @@ def add_waiter(self, end_offset: int) -> "PartitionSession.CommitAckWaiter":
106106
def _create_future(self) -> asyncio.Future:
107107
if self._loop:
108108
return self._loop.create_future()
109-
else:
110-
return asyncio.Future()
109+
return asyncio.Future()
111110

112111
def ack_notify(self, offset: int):
113112
self._ensure_not_closed()
114113

115114
self.committed_offset = offset
116115

117-
if len(self._ack_waiters) == 0:
116+
if not self._ack_waiters:
118117
# todo log warning
119118
# must be never receive ack for not sended request
120119
return
121120

122-
while len(self._ack_waiters) > 0:
123-
if self._ack_waiters[0].end_offset <= offset:
124-
waiter = self._ack_waiters.popleft()
125-
waiter._finish_ok()
126-
else:
121+
while self._ack_waiters:
122+
if self._ack_waiters[0].end_offset > offset:
127123
break
124+
waiter = self._ack_waiters.popleft()
125+
waiter._finish_ok()
128126

129127
def close(self):
130128
if self.closed:

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
7575

7676
def __del__(self):
7777
if not self._closed:
78-
self._loop.create_task(self.close(), name="close reader")
78+
self._loop.create_task(self.close(flush=False), name="close reader")
7979

8080
async def receive_batch(
8181
self,
@@ -120,12 +120,12 @@ async def commit_with_ack(
120120
waiter = self._reconnector.commit(batch)
121121
await waiter.future
122122

123-
async def close(self):
123+
async def close(self, flush: bool = True):
124124
if self._closed:
125125
raise TopicReaderClosedError()
126126

127127
self._closed = True
128-
await self._reconnector.close()
128+
await self._reconnector.close(flush)
129129

130130

131131
class ReaderReconnector:
@@ -199,14 +199,20 @@ def commit(
199199
) -> datatypes.PartitionSession.CommitAckWaiter:
200200
return self._stream_reader.commit(batch)
201201

202-
async def close(self):
202+
async def close(self, flush: bool):
203203
if self._stream_reader:
204+
if flush:
205+
await self.flush()
204206
await self._stream_reader.close()
205207
for task in self._background_tasks:
206208
task.cancel()
207209

208210
await asyncio.wait(self._background_tasks)
209211

212+
async def flush(self):
213+
if self._stream_reader:
214+
await self._stream_reader.flush()
215+
210216
def _set_first_error(self, err: issues.Error):
211217
try:
212218
self._first_error.set_result(err)
@@ -641,6 +647,16 @@ def _get_first_error(self) -> Optional[YdbError]:
641647
if self._first_error.done():
642648
return self._first_error.result()
643649

650+
async def flush(self):
651+
if self._closed:
652+
raise RuntimeError("Flush on closed Stream")
653+
654+
futures = []
655+
for session in self._partition_sessions.values():
656+
futures.extend(w.future for w in session._ack_waiters)
657+
658+
await asyncio.gather(*futures)
659+
644660
async def close(self):
645661
if self._closed:
646662
return

ydb/_topic_reader/topic_reader_asyncio_test.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,35 @@ async def test_close_ack_waiters_when_close_stream_reader(
389389
with pytest.raises(topic_reader_asyncio.TopicReaderCommitToExpiredPartition):
390390
waiter.future.result()
391391

392+
async def test_flush(
393+
self, stream, stream_reader_started: ReaderStream, partition_session
394+
):
395+
offset = self.partition_session_committed_offset + 1
396+
waiter = partition_session.add_waiter(offset)
397+
398+
with pytest.raises(WaitConditionError):
399+
await wait_for_fast(stream_reader_started.flush(), timeout=0.1)
400+
401+
stream.from_server.put_nowait(
402+
StreamReadMessage.FromServer(
403+
server_status=ServerStatus(ydb_status_codes_pb2.StatusIds.SUCCESS, []),
404+
server_message=StreamReadMessage.CommitOffsetResponse(
405+
partitions_committed_offsets=[
406+
StreamReadMessage.CommitOffsetResponse.PartitionCommittedOffset(
407+
partition_session_id=partition_session.id,
408+
committed_offset=offset,
409+
),
410+
]
411+
),
412+
)
413+
)
414+
415+
await stream_reader_started.flush()
416+
# don't raises
417+
assert waiter.future.result() is None
418+
419+
await wait_for_fast(stream_reader_started.close())
420+
392421
async def test_commit_ranges_for_received_messages(
393422
self, stream, stream_reader_started: ReaderStream, partition_session
394423
):

ydb/_topic_reader/topic_reader_sync.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ async def create_reader():
5050
).result()
5151

5252
def __del__(self):
53-
self.close()
53+
self.close(flush=False)
5454

5555
def __enter__(self):
5656
return self
@@ -152,13 +152,13 @@ def async_commit_with_ack(
152152
self._async_reader.commit_with_ack(mess)
153153
)
154154

155-
def close(self, *, timeout: TimeoutType = None):
155+
def close(self, *, flush: bool = True, timeout: TimeoutType = None):
156156
if self._closed:
157157
return
158158

159159
self._closed = True
160160

161-
self._caller.safe_call_with_result(self._async_reader.close(), timeout)
161+
self._caller.safe_call_with_result(self._async_reader.close(flush), timeout)
162162

163163
def _check_closed(self):
164164
if self._closed:

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
from collections import deque
77
from typing import Deque, AsyncIterator, Union, List, Optional, Dict, Callable
88

9+
import logging
10+
911
import ydb
1012
from .topic_writer import (
1113
PublicWriterSettings,
@@ -38,6 +40,8 @@
3840
GrpcWrapperAsyncIO,
3941
)
4042

43+
logger = logging.getLogger(__name__)
44+
4145

4246
class WriterAsyncIO:
4347
_loop: asyncio.AbstractEventLoop
@@ -61,7 +65,7 @@ def __del__(self):
6165
if self._closed or self._loop.is_closed():
6266
return
6367

64-
self._loop.call_soon(self.close)
68+
self._loop.call_soon(self.close, False)
6569

6670
async def close(self, *, flush: bool = True):
6771
if self._closed:
@@ -154,7 +158,6 @@ class WriterAsyncIOReconnector:
154158
_credentials: Union[ydb.credentials.Credentials, None]
155159
_driver: ydb.aio.Driver
156160
_init_message: StreamWriteMessage.InitRequest
157-
_init_info: asyncio.Future
158161
_stream_connected: asyncio.Event
159162
_settings: WriterSettings
160163
_codec: PublicCodec
@@ -164,25 +167,30 @@ class WriterAsyncIOReconnector:
164167
_codec_selector_last_codec: Optional[PublicCodec]
165168
_codec_selector_check_batches_interval: int
166169

167-
_last_known_seq_no: int
168170
if typing.TYPE_CHECKING:
169171
_messages_for_encode: asyncio.Queue[List[InternalMessage]]
170172
else:
171173
_messages_for_encode: asyncio.Queue
172174
_messages: Deque[InternalMessage]
173175
_messages_future: Deque[asyncio.Future]
174176
_new_messages: asyncio.Queue
175-
_stop_reason: asyncio.Future
176177
_background_tasks: List[asyncio.Task]
177178

179+
_state_changed: asyncio.Event
180+
if typing.TYPE_CHECKING:
181+
_stop_reason: asyncio.Future[BaseException]
182+
else:
183+
_stop_reason: asyncio.Future
184+
_init_info: Optional[PublicWriterInitInfo]
185+
178186
def __init__(self, driver: SupportedDriverType, settings: WriterSettings):
179187
self._closed = False
180188
self._loop = asyncio.get_running_loop()
181189
self._driver = driver
182190
self._credentials = driver._credentials
183191
self._init_message = settings.create_init_request()
184192
self._new_messages = asyncio.Queue()
185-
self._init_info = self._loop.create_future()
193+
self._init_info = None
186194
self._stream_connected = asyncio.Event()
187195
self._settings = settings
188196

@@ -219,14 +227,17 @@ def __init__(self, driver: SupportedDriverType, settings: WriterSettings):
219227
asyncio.create_task(self._encode_loop(), name="encode_loop"),
220228
]
221229

230+
self._state_changed = asyncio.Event()
231+
222232
async def close(self, flush: bool):
223233
if self._closed:
224234
return
235+
self._closed = True
236+
logger.debug("Close writer reconnector")
225237

226238
if flush:
227239
await self.flush()
228240

229-
self._closed = True
230241
self._stop(TopicWriterStopped())
231242

232243
for task in self._background_tasks:
@@ -240,19 +251,20 @@ async def close(self, flush: bool):
240251
pass
241252

242253
async def wait_init(self) -> PublicWriterInitInfo:
243-
done, _ = await asyncio.wait(
244-
[self._init_info, self._stop_reason], return_when=asyncio.FIRST_COMPLETED
245-
)
246-
res = done.pop() # type: asyncio.Future
247-
res_val = res.result()
254+
while True:
255+
if self._stop_reason.done():
256+
raise self._stop_reason.exception()
248257

249-
if isinstance(res_val, BaseException):
250-
raise res_val
258+
if self._init_info:
259+
return self._init_info
251260

252-
return res_val
261+
await self._state_changed.wait()
253262

254-
async def wait_stop(self) -> Exception:
255-
return await self._stop_reason
263+
async def wait_stop(self) -> BaseException:
264+
try:
265+
await self._stop_reason
266+
except BaseException as stop_reason:
267+
return stop_reason
256268

257269
async def write_with_ack_future(
258270
self, messages: List[PublicMessage]
@@ -343,13 +355,14 @@ async def _connection_loop(self):
343355
self._settings.update_token_interval,
344356
)
345357
try:
346-
self._last_known_seq_no = stream_writer.last_seqno
347-
self._init_info.set_result(
348-
PublicWriterInitInfo(
358+
if self._init_info is None:
359+
self._last_known_seq_no = stream_writer.last_seqno
360+
self._init_info = PublicWriterInitInfo(
349361
last_seqno=stream_writer.last_seqno,
350362
supported_codecs=stream_writer.supported_codecs,
351363
)
352-
)
364+
self._state_changed.set()
365+
353366
except asyncio.InvalidStateError:
354367
pass
355368

@@ -369,9 +382,6 @@ async def _connection_loop(self):
369382
await stream_writer.close()
370383
done.pop().result()
371384
except issues.Error as err:
372-
# todo log error
373-
print(err)
374-
375385
err_info = check_retriable_error(err, retry_settings, attempt)
376386
if not err_info.is_retriable:
377387
self._stop(err)
@@ -550,8 +560,13 @@ def _stop(self, reason: Exception):
550560

551561
self._stop_reason.set_result(reason)
552562

563+
for f in self._messages_future:
564+
f.set_exception(reason)
565+
566+
self._state_changed.set()
567+
logger.info("Stop topic writer: %s" % reason)
568+
553569
async def flush(self):
554-
self._check_stop()
555570
if not self._messages_future:
556571
return
557572

ydb/_topic_writer/topic_writer_sync.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ def __enter__(self):
5656
def __exit__(self, exc_type, exc_val, exc_tb):
5757
self.close()
5858

59+
def __del__(self):
60+
self.close(flush=False)
61+
5962
def close(self, *, flush: bool = True, timeout: TimeoutType = None):
6063
if self._closed:
6164
return

ydb/topic.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ def close(self):
208208
return
209209

210210
self._closed = True
211-
self._executor.shutdown(wait=False, cancel_futures=True)
211+
self._executor.shutdown(wait=False)
212212

213213
def _check_closed(self):
214214
if not self._closed:
@@ -366,7 +366,7 @@ def close(self):
366366
return
367367

368368
self._closed = True
369-
self._executor.shutdown(wait=False, cancel_futures=True)
369+
self._executor.shutdown(wait=False)
370370

371371
def _check_closed(self):
372372
if not self._closed:

0 commit comments

Comments
 (0)