Skip to content

Commit 0657186

Browse files
committed
test corner cases
1 parent 454e4e3 commit 0657186

File tree

3 files changed

+28
-7
lines changed

3 files changed

+28
-7
lines changed

tests/topics/test_topic_transactions.py

+25-2
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ async def callee(tx: ydb.aio.QueryTxContext):
6565
with pytest.raises(asyncio.TimeoutError):
6666
await wait_for(topic_reader.receive_message(), 0.1)
6767

68-
async def test_no_msg_writter_in_error_case(
68+
async def test_no_msg_written_in_error_case(
6969
self, driver: ydb.aio.Driver, topic_path, topic_reader: ydb.TopicReaderAsyncIO
7070
):
7171
async with ydb.aio.QuerySessionPool(driver) as pool:
@@ -82,6 +82,29 @@ async def callee(tx: ydb.aio.QueryTxContext):
8282
with pytest.raises(asyncio.TimeoutError):
8383
await wait_for(topic_reader.receive_message(), 0.1)
8484

85+
async def test_msg_written_exactly_once_with_retries(
86+
self, driver: ydb.aio.Driver, topic_path, topic_reader: ydb.TopicReaderAsyncIO
87+
):
88+
error_raised = False
89+
async with ydb.aio.QuerySessionPool(driver) as pool:
90+
91+
async def callee(tx: ydb.aio.QueryTxContext):
92+
nonlocal error_raised
93+
tx_writer = driver.topic_client.tx_writer(tx, topic_path)
94+
await tx_writer.write(ydb.TopicWriterMessage(data="123".encode()))
95+
96+
if not error_raised:
97+
error_raised = True
98+
raise ydb.issues.Unavailable("some retriable error")
99+
100+
await pool.retry_tx_async(callee)
101+
102+
msg = await wait_for(topic_reader.receive_message(), 0.1)
103+
assert msg.data.decode() == "123"
104+
105+
with pytest.raises(asyncio.TimeoutError):
106+
await wait_for(topic_reader.receive_message(), 0.1)
107+
85108

86109
class TestTopicTransactionalWriterSync:
87110
def test_commit(self, driver_sync: ydb.Driver, topic_path, topic_reader_sync: ydb.TopicReader):
@@ -110,7 +133,7 @@ def callee(tx: ydb.QueryTxContext):
110133
with pytest.raises(TimeoutError):
111134
topic_reader_sync.receive_message(timeout=0.1)
112135

113-
def test_no_msg_writter_in_error_case(
136+
def test_no_msg_written_in_error_case(
114137
self, driver_sync: ydb.Driver, topic_path, topic_reader_sync: ydb.TopicReaderAsyncIO
115138
):
116139
with ydb.QuerySessionPool(driver_sync) as pool:

ydb/_topic_writer/topic_writer_asyncio.py

+2-4
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ async def _on_before_commit(self):
191191
await self.close()
192192

193193
async def _on_before_rollback(self):
194-
await self.close()
194+
await self.close(flush=False)
195195

196196

197197
class WriterAsyncIOReconnector:
@@ -423,7 +423,7 @@ async def _connection_loop(self):
423423
done.pop().result() # need for raise exception - reason of stop task
424424
except issues.Error as err:
425425
err_info = check_retriable_error(err, retry_settings, attempt)
426-
if not err_info.is_retriable:
426+
if not err_info.is_retriable or self._tx is not None: # no retries in tx writer
427427
self._stop(err)
428428
return
429429

@@ -586,7 +586,6 @@ async def _send_loop(self, writer: "WriterAsyncIOStream"):
586586

587587
while True:
588588
m = await self._new_messages.get() # type: InternalMessage
589-
print("NEW MESSAGE")
590589
if m.seq_no > last_seq_no:
591590
writer.write([m])
592591
except asyncio.CancelledError:
@@ -618,7 +617,6 @@ async def flush(self):
618617

619618
# wait last message
620619
await asyncio.wait(self._messages_future)
621-
print("ALL MESSAGES WERE SENT TO SERVER")
622620

623621

624622
class WriterAsyncIOStream:

ydb/_topic_writer/topic_writer_sync.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -164,4 +164,4 @@ def _on_before_commit(self):
164164
self.close()
165165

166166
def _on_before_rollback(self):
167-
self.close()
167+
self.close(flush=False)

0 commit comments

Comments
 (0)