Skip to content

Commit 0cce043

Browse files
dpkpCourouge
andauthored
Support DescribeLogDirs admin api (#2475)
Co-authored-by: chopatate <[email protected]>
1 parent c15720b commit 0cce043

File tree

2 files changed

+59
-1
lines changed

2 files changed

+59
-1
lines changed

kafka/admin/client.py

+17-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from kafka.protocol.admin import (
2121
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
2222
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
23-
DeleteGroupsRequest
23+
DeleteGroupsRequest, DescribeLogDirsRequest
2424
)
2525
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
2626
from kafka.protocol.metadata import MetadataRequest
@@ -1345,3 +1345,19 @@ def _wait_for_futures(self, futures):
13451345

13461346
if future.failed():
13471347
raise future.exception # pylint: disable-msg=raising-bad-type
1348+
1349+
def describe_log_dirs(self):
1350+
"""Send a DescribeLogDirsRequest request to a broker.
1351+
1352+
:return: A message future
1353+
"""
1354+
version = self._matching_api_version(DescribeLogDirsRequest)
1355+
if version <= 0:
1356+
request = DescribeLogDirsRequest[version]()
1357+
future = self._send_request_to_node(self._client.least_loaded_node(), request)
1358+
self._wait_for_futures([future])
1359+
else:
1360+
raise NotImplementedError(
1361+
"Support for DescribeLogDirsRequest_v{} has not yet been added to KafkaAdminClient."
1362+
.format(version))
1363+
return future.value

kafka/protocol/admin.py

+42
Original file line numberDiff line numberDiff line change
@@ -790,6 +790,48 @@ class DescribeConfigsRequest_v2(Request):
790790
]
791791

792792

793+
class DescribeLogDirsResponse_v0(Response):
794+
API_KEY = 35
795+
API_VERSION = 0
796+
FLEXIBLE_VERSION = True
797+
SCHEMA = Schema(
798+
('throttle_time_ms', Int32),
799+
('log_dirs', Array(
800+
('error_code', Int16),
801+
('log_dir', String('utf-8')),
802+
('topics', Array(
803+
('name', String('utf-8')),
804+
('partitions', Array(
805+
('partition_index', Int32),
806+
('partition_size', Int64),
807+
('offset_lag', Int64),
808+
('is_future_key', Boolean)
809+
))
810+
))
811+
))
812+
)
813+
814+
815+
class DescribeLogDirsRequest_v0(Request):
816+
API_KEY = 35
817+
API_VERSION = 0
818+
RESPONSE_TYPE = DescribeLogDirsResponse_v0
819+
SCHEMA = Schema(
820+
('topics', Array(
821+
('topic', String('utf-8')),
822+
('partitions', Int32)
823+
))
824+
)
825+
826+
827+
DescribeLogDirsResponse = [
828+
DescribeLogDirsResponse_v0,
829+
]
830+
DescribeLogDirsRequest = [
831+
DescribeLogDirsRequest_v0,
832+
]
833+
834+
793835
class SaslAuthenticateResponse_v0(Response):
794836
API_KEY = 36
795837
API_VERSION = 0

0 commit comments

Comments
 (0)