Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 0696776

Browse files
committedMar 10, 2025·
transaction identity
1 parent e64d447 commit 0696776

File tree

5 files changed

+35
-4
lines changed

5 files changed

+35
-4
lines changed
 

‎examples/topic/topic_transactions_example.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@ def writer_example(driver: ydb.Driver, topic: str):
55
session_pool = ydb.QuerySessionPool(driver)
66

77
def callee(tx: ydb.QueryTxContext):
8-
tx_writer: ydb.TopicTxWriter = driver.topic_client.tx_writer(tx, topic) # <=======
8+
tx_writer: ydb.TopicTxWriter = driver.topic_client.tx_writer(tx, topic) # <=======
99
# дефолт - без дедупликации, без ретраев и без producer_id.
1010

1111
with tx.execute(query="select 1") as result_sets:
1212
messages = [result_set.rows[0] for result_set in result_sets]
1313

14-
tx_writer.write(messages) # вне зависимости от состояния вышестоящего стрима поведение должно быть одинаковое
14+
tx_writer.write(messages) # вне зависимости от состояния вышестоящего стрима поведение должно быть одинаковое
1515

1616
session_pool.retry_tx_sync(callee)
1717

@@ -20,7 +20,7 @@ def reader_example(driver: ydb.Driver, reader: ydb.TopicReader):
2020
session_pool = ydb.QuerySessionPool(driver)
2121

2222
def callee(tx: ydb.QueryTxContext):
23-
batch = reader.receive_batch_with_tx(tx, max_messages=5) # <=======
23+
batch = reader.receive_batch_with_tx(tx, max_messages=5) # <=======
2424

2525
with tx.execute(query="INSERT INTO max_values(val) VALUES ($val)", parameters={"$val": max(batch)}) as _:
2626
pass

‎tests/query/test_query_transaction.py

+12
Original file line numberDiff line numberDiff line change
@@ -92,3 +92,15 @@ def test_execute_two_results(self, tx: QueryTxContext):
9292

9393
assert res == [[1], [2]]
9494
assert counter == 2
95+
96+
def test_tx_identity_before_begin_raises(self, tx: QueryTxContext):
97+
with pytest.raises(RuntimeError):
98+
tx._tx_identity()
99+
100+
def test_tx_identity_after_begin_works(self, tx: QueryTxContext):
101+
tx.begin()
102+
103+
identity = tx._tx_identity()
104+
105+
assert identity.tx_id == tx.tx_id
106+
assert identity.session_id == tx.session_id

‎ydb/_grpc/grpcwrapper/ydb_query.py

+14
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
# Workaround for good IDE and universal for runtime
77
if typing.TYPE_CHECKING:
88
from ..v4.protos import ydb_query_pb2
9+
from ..v4.protos import ydb_topic_pb2
910
else:
1011
from ..common.protos import ydb_query_pb2
12+
from ..common.protos import ydb_topic_pb2
1113

1214
from . import ydb_query_public_types as public_types
1315

@@ -81,6 +83,18 @@ def to_proto(self) -> ydb_query_pb2.TransactionSettings:
8183
return ydb_query_pb2.TransactionSettings(stale_read_only=self.tx_mode.to_proto())
8284

8385

86+
@dataclass
87+
class TransactionIdentity(IToProto):
88+
tx_id: str
89+
session_id: str
90+
91+
def to_proto(self) -> ydb_topic_pb2.TransactionIdentity:
92+
return ydb_topic_pb2.TransactionIdentity(
93+
id=self.tx_id,
94+
session=self.session_id,
95+
)
96+
97+
8498
@dataclass
8599
class BeginTransactionRequest(IToProto):
86100
session_id: str

‎ydb/query/transaction.py

+5
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,11 @@ def tx_id(self) -> Optional[str]:
215215
"""
216216
return self._tx_state.tx_id
217217

218+
def _tx_identity(self) -> _ydb_query.TransactionIdentity:
219+
if not self.tx_id:
220+
raise RuntimeError("Unable to get tx identity without started tx.")
221+
return _ydb_query.TransactionIdentity(self.tx_id, self.session_id)
222+
218223
def _begin_call(self, settings: Optional[BaseRequestSettings]) -> "BaseQueryTxContext":
219224
self._tx_state._check_invalid_transition(QueryTxStateEnum.BEGINED)
220225

‎ydb/topic.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ def tx_writer(
294294
# If max_worker in the executor is 1 - then encoders will be called from the thread without parallel.
295295
encoder_executor: Optional[concurrent.futures.Executor] = None,
296296
) -> TopicTxWriterAsyncIO:
297-
297+
pass
298298

299299
def close(self):
300300
if self._closed:

0 commit comments

Comments
 (0)
Please sign in to comment.