Skip to content

KAFKA-18943: Kafka Streams incorrectly commits TX during task revokation #19164

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 13, 2025

Conversation

mjsax
Copy link
Member

@mjsax mjsax commented Mar 8, 2025

Fixes two issues:

  • only commit TX if no revoked tasks need to be committed
  • commit revoked tasks after punctuation triggered

Fixes two issues:
 - only commit TX if no revoked tasks need to be committed
 - commit revoked tasks after punctution triggered
@mjsax mjsax added the streams label Mar 8, 2025
Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey! I made a pass on the production code (not the test yet), looks good to me, with a minor comment.

Long term, I'd suggest we remove the optimization altogether, as it seems fragile. Revocations are rare enough that we can just commit all tasks (not just the revoked ones), which would be much easier to reason about.

@@ -1153,8 +1157,7 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
prepareCommitAndAddOffsetsToMap(revokedActiveTasks, consumedOffsetsPerTask);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be moved into the if below, instead of being called (but executing a no-op) in the "optimized" case.

Copy link
Member Author

@mjsax mjsax Mar 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm... Thinking about this again, it seems the if below should actually only apply to EOSv2 case? I believe we did actually include some task unnecessarily for ALOS (and older version EOSv1) case?

However, changing this code below does break two tests...

if (processingMode == EXACTLY_ONCE_V2 && revokedTasksNeedCommit) {
    prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
}

Tests:

TaskManagerTest#shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithAlos()
TaskManagerTest#shouldCommitAllNeededTasksOnHandleRevocation()

At least the second test assumes we commit everything for ALOS, too. I was added when we added EOSv2 and unified commit logic (#8318) -- but I cannot remember why we did it this way... \cc @guozhangwang @ableegoldman do you remember?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. Well my original reaction was the same as yours Matthias, ie that this should only apply to EOSv2 as I thought we only committed the necessary tasks with ALOS since he don't have to commit the entire transaction

However, now that you bring this up: I have a vague memory of making this case before, but ultimately us agreeing to just commit every task under ALOS in order to keep the logic from branching too much. I'm pretty sure it was a "let's keep things simple and if we need to optimize further then we can stop committing non-revoked tasks under ALOS"

I don't think it's become a problem yet so let's keep it as is and do the same for EOS and ALOS.

Also FWIW I also had the same thought as Lucas, let's move this line into the if revokedTasksNeedCommit condition as well, just because it makes the logic easier to follow.

Copy link
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, @mjsax !

Here my comments.

// commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to make sure we don't skip the
// offset commit because we are in a rebalance
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
if (revokedTasksNeedCommit) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have this, do we want to also adapt the following condition in TaskExecutor#commitOffsetsOrTransaction()

if (!offsetsPerTask.isEmpty() || taskManager.streamsProducer().transactionInFlight()) {

to

if (!offsetsPerTask.isEmpty()) {

(and maybe move it to the outermost context as it was before the PR that introduced the bug)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I can follow?

Your proposal would say, we stop committing if a TX is in-flight, but we do want to commit for this case, right? Even if offset-map is empty.

And moving it to the outer-most context seems not to be "correct", because checking if a TX is inflight for the ALOS case seems unnecessary (guess it would not be wrong, because the call would just always return false so not really changing anything effectively, but it seems unnecessary to change the code)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about it again, I agree we still need the TX in-flight check, because a punctuator could write to a changelog topic without consuming any records from input topics. That means, there might be an empty offset map and a TX in-flight that should be committed.

Of course, if we cannot get rid of the TX in-flight, it does not make any sense to move the complete check to the outermost context for the reasons you mentioned. I thought that was clear from my comment that the movement only applies if we adapt the condition.

Comment on lines 950 to 960
@Test
public void shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress() throws Exception {
shouldNotProduceDuplicates(false);
}

@Test
public void shouldCommitAllTasksIfRevokedTaskTriggerPunctuation() throws Exception {
shouldNotProduceDuplicates(true);
}

private void shouldNotProduceDuplicates(final boolean usePunctuation) throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
@Test
public void shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress() throws Exception {
shouldNotProduceDuplicates(false);
}
@Test
public void shouldCommitAllTasksIfRevokedTaskTriggerPunctuation() throws Exception {
shouldNotProduceDuplicates(true);
}
private void shouldNotProduceDuplicates(final boolean usePunctuation) throws Exception {
@ParameterizedTest(name = "{argumentsWithNames}")
@FieldSource("namedArguments")
@ParameterizedTest(name = "shouldCommitAllTasks with punctuation: {0}")
@ValueSource(booleans = {true, false})
public void shouldNotProduceDuplicates(final boolean usePunctuation) throws Exception {
...
}
private static List<Arguments> namedArguments = Arrays.asList(
arguments(named("Should not commit active tasks with pending input if revoked task did not make progress"), false),
arguments(named("Should commit all tasks if revoked task triggers punctuation"), true)
);

Copy link
Member

@ableegoldman ableegoldman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM modulo the one comment about moving that line into the if (revokedTasksNeedCommit) condition

@mjsax
Copy link
Member Author

mjsax commented Mar 11, 2025

Updates this PR according to comments. Split out the "punctuation" test, because it's actually not EOS specific. Also added unit tests to TaskManagerTest.

Copy link
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM modulo comments from Bruno and Lucas

@mjsax mjsax merged commit 2181ddb into apache:trunk Mar 13, 2025
21 of 22 checks passed
@mjsax mjsax deleted the kafka-18943-eosv2-fix branch March 13, 2025 16:38
mjsax added a commit that referenced this pull request Mar 13, 2025
…ion (#19164)

Fixes two issues:
 - only commit TX if no revoked tasks need to be committed
 - commit revoked tasks after punctuation triggered

Reviewers: Lucas Brutschy <[email protected]>, Anna Sophie Blee-Goldman <[email protected]>, Bruno Cadonna <[email protected]>, Bill Bejeck <[email protected]>
mjsax added a commit that referenced this pull request Mar 13, 2025
…ion (#19164)

Fixes two issues:
 - only commit TX if no revoked tasks need to be committed
 - commit revoked tasks after punctuation triggered

Reviewers: Lucas Brutschy <[email protected]>, Anna Sophie Blee-Goldman <[email protected]>, Bruno Cadonna <[email protected]>, Bill Bejeck <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants