KAFKA-19902: fix(consumer): use latest leader epoch when committing offsets #22558
Open
wilmerdooley wants to merge 1 commit into
…set uses stale epoch after Signed-off-by: wilmerdooley <wilmerdooley1@gmail.com>
When the partition leader changes, the consumer's `FetchPosition.currentLeader` can become stale because it is inherited from the previous position and only refreshed when leader-change errors are observed. `SubscriptionState.allConsumed()` then commits `position.offsetEpoch` (the epoch of the last consumed batch) instead, which can reference log segments that have since been deleted by retention, causing `OFFSET_OUT_OF_RANGE` on the next session and an `auto.offset.reset` jump.

This change keeps `currentLeader` fresh by looking it up in metadata every time the fetch position is advanced in `FetchCollector`, and makes `SubscriptionState.allConsumed()` use the maximum of `offsetEpoch` and `currentLeader.epoch` (falling back to whichever is present), so that the committed offset epoch matches the most recent leader epoch known to the client.
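The epoch selection rule described for `allConsumed()` can be illustrated with a minimal, standalone sketch. This is not the code from the PR: the class `CommitEpochSketch` and the method `selectCommitEpoch` are hypothetical names, and the two epochs are modeled as plain `Optional<Integer>` values rather than Kafka's internal types.

```java
import java.util.Optional;

// Hypothetical helper for illustration only; not the actual SubscriptionState code.
final class CommitEpochSketch {

    // Returns the larger epoch when both are present, otherwise whichever one is present,
    // otherwise an empty Optional.
    static Optional<Integer> selectCommitEpoch(Optional<Integer> offsetEpoch,
                                               Optional<Integer> currentLeaderEpoch) {
        if (offsetEpoch.isPresent() && currentLeaderEpoch.isPresent()) {
            return Optional.of(Math.max(offsetEpoch.get(), currentLeaderEpoch.get()));
        }
        return offsetEpoch.isPresent() ? offsetEpoch : currentLeaderEpoch;
    }

    public static void main(String[] args) {
        // The four present/absent combinations.
        System.out.println(selectCommitEpoch(Optional.of(3), Optional.of(5)));    // Optional[5]
        System.out.println(selectCommitEpoch(Optional.of(3), Optional.empty()));  // Optional[3]
        System.out.println(selectCommitEpoch(Optional.empty(), Optional.of(5)));  // Optional[5]
        System.out.println(selectCommitEpoch(Optional.empty(), Optional.empty())); // Optional.empty
    }
}
```

Taking the maximum rather than always preferring one source means a stale `offsetEpoch` never overrides a newer leader epoch, while a position whose leader epoch is unknown still commits the batch epoch it has.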
Testing strategy
Updated `SubscriptionStateTest.testAllConsumedUsesLatestAvailableLeaderEpoch` to cover the new `allConsumed()` epoch selection across the four combinations of present/absent `offsetEpoch` and `currentLeader.epoch`.

Added `FetchCollectorTest.testPositionUpdateUsesCurrentLeaderEpoch` to verify that advancing the fetch position refreshes `currentLeader.epoch` from the metadata cache.

Unit tests added/updated
JIRA: https://issues.apache.org/jira/browse/KAFKA-19902