Skip to content

Commit f3a1b2c

Browse files
committed
topic transactions example
1 parent 22c660c commit f3a1b2c

File tree

1 file changed

+29
-0
lines changed

1 file changed

+29
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import ydb
2+
3+
def writer_example(driver: ydb.Driver, topic: str):
4+
session_pool = ydb.QuerySessionPool(driver)
5+
6+
def callee(tx: ydb.QueryTxContext):
7+
tx_writer: ydb.TopicTxWriter = driver.topic_client.tx_writer(tx, topic)
8+
9+
with tx.execute(query="select 1") as result_sets:
10+
messages = [result_set.rows[0] for result_set in result_sets]
11+
12+
tx_writer.write(messages)
13+
14+
session_pool.retry_tx_sync(callee)
15+
16+
17+
def reader_example(driver: ydb.Driver, reader: ydb.TopicReader):
18+
session_pool = ydb.QuerySessionPool(driver)
19+
20+
def callee(tx: ydb.QueryTxContext):
21+
batch = reader.receive_batch_with_tx(tx)
22+
23+
with tx.execute(
24+
query="INSERT INTO max_values(val) VALUES ($val)",
25+
parameters={"$val": max(batch)}
26+
) as _:
27+
pass
28+
29+
session_pool.retry_tx_sync(callee)

0 commit comments

Comments
 (0)