Skip to content

Commit c3a3b2d

Browse files
authored
Merge pull request #571 from ydb-platform/add_consumer_stats
Add consumer stats to public consumer
2 parents 0104976 + 262558b commit c3a3b2d

File tree

3 files changed

+35
-1
lines changed

3 files changed

+35
-1
lines changed

tests/topics/test_control_plane.py

+3
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ async def test_describe_topic(self, driver, topic_path: str, topic_consumer):
3434

3535
has_consumer = False
3636
for consumer in res.consumers:
37+
assert consumer.consumer_stats is not None
38+
for stat in ["min_partitions_last_read_time", "max_read_time_lag", "max_write_time_lag", "bytes_read"]:
39+
assert getattr(consumer.consumer_stats, stat, None) is not None
3740
if consumer.name == topic_consumer:
3841
has_consumer = True
3942
break

ydb/_grpc/grpcwrapper/ydb_topic.py

+10-1
Original file line numberDiff line numberDiff line change
@@ -907,10 +907,11 @@ def to_public(self) -> ydb_topic_public_types.PublicConsumer:
907907
read_from=self.read_from,
908908
supported_codecs=self.supported_codecs.to_public(),
909909
attributes=self.attributes,
910+
consumer_stats=self.consumer_stats.to_public(),
910911
)
911912

912913
@dataclass
913-
class ConsumerStats(IFromProto):
914+
class ConsumerStats(IFromProto, IToPublic):
914915
min_partitions_last_read_time: datetime.datetime
915916
max_read_time_lag: datetime.timedelta
916917
max_write_time_lag: datetime.timedelta
@@ -927,6 +928,14 @@ def from_proto(
927928
bytes_read=MultipleWindowsStat.from_proto(msg.bytes_read),
928929
)
929930

931+
def to_public(self) -> ydb_topic_public_types.PublicConsumer.ConsumerStats:
932+
return ydb_topic_public_types.PublicConsumer.ConsumerStats(
933+
min_partitions_last_read_time=self.min_partitions_last_read_time,
934+
max_read_time_lag=self.max_read_time_lag,
935+
max_write_time_lag=self.max_write_time_lag,
936+
bytes_read=self.bytes_read,
937+
)
938+
930939

931940
@dataclass
932941
class AlterConsumer(IToProto, IFromPublic):

ydb/_grpc/grpcwrapper/ydb_topic_public_types.py

+22
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,28 @@ class PublicConsumer:
117117
attributes: Dict[str, str] = field(default_factory=lambda: dict())
118118
"Attributes of consumer"
119119

120+
consumer_stats: Optional["PublicConsumer.ConsumerStats"] = None
121+
122+
@dataclass
123+
class ConsumerStats:
124+
min_partitions_last_read_time: datetime.datetime
125+
"Minimal timestamp of last read from partitions."
126+
127+
max_read_time_lag: datetime.timedelta
128+
"""
129+
Maximum of differences between timestamp of read and write timestamp for all messages,
130+
read during last minute.
131+
"""
132+
133+
max_write_time_lag: datetime.timedelta
134+
"""
135+
Maximum of differences between write timestamp and create timestamp for all messages,
136+
written during last minute.
137+
"""
138+
139+
bytes_read: "PublicMultipleWindowsStat"
140+
"Bytes read statistics."
141+
120142

121143
@dataclass
122144
class PublicAlterConsumer:

0 commit comments

Comments
 (0)