Skip to content

Commit a319b91

Browse files
committed
Harmonize implementations of callbacks invocations
1 parent eadecd3 commit a319b91

File tree

1 file changed

+10
-10
lines changed

1 file changed

+10
-10
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceEventsProcessor.java

+10-10
Original file line numberDiff line numberDiff line change
@@ -156,38 +156,38 @@ private void processStreamsOnAllTasksLostCallbackNeededEvent(final StreamsOnAllT
156156

157157
private StreamsOnTasksRevokedCallbackCompletedEvent invokeOnTasksRevokedCallback(final Set<StreamsRebalanceData.TaskId> activeTasksToRevoke,
158158
final CompletableFuture<Void> future) {
159+
final Optional<KafkaException> error;
159160
final Optional<Exception> exceptionFromCallback = rebalanceCallbacks.onTasksRevoked(activeTasksToRevoke);
160-
return exceptionFromCallback
161-
.map(exception ->
162-
new StreamsOnTasksRevokedCallbackCompletedEvent(
163-
future,
164-
Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exception, "Task revocation callback throws an error"))
165-
))
166-
.orElseGet(() -> new StreamsOnTasksRevokedCallbackCompletedEvent(future, Optional.empty()));
161+
if (exceptionFromCallback.isPresent()) {
162+
error = Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(), "Task revocation callback throws an error"));
163+
} else {
164+
error = Optional.empty();
165+
}
166+
return new StreamsOnTasksRevokedCallbackCompletedEvent(future, error);
167167
}
168168

169169
private StreamsOnTasksAssignedCallbackCompletedEvent invokeOnTasksAssignedCallback(final StreamsRebalanceData.Assignment assignment,
170170
final CompletableFuture<Void> future) {
171-
Optional<KafkaException> error = Optional.empty();
171+
final Optional<KafkaException> error;
172172
final Optional<Exception> exceptionFromCallback = rebalanceCallbacks.onTasksAssigned(assignment);
173173
if (exceptionFromCallback.isPresent()) {
174174
error = Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(), "Task assignment callback throws an error"));
175175
} else {
176+
error = Optional.empty();
176177
streamsRebalanceData.setReconciledAssignment(assignment);
177178
}
178179
return new StreamsOnTasksAssignedCallbackCompletedEvent(future, error);
179180
}
180181

181182
private StreamsOnAllTasksLostCallbackCompletedEvent invokeOnAllTasksLostCallback(final CompletableFuture<Void> future) {
182-
final Optional<Exception> exceptionFromCallback = rebalanceCallbacks.onAllTasksLost();
183183
final Optional<KafkaException> error;
184+
final Optional<Exception> exceptionFromCallback = rebalanceCallbacks.onAllTasksLost();
184185
if (exceptionFromCallback.isPresent()) {
185186
error = Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(), "All tasks lost callback throws an error"));
186187
} else {
187188
error = Optional.empty();
188189
streamsRebalanceData.setReconciledAssignment(StreamsRebalanceData.Assignment.EMPTY);
189190
}
190-
191191
return new StreamsOnAllTasksLostCallbackCompletedEvent(future, error);
192192
}
193193

0 commit comments

Comments
 (0)