Skip to content

Commit 3df0281

Browse files
committed
topic transactions example
1 parent 22c660c commit 3df0281

File tree

1 file changed

+27
-0
lines changed

1 file changed

+27
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import ydb
2+
3+
4+
def writer_example(driver: ydb.Driver, topic: str):
5+
session_pool = ydb.QuerySessionPool(driver)
6+
7+
def callee(tx: ydb.QueryTxContext):
8+
tx_writer: ydb.TopicTxWriter = driver.topic_client.tx_writer(tx, topic)
9+
10+
with tx.execute(query="select 1") as result_sets:
11+
messages = [result_set.rows[0] for result_set in result_sets]
12+
13+
tx_writer.write(messages)
14+
15+
session_pool.retry_tx_sync(callee)
16+
17+
18+
def reader_example(driver: ydb.Driver, reader: ydb.TopicReader):
19+
session_pool = ydb.QuerySessionPool(driver)
20+
21+
def callee(tx: ydb.QueryTxContext):
22+
batch = reader.receive_batch_with_tx(tx)
23+
24+
with tx.execute(query="INSERT INTO max_values(val) VALUES ($val)", parameters={"$val": max(batch)}) as _:
25+
pass
26+
27+
session_pool.retry_tx_sync(callee)

0 commit comments

Comments
 (0)