Skip to content

Commit dbf4235

Browse files
authored
Merge pull request #673 from ydb-platform/commit_offset_new_field
CommitOffset with read session id
2 parents 277e6e0 + 58b0a70 commit dbf4235

35 files changed

+3657
-1490
lines changed

tests/topics/test_topic_reader.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,28 @@ async def test_commit_offset_works(self, driver, topic_with_messages, topic_cons
7474
topic_with_messages, topic_consumer, message.partition_id, message.offset + 1
7575
)
7676

77+
async def test_commit_offset_with_session_id_works(self, driver, topic_with_messages, topic_consumer):
78+
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
79+
msg1 = await reader.receive_message()
80+
assert msg1.seqno == 1
81+
msg2 = await reader.receive_message()
82+
assert msg2.seqno == 2
83+
84+
await driver.topic_client.commit_offset(
85+
topic_with_messages,
86+
topic_consumer,
87+
msg1.partition_id,
88+
msg1.offset + 1,
89+
reader.read_session_id,
90+
)
91+
92+
msg3 = await reader.receive_message()
93+
assert msg3.seqno == 3
94+
95+
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
96+
msg2 = await reader.receive_message()
97+
assert msg2.seqno == 2
98+
7799
async def test_reader_reconnect_after_commit_offset(self, driver, topic_with_messages, topic_consumer):
78100
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
79101
for out in ["123", "456", "789", "0"]:
@@ -213,6 +235,28 @@ def test_commit_offset_works(self, driver_sync, topic_with_messages, topic_consu
213235
topic_with_messages, topic_consumer, message.partition_id, message.offset + 1
214236
)
215237

238+
def test_commit_offset_with_session_id_works(self, driver_sync, topic_with_messages, topic_consumer):
239+
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
240+
msg1 = reader.receive_message()
241+
assert msg1.seqno == 1
242+
msg2 = reader.receive_message()
243+
assert msg2.seqno == 2
244+
245+
driver_sync.topic_client.commit_offset(
246+
topic_with_messages,
247+
topic_consumer,
248+
msg1.partition_id,
249+
msg1.offset + 1,
250+
reader.read_session_id,
251+
)
252+
253+
msg3 = reader.receive_message()
254+
assert msg3.seqno == 3
255+
256+
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
257+
msg2 = reader.receive_message()
258+
assert msg2.seqno == 2
259+
216260
def test_reader_reconnect_after_commit_offset(self, driver_sync, topic_with_messages, topic_consumer):
217261
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
218262
for out in ["123", "456", "789", "0"]:

ydb/_grpc/grpcwrapper/ydb_topic.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,13 +143,15 @@ class CommitOffsetRequest(IToProto):
143143
consumer: str
144144
partition_id: int
145145
offset: int
146+
read_session_id: Optional[str]
146147

147148
def to_proto(self) -> ydb_topic_pb2.CommitOffsetRequest:
148149
return ydb_topic_pb2.CommitOffsetRequest(
149150
path=self.path,
150151
consumer=self.consumer,
151152
partition_id=self.partition_id,
152153
offset=self.offset,
154+
read_session_id=self.read_session_id,
153155
)
154156

155157

ydb/_grpc/v3/draft/protos/ydb_federated_query_pb2.py

Lines changed: 1206 additions & 373 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ydb/_grpc/v3/protos/ydb_export_pb2.py

Lines changed: 158 additions & 40 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ydb/_grpc/v3/protos/ydb_import_pb2.py

Lines changed: 453 additions & 38 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ydb/_grpc/v3/protos/ydb_topic_pb2.py

Lines changed: 141 additions & 111 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ydb/_grpc/v3/ydb_import_v1_pb2.py

Lines changed: 13 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ydb/_grpc/v3/ydb_import_v1_pb2_grpc.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ def __init__(self, channel):
1919
request_serializer=protos_dot_ydb__import__pb2.ImportFromS3Request.SerializeToString,
2020
response_deserializer=protos_dot_ydb__import__pb2.ImportFromS3Response.FromString,
2121
)
22+
self.ListObjectsInS3Export = channel.unary_unary(
23+
'/Ydb.Import.V1.ImportService/ListObjectsInS3Export',
24+
request_serializer=protos_dot_ydb__import__pb2.ListObjectsInS3ExportRequest.SerializeToString,
25+
response_deserializer=protos_dot_ydb__import__pb2.ListObjectsInS3ExportResponse.FromString,
26+
)
2227
self.ImportData = channel.unary_unary(
2328
'/Ydb.Import.V1.ImportService/ImportData',
2429
request_serializer=protos_dot_ydb__import__pb2.ImportDataRequest.SerializeToString,
@@ -37,6 +42,13 @@ def ImportFromS3(self, request, context):
3742
context.set_details('Method not implemented!')
3843
raise NotImplementedError('Method not implemented!')
3944

45+
def ListObjectsInS3Export(self, request, context):
46+
"""List objects from existing export stored in S3 bucket
47+
"""
48+
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
49+
context.set_details('Method not implemented!')
50+
raise NotImplementedError('Method not implemented!')
51+
4052
def ImportData(self, request, context):
4153
"""Writes data to a table.
4254
Method accepts serialized data in the selected format and writes it non-transactionally.
@@ -53,6 +65,11 @@ def add_ImportServiceServicer_to_server(servicer, server):
5365
request_deserializer=protos_dot_ydb__import__pb2.ImportFromS3Request.FromString,
5466
response_serializer=protos_dot_ydb__import__pb2.ImportFromS3Response.SerializeToString,
5567
),
68+
'ListObjectsInS3Export': grpc.unary_unary_rpc_method_handler(
69+
servicer.ListObjectsInS3Export,
70+
request_deserializer=protos_dot_ydb__import__pb2.ListObjectsInS3ExportRequest.FromString,
71+
response_serializer=protos_dot_ydb__import__pb2.ListObjectsInS3ExportResponse.SerializeToString,
72+
),
5673
'ImportData': grpc.unary_unary_rpc_method_handler(
5774
servicer.ImportData,
5875
request_deserializer=protos_dot_ydb__import__pb2.ImportDataRequest.FromString,
@@ -85,6 +102,23 @@ def ImportFromS3(request,
85102
options, channel_credentials,
86103
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
87104

105+
@staticmethod
106+
def ListObjectsInS3Export(request,
107+
target,
108+
options=(),
109+
channel_credentials=None,
110+
call_credentials=None,
111+
insecure=False,
112+
compression=None,
113+
wait_for_ready=None,
114+
timeout=None,
115+
metadata=None):
116+
return grpc.experimental.unary_unary(request, target, '/Ydb.Import.V1.ImportService/ListObjectsInS3Export',
117+
protos_dot_ydb__import__pb2.ListObjectsInS3ExportRequest.SerializeToString,
118+
protos_dot_ydb__import__pb2.ListObjectsInS3ExportResponse.FromString,
119+
options, channel_credentials,
120+
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
121+
88122
@staticmethod
89123
def ImportData(request,
90124
target,

ydb/_grpc/v3/ydb_query_v1_pb2_grpc.py

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,7 @@
77

88

99
class QueryServiceStub(object):
10-
"""! WARNING: Experimental API
11-
! This API is currently in experimental state and is a subject for changes.
12-
! No backward and/or forward compatibility guarantees are provided.
13-
! DO NOT USE for production workloads.
14-
15-
"""
10+
"""Missing associated documentation comment in .proto file."""
1611

1712
def __init__(self, channel):
1813
"""Constructor.
@@ -68,12 +63,7 @@ def __init__(self, channel):
6863

6964

7065
class QueryServiceServicer(object):
71-
"""! WARNING: Experimental API
72-
! This API is currently in experimental state and is a subject for changes.
73-
! No backward and/or forward compatibility guarantees are provided.
74-
! DO NOT USE for production workloads.
75-
76-
"""
66+
"""Missing associated documentation comment in .proto file."""
7767

7868
def CreateSession(self, request, context):
7969
"""Sessions are basic primitives for communicating with YDB Query Service. The are similar to
@@ -214,12 +204,7 @@ def add_QueryServiceServicer_to_server(servicer, server):
214204

215205
# This class is part of an EXPERIMENTAL API.
216206
class QueryService(object):
217-
"""! WARNING: Experimental API
218-
! This API is currently in experimental state and is a subject for changes.
219-
! No backward and/or forward compatibility guarantees are provided.
220-
! DO NOT USE for production workloads.
221-
222-
"""
207+
"""Missing associated documentation comment in .proto file."""
223208

224209
@staticmethod
225210
def CreateSession(request,

ydb/_grpc/v4/draft/protos/ydb_federated_query_pb2.py

Lines changed: 286 additions & 224 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)