@@ -5,14 +5,13 @@ def writer_example(driver: ydb.Driver, topic: str):
5
5
session_pool = ydb .QuerySessionPool (driver )
6
6
7
7
def callee (tx : ydb .QueryTxContext ):
8
- tx_writer : ydb .TopicTxWriter = driver .topic_client .tx_writer (tx , topic )
9
- # нужно ли внутри ретраить ? нужно разрешать ретраи с дедупликацией
10
- # дефолт - без дедупликации, без ретраев и без producer_id. договорились.
8
+ tx_writer : ydb .TopicTxWriter = driver .topic_client .tx_writer (tx , topic ) # <=======
9
+ # дефолт - без дедупликации, без ретраев и без producer_id.
11
10
12
11
with tx .execute (query = "select 1" ) as result_sets :
13
12
messages = [result_set .rows [0 ] for result_set in result_sets ]
14
13
15
- tx_writer .write (messages )
14
+ tx_writer .write (messages ) # вне зависимости от состояния вышестоящего стрима поведение должно быть одинаковое
16
15
17
16
session_pool .retry_tx_sync (callee )
18
17
@@ -21,11 +20,9 @@ def reader_example(driver: ydb.Driver, reader: ydb.TopicReader):
21
20
session_pool = ydb .QuerySessionPool (driver )
22
21
23
22
def callee (tx : ydb .QueryTxContext ):
24
- batch = reader .receive_batch_with_tx (tx , max_messages = 5 )
23
+ batch = reader .receive_batch_with_tx (tx , max_messages = 5 ) # <=======
25
24
26
25
with tx .execute (query = "INSERT INTO max_values(val) VALUES ($val)" , parameters = {"$val" : max (batch )}) as _ :
27
26
pass
28
27
29
- # коммитим при выходе из лямбды
30
-
31
28
session_pool .retry_tx_sync (callee )
0 commit comments