Skip to content

Commit 82ec438

Browse files
committed
add executable example
1 parent 78f8149 commit 82ec438

File tree

3 files changed

+95
-18
lines changed

3 files changed

+95
-18
lines changed
+78-16
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,90 @@
1+
import asyncio
2+
import argparse
3+
import logging
14
import ydb
25

36

4-
def writer_example(driver: ydb.Driver, topic: str):
5-
session_pool = ydb.QuerySessionPool(driver)
7+
async def connect(endpoint: str, database: str) -> ydb.aio.Driver:
8+
config = ydb.DriverConfig(endpoint=endpoint, database=database)
9+
config.credentials = ydb.credentials_from_env_variables()
10+
driver = ydb.aio.Driver(config)
11+
await driver.wait(5, fail_fast=True)
12+
return driver
613

7-
def callee(tx: ydb.QueryTxContext):
8-
tx_writer: ydb.TopicTxWriter = driver.topic_client.tx_writer(tx, topic) # <=======
9-
# дефолт - без дедупликации, без ретраев и без producer_id.
1014

11-
with tx.execute(query="select 1") as result_sets:
12-
messages = [result_set.rows[0] for result_set in result_sets]
15+
async def create_topic(driver: ydb.aio.Driver, topic: str, consumer: str):
16+
try:
17+
await driver.topic_client.drop_topic(topic)
18+
except ydb.SchemeError:
19+
pass
1320

14-
tx_writer.write(messages) # вне зависимости от состояния вышестоящего стрима поведение должно быть одинаковое
21+
await driver.topic_client.create_topic(topic, consumers=[consumer])
1522

16-
session_pool.retry_tx_sync(callee)
1723

24+
async def write_with_tx_example(driver: ydb.aio.Driver, topic: str, message_count: int = 10):
25+
async with ydb.aio.QuerySessionPool(driver) as session_pool:
1826

19-
def reader_example(driver: ydb.Driver, reader: ydb.TopicReader):
20-
session_pool = ydb.QuerySessionPool(driver)
27+
async def callee(tx: ydb.aio.QueryTxContext):
28+
print(f"TX ID: {tx.tx_id}")
29+
print(f"TX STATE: {tx._tx_state._state.value}")
30+
tx_writer: ydb.TopicTxWriterAsyncIO = driver.topic_client.tx_writer(tx, topic)
31+
print(f"TX ID: {tx.tx_id}")
32+
print(f"TX STATE: {tx._tx_state._state.value}")
33+
for i in range(message_count):
34+
result_stream = await tx.execute(query=f"select {i} as res")
35+
messages = [result_set.rows[0]["res"] async for result_set in result_stream]
2136

22-
def callee(tx: ydb.QueryTxContext):
23-
batch = reader.receive_batch_with_tx(tx, max_messages=5) # <=======
37+
await tx_writer.write([ydb.TopicWriterMessage(data=str(message)) for message in messages])
2438

25-
with tx.execute(query="INSERT INTO max_values(val) VALUES ($val)", parameters={"$val": max(batch)}) as _:
26-
pass
39+
print(f"Messages {messages} were written with tx.")
2740

28-
session_pool.retry_tx_sync(callee)
41+
await session_pool.retry_tx_async(callee)
42+
43+
44+
async def read_with_tx_example(driver: ydb.aio.Driver, topic: str, consumer: str, message_count: int = 10):
45+
async with driver.topic_client.reader(topic, consumer) as reader:
46+
async with ydb.aio.QuerySessionPool(driver) as session_pool:
47+
for _ in range(message_count):
48+
49+
async def callee(tx: ydb.aio.QueryTxContext):
50+
batch = await reader.receive_batch_with_tx(tx, max_messages=1)
51+
print(f"Messages {batch.messages[0].data} were read with tx.")
52+
53+
await session_pool.retry_tx_async(callee)
54+
55+
56+
async def main():
57+
parser = argparse.ArgumentParser(
58+
formatter_class=argparse.RawDescriptionHelpFormatter,
59+
description="""YDB topic basic example.\n""",
60+
)
61+
parser.add_argument("-d", "--database", default="/local", help="Name of the database to use")
62+
parser.add_argument("-e", "--endpoint", default="grpc://localhost:2136", help="Endpoint url to use")
63+
parser.add_argument("-p", "--path", default="test-topic", help="Topic name")
64+
parser.add_argument("-c", "--consumer", default="consumer", help="Consumer name")
65+
parser.add_argument("-v", "--verbose", default=False, action="store_true")
66+
parser.add_argument(
67+
"-s",
68+
"--skip-drop-and-create-topic",
69+
default=False,
70+
action="store_true",
71+
help="Use existed topic, skip remove it and re-create",
72+
)
73+
74+
args = parser.parse_args()
75+
76+
if args.verbose:
77+
logger = logging.getLogger("topicexample")
78+
logger.setLevel(logging.DEBUG)
79+
logger.addHandler(logging.StreamHandler())
80+
81+
driver = await connect(args.endpoint, args.database)
82+
if not args.skip_drop_and_create_topic:
83+
await create_topic(driver, args.path, args.consumer)
84+
85+
await write_with_tx_example(driver, args.path)
86+
await read_with_tx_example(driver, args.path, args.consumer)
87+
88+
89+
if __name__ == "__main__":
90+
asyncio.run(main())

ydb/_topic_writer/topic_writer_asyncio.py

+13
Original file line numberDiff line numberDiff line change
@@ -191,9 +191,22 @@ def __init__(
191191
tx._add_callback(TxEvent.BEFORE_COMMIT, self._on_before_commit, self._loop)
192192
tx._add_callback(TxEvent.BEFORE_ROLLBACK, self._on_before_rollback, self._loop)
193193

194+
async def write(
195+
self,
196+
messages: Union[Message, List[Message]],
197+
):
198+
"""
199+
send one or number of messages to server.
200+
it put message to internal buffer
201+
202+
For wait with timeout use asyncio.wait_for.
203+
"""
204+
await self.write_with_ack(messages)
205+
194206
async def _on_before_commit(self, tx: "BaseQueryTxContext"):
195207
if self._is_implicit:
196208
return
209+
await self.flush()
197210
await self.close()
198211

199212
async def _on_before_rollback(self, tx: "BaseQueryTxContext"):

ydb/topic.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
"TopicWriteResult",
2626
"TopicWriter",
2727
"TopicWriterAsyncIO",
28+
"TopicTxWriter",
29+
"TopicTxWriterAsyncIO",
2830
"TopicWriterInitInfo",
2931
"TopicWriterMessage",
3032
"TopicWriterSettings",
@@ -68,7 +70,7 @@
6870
from ydb._topic_writer.topic_writer_asyncio import TxWriterAsyncIO as TopicTxWriterAsyncIO
6971
from ydb._topic_writer.topic_writer_asyncio import WriterAsyncIO as TopicWriterAsyncIO
7072
from ._topic_writer.topic_writer_sync import WriterSync as TopicWriter
71-
from ._topic_writer.topic_writer_sync import TxWriterSync as TxTopicWriter
73+
from ._topic_writer.topic_writer_sync import TxWriterSync as TopicTxWriter
7274

7375
from ._topic_common.common import (
7476
wrap_operation as _wrap_operation,
@@ -546,7 +548,7 @@ def tx_writer(
546548
if not settings.encoder_executor:
547549
settings.encoder_executor = self._executor
548550

549-
return TxTopicWriter(tx, self._driver, settings, _parent=self)
551+
return TopicTxWriter(tx, self._driver, settings, _parent=self)
550552

551553
def close(self):
552554
if self._closed:

0 commit comments

Comments
 (0)