Skip to content

Commit 41b641c

Browse files
committed
topic transactions example
1 parent 22c660c commit 41b641c

File tree

1 file changed

+31
-0
lines changed

1 file changed

+31
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import ydb
2+
3+
4+
def writer_example(driver: ydb.Driver, topic: str):
5+
session_pool = ydb.QuerySessionPool(driver)
6+
7+
def callee(tx: ydb.QueryTxContext):
8+
tx_writer: ydb.TopicTxWriter = driver.topic_client.tx_writer(tx, topic)
9+
# нужно ли внутри ретраить ? нужно разрешать ретраи с дедупликацией
10+
# дефолт - без дедупликации, без ретраев и без producer_id. договорились.
11+
12+
with tx.execute(query="select 1") as result_sets:
13+
messages = [result_set.rows[0] for result_set in result_sets]
14+
15+
tx_writer.write(messages)
16+
17+
session_pool.retry_tx_sync(callee)
18+
19+
20+
def reader_example(driver: ydb.Driver, reader: ydb.TopicReader):
21+
session_pool = ydb.QuerySessionPool(driver)
22+
23+
def callee(tx: ydb.QueryTxContext):
24+
batch = reader.receive_batch_with_tx(tx, max_messages=5)
25+
26+
with tx.execute(query="INSERT INTO max_values(val) VALUES ($val)", parameters={"$val": max(batch)}) as _:
27+
pass
28+
29+
# коммитим при выходе из лямбды
30+
31+
session_pool.retry_tx_sync(callee)

0 commit comments

Comments
 (0)