Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 2d2b481

Browse files
committedFeb 20, 2025·
topic tx basic test cases
1 parent 41b641c commit 2d2b481

File tree

1 file changed

+67
-0
lines changed

1 file changed

+67
-0
lines changed
 
+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import asyncio
2+
from asyncio import wait_for
3+
import pytest
4+
import ydb
5+
6+
7+
@pytest.mark.skip("Not implemented yet.")
8+
@pytest.mark.asyncio
9+
class TestTopicTransactionalReader:
10+
async def test_commit(self, driver: ydb.aio.Driver, topic_with_messages, topic_consumer):
11+
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
12+
async with ydb.aio.QuerySessionPool(driver) as pool:
13+
14+
async def callee(tx: ydb.aio.QueryTxContext):
15+
batch = await wait_for(reader.receive_batch_with_tx(tx, max_messages=1), 1)
16+
assert len(batch) == 1
17+
assert batch[0].data.decode() == "123"
18+
19+
await pool.retry_tx_async(callee)
20+
21+
msg = await wait_for(reader.receive_message(), 1)
22+
assert msg.data.decode() == "456"
23+
24+
async def test_rollback(self, driver: ydb.aio.Driver, topic_with_messages, topic_consumer):
25+
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
26+
async with ydb.aio.QuerySessionPool(driver) as pool:
27+
28+
async def callee(tx: ydb.aio.QueryTxContext):
29+
batch = await wait_for(reader.receive_batch_with_tx(tx, max_messages=1), 1)
30+
assert len(batch) == 1
31+
assert batch[0].data.decode() == "123"
32+
33+
await tx.rollback()
34+
35+
await pool.retry_tx_async(callee)
36+
37+
msg = await wait_for(reader.receive_message(), 1)
38+
assert msg.data.decode() == "123"
39+
40+
41+
@pytest.mark.skip("Not implemented yet.")
42+
class TestTopicTransactionalWriter:
43+
async def test_commit(self, driver: ydb.aio.Driver, topic_path, topic_reader: ydb.TopicReaderAsyncIO):
44+
async with ydb.aio.QuerySessionPool(driver) as pool:
45+
46+
async def callee(tx: ydb.aio.QueryTxContext):
47+
tx_writer = driver.topic_client.tx_writer(tx, topic_path)
48+
tx_writer.write(ydb.TopicWriterMessage(data="123".encode()))
49+
50+
await pool.retry_tx_async(callee)
51+
52+
msg = await wait_for(topic_reader.receive_message(), 0.1)
53+
assert msg.data.decode() == "123"
54+
55+
async def test_rollback(self, driver: ydb.aio.Driver, topic_path, topic_reader: ydb.TopicReaderAsyncIO):
56+
async with ydb.aio.QuerySessionPool(driver) as pool:
57+
58+
async def callee(tx: ydb.aio.QueryTxContext):
59+
tx_writer = driver.topic_client.tx_writer(tx, topic_path)
60+
tx_writer.write(ydb.TopicWriterMessage(data="123".encode()))
61+
62+
await tx.rollback()
63+
64+
await pool.retry_tx_async(callee)
65+
66+
with pytest.raises(asyncio.TimeoutError):
67+
await wait_for(topic_reader.receive_message(), 0.1)

0 commit comments

Comments
 (0)
Please sign in to comment.