Skip to content

Commit d34ad1a

Browse files
committed
Add javadocs
1 parent 95e3b40 commit d34ad1a

File tree

3 files changed

+65
-7
lines changed

3 files changed

+65
-7
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupRebalanceCallbacks.java

+20
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,31 @@
1919
import java.util.Optional;
2020
import java.util.Set;
2121

22+
/**
23+
* Callbacks for handling Streams group rebalance events in Kafka Streams.
24+
*/
2225
public interface StreamsGroupRebalanceCallbacks {
2326

27+
/**
28+
* Called when tasks are revoked from a stream thread.
29+
*
30+
* @param tasks The tasks to be revoked.
31+
* @return The exception thrown during the callback, if any.
32+
*/
2433
Optional<Exception> onTasksRevoked(final Set<StreamsRebalanceData.TaskId> tasks);
2534

35+
/**
36+
* Called when tasks are assigned from a stream thread.
37+
*
38+
* @param assignment The tasks assigned.
39+
* @return The exception thrown during the callback, if any.
40+
*/
2641
Optional<Exception> onTasksAssigned(final StreamsRebalanceData.Assignment assignment);
2742

43+
/**
44+
* Called when a stream thread loses all assigned tasks.
45+
*
46+
* @return The exception thrown during the callback, if any.
47+
*/
2848
Optional<Exception> onAllTasksLost();
2949
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java

+4-7
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import java.util.Set;
2525
import java.util.concurrent.atomic.AtomicReference;
2626

27+
/**
28+
* This class holds the data that is needed to participate in the Streams rebalance protocol.
29+
*/
2730
public class StreamsRebalanceData {
2831

2932
public static class Assignment {
@@ -187,13 +190,7 @@ public String toString() {
187190

188191
private final Map<String, Subtopology> subtopologies;
189192

190-
private final AtomicReference<Assignment> reconciledAssignment = new AtomicReference<>(
191-
new Assignment(
192-
new HashSet<>(),
193-
new HashSet<>(),
194-
new HashSet<>()
195-
)
196-
);
193+
private final AtomicReference<Assignment> reconciledAssignment = new AtomicReference<>(Assignment.EMPTY);
197194

198195
public StreamsRebalanceData(Map<String, Subtopology> subtopologies) {
199196
this.subtopologies = subtopologies;

clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceEventsProcessor.java

+41
Original file line numberDiff line numberDiff line change
@@ -34,37 +34,75 @@
3434
import java.util.concurrent.CompletableFuture;
3535
import java.util.concurrent.LinkedBlockingQueue;
3636

37+
/**
38+
* Processes events from the Streams rebalance protocol.
39+
* <p>
40+
* The Streams rebalance processor receives events from the background thread of the async consumer, more precisely
41+
* from the Streams membership manager and handles them.
42+
* For example, events are requests for invoking the task assignment and task revocation callbacks.
43+
* Results of the event handling are passed back to the background thread.
44+
*/
3745
public class StreamsRebalanceEventsProcessor {
3846

3947
private final BlockingQueue<BackgroundEvent> onCallbackRequests = new LinkedBlockingQueue<>();
4048
private ApplicationEventHandler applicationEventHandler = null;
4149
private final StreamsGroupRebalanceCallbacks rebalanceCallbacks;
4250
private final StreamsRebalanceData streamsRebalanceData;
4351

52+
/**
53+
* Constructs the Streams rebalance processor.
54+
*
55+
* @param streamsRebalanceData
56+
* @param rebalanceCallbacks
57+
*/
4458
public StreamsRebalanceEventsProcessor(StreamsRebalanceData streamsRebalanceData,
4559
StreamsGroupRebalanceCallbacks rebalanceCallbacks) {
4660
this.streamsRebalanceData = streamsRebalanceData;
4761
this.rebalanceCallbacks = rebalanceCallbacks;
4862
}
4963

64+
/**
65+
* Requests the invocation of the task assignment callback.
66+
*
67+
* @param assignment The tasks to be assigned to the member of the Streams group.
68+
* @return A future that will be completed when the callback has been invoked.
69+
*/
5070
public CompletableFuture<Void> requestOnTasksAssignedCallbackInvocation(final StreamsRebalanceData.Assignment assignment) {
5171
final StreamsOnTasksAssignedCallbackNeededEvent onTasksAssignedCallbackNeededEvent = new StreamsOnTasksAssignedCallbackNeededEvent(assignment);
5272
onCallbackRequests.add(onTasksAssignedCallbackNeededEvent);
5373
return onTasksAssignedCallbackNeededEvent.future();
5474
}
5575

76+
/**
77+
* Requests the invocation of the task revocation callback.
78+
*
79+
* @param activeTasksToRevoke The tasks to revoke from the member of the Streams group
80+
* @return A future that will be completed when the callback has been invoked.
81+
*/
5682
public CompletableFuture<Void> requestOnTasksRevokedCallbackInvocation(final Set<StreamsRebalanceData.TaskId> activeTasksToRevoke) {
5783
final StreamsOnTasksRevokedCallbackNeededEvent onTasksRevokedCallbackNeededEvent = new StreamsOnTasksRevokedCallbackNeededEvent(activeTasksToRevoke);
5884
onCallbackRequests.add(onTasksRevokedCallbackNeededEvent);
5985
return onTasksRevokedCallbackNeededEvent.future();
6086
}
6187

88+
/**
89+
* Requests the invocation of the all tasks lost callback.
90+
*
91+
* @return A future that will be completed when the callback has been invoked.
92+
*/
6293
public CompletableFuture<Void> requestOnAllTasksLostCallbackInvocation() {
6394
final StreamsOnAllTasksLostCallbackNeededEvent onAllTasksLostCallbackNeededEvent = new StreamsOnAllTasksLostCallbackNeededEvent();
6495
onCallbackRequests.add(onAllTasksLostCallbackNeededEvent);
6596
return onAllTasksLostCallbackNeededEvent.future();
6697
}
6798

99+
/**
100+
* Sets the application event handler.
101+
*
102+
* The application handler sends the results of the callbacks to the background thread.
103+
*
104+
* @param applicationEventHandler The application handler.
105+
*/
68106
public void setApplicationEventHandler(final ApplicationEventHandler applicationEventHandler) {
69107
this.applicationEventHandler = applicationEventHandler;
70108
}
@@ -153,6 +191,9 @@ private StreamsOnAllTasksLostCallbackCompletedEvent invokeOnAllTasksLostCallback
153191
return new StreamsOnAllTasksLostCallbackCompletedEvent(future, error);
154192
}
155193

194+
/**
195+
* Processes all events received from the background thread so far.
196+
*/
156197
public void process() {
157198
LinkedList<BackgroundEvent> events = new LinkedList<>();
158199
onCallbackRequests.drainTo(events);

0 commit comments

Comments
 (0)