@@ -525,7 +525,7 @@ def commit(self, offsets=None):
525525 offsets = self ._subscription .all_consumed_offsets ()
526526 self ._coordinator .commit_offsets_sync (offsets )
527527
528- def committed (self , partition ):
528+ def committed (self , partition , metadata = False ):
529529 """Get the last committed offset for the given partition.
530530
531531 This offset will be used as the position for the consumer
@@ -537,9 +537,11 @@ def committed(self, partition):
537537
538538 Arguments:
539539 partition (TopicPartition): The partition to check.
540+ metadata (bool, optional): If True, return OffsetAndMetadata struct
541+ instead of offset int. Default: False.
540542
541543 Returns:
542- The last committed offset, or None if there was no prior commit.
544+ The last committed offset (int or OffsetAndMetadata) , or None if there was no prior commit.
543545 """
544546 assert self .config ['api_version' ] >= (0 , 8 , 1 ), 'Requires >= Kafka 0.8.1'
545547 assert self .config ['group_id' ] is not None , 'Requires group_id'
@@ -553,10 +555,15 @@ def committed(self, partition):
553555 else :
554556 commit_map = self ._coordinator .fetch_committed_offsets ([partition ])
555557 if partition in commit_map :
556- committed = commit_map [partition ]. offset
558+ committed = commit_map [partition ]
557559 else :
558560 committed = None
559- return committed
561+
562+ if committed is not None :
563+ if metadata :
564+ return committed
565+ else :
566+ return committed .offset
560567
561568 def _fetch_all_topic_metadata (self ):
562569 """A blocking call that fetches topic metadata for all topics in the
0 commit comments