Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18518: Add processor to handle rebalance events #18527

Merged
merged 13 commits into from
Jan 22, 2025
Prev Previous commit
Next Next commit
Harmonize implementations of callbacks invocations
cadonna committed Jan 21, 2025
commit a319b91c4dea83cf07575203aef6b639161ea0f9
Original file line number Diff line number Diff line change
@@ -156,38 +156,38 @@ private void processStreamsOnAllTasksLostCallbackNeededEvent(final StreamsOnAllT

private StreamsOnTasksRevokedCallbackCompletedEvent invokeOnTasksRevokedCallback(final Set<StreamsRebalanceData.TaskId> activeTasksToRevoke,
final CompletableFuture<Void> future) {
final Optional<KafkaException> error;
final Optional<Exception> exceptionFromCallback = rebalanceCallbacks.onTasksRevoked(activeTasksToRevoke);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

invokeOnTasksRevokedCallback has a different implementation than invokeOnTasksAssignedCallback and invokeOnAllTasksLostCallback. It's probably easier to read if all three use either the .map stile or the isPresent style. I don't mind which one, isPresent is probably a little easier to follow.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case I just applied IDEA's recommendation to transform it to functional style. However, I agree with you regarding readability.

return exceptionFromCallback
.map(exception ->
new StreamsOnTasksRevokedCallbackCompletedEvent(
future,
Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exception, "Task revocation callback throws an error"))
))
.orElseGet(() -> new StreamsOnTasksRevokedCallbackCompletedEvent(future, Optional.empty()));
if (exceptionFromCallback.isPresent()) {
error = Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(), "Task revocation callback throws an error"));
} else {
error = Optional.empty();
}
return new StreamsOnTasksRevokedCallbackCompletedEvent(future, error);
}

private StreamsOnTasksAssignedCallbackCompletedEvent invokeOnTasksAssignedCallback(final StreamsRebalanceData.Assignment assignment,
final CompletableFuture<Void> future) {
Optional<KafkaException> error = Optional.empty();
final Optional<KafkaException> error;
final Optional<Exception> exceptionFromCallback = rebalanceCallbacks.onTasksAssigned(assignment);
if (exceptionFromCallback.isPresent()) {
error = Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(), "Task assignment callback throws an error"));
} else {
error = Optional.empty();
streamsRebalanceData.setReconciledAssignment(assignment);
}
return new StreamsOnTasksAssignedCallbackCompletedEvent(future, error);
}

private StreamsOnAllTasksLostCallbackCompletedEvent invokeOnAllTasksLostCallback(final CompletableFuture<Void> future) {
final Optional<Exception> exceptionFromCallback = rebalanceCallbacks.onAllTasksLost();
final Optional<KafkaException> error;
final Optional<Exception> exceptionFromCallback = rebalanceCallbacks.onAllTasksLost();
if (exceptionFromCallback.isPresent()) {
error = Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(), "All tasks lost callback throws an error"));
} else {
error = Optional.empty();
streamsRebalanceData.setReconciledAssignment(StreamsRebalanceData.Assignment.EMPTY);
}

return new StreamsOnAllTasksLostCallbackCompletedEvent(future, error);
}


Unchanged files with check annotations Beta

package org.apache.kafka.clients.consumer.internals;

Check notice on line 1 in clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java

GitHub Actions / build / Compile and Check Java

Unapproved License

File with unapproved license: clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
import org.junit.jupiter.api.Test;