Commit 50df18c

cadonna authored and mjsax committed
KAFKA-18518: Add processor to handle rebalance events (apache#18527)
This commit adds a processor named StreamsRebalanceEventsProcessor that handles the rebalance events sent from the background thread of the async consumer to the stream thread when a task assignment changes. It also adds the corresponding rebalance events. Additionally, this commit adds StreamsRebalanceData, which maintains the data that is exchanged for the Streams rebalance protocol. All of these are used by the Streams heartbeat request manager and the Streams membership manager that will be added in a future commit. Reviewer: Lucas Brutschy <[email protected]>
1 parent 0ac9a95 · commit 50df18c

13 files changed: +1366 −1 lines
StreamsGroupRebalanceCallbacks.java (new file, 49 lines)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.Optional;
import java.util.Set;

/**
 * Callbacks for handling Streams group rebalance events in Kafka Streams.
 */
public interface StreamsGroupRebalanceCallbacks {

    /**
     * Called when tasks are revoked from a stream thread.
     *
     * @param tasks The tasks to be revoked.
     * @return The exception thrown during the callback, if any.
     */
    Optional<Exception> onTasksRevoked(final Set<StreamsRebalanceData.TaskId> tasks);

    /**
     * Called when tasks are assigned to a stream thread.
     *
     * @param assignment The tasks assigned.
     * @return The exception thrown during the callback, if any.
     */
    Optional<Exception> onTasksAssigned(final StreamsRebalanceData.Assignment assignment);

    /**
     * Called when a stream thread loses all assigned tasks.
     *
     * @return The exception thrown during the callback, if any.
     */
    Optional<Exception> onAllTasksLost();
}
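
As a usage illustration (not part of this commit), a stream thread could wire these callbacks up roughly as sketched below; the class name LoggingRebalanceCallbacks and the println logging are hypothetical placeholders for the real task-management logic.

// Hypothetical sketch only; not part of the commit.
import java.util.Optional;
import java.util.Set;

import org.apache.kafka.clients.consumer.internals.StreamsGroupRebalanceCallbacks;
import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData;

public class LoggingRebalanceCallbacks implements StreamsGroupRebalanceCallbacks {

    @Override
    public Optional<Exception> onTasksRevoked(final Set<StreamsRebalanceData.TaskId> tasks) {
        try {
            // A real implementation would suspend or close the revoked tasks here.
            System.out.println("Revoking tasks: " + tasks);
            return Optional.empty();
        } catch (final Exception e) {
            // Callbacks report failures to the caller instead of throwing.
            return Optional.of(e);
        }
    }

    @Override
    public Optional<Exception> onTasksAssigned(final StreamsRebalanceData.Assignment assignment) {
        // A real implementation would create active, standby, and warm-up tasks here.
        System.out.println("Assigned: " + assignment);
        return Optional.empty();
    }

    @Override
    public Optional<Exception> onAllTasksLost() {
        // A real implementation would drop all task state after the membership was lost.
        System.out.println("All tasks lost");
        return Optional.empty();
    }
}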
StreamsRebalanceData.java (new file, 266 lines)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

/**
 * This class holds the data that is needed to participate in the Streams rebalance protocol.
 */
public class StreamsRebalanceData {

    public static class TaskId implements Comparable<TaskId> {

        private final String subtopologyId;
        private final int partitionId;

        public TaskId(final String subtopologyId, final int partitionId) {
            this.subtopologyId = Objects.requireNonNull(subtopologyId, "Subtopology ID cannot be null");
            this.partitionId = partitionId;
        }

        public int partitionId() {
            return partitionId;
        }

        public String subtopologyId() {
            return subtopologyId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            TaskId taskId = (TaskId) o;
            return partitionId == taskId.partitionId && Objects.equals(subtopologyId, taskId.subtopologyId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(subtopologyId, partitionId);
        }

        @Override
        public int compareTo(TaskId taskId) {
            Objects.requireNonNull(taskId, "taskId cannot be null");
            return Comparator.comparing(TaskId::subtopologyId)
                .thenComparingInt(TaskId::partitionId).compare(this, taskId);
        }

        @Override
        public String toString() {
            return "TaskId{" +
                "subtopologyId=" + subtopologyId +
                ", partitionId=" + partitionId +
                '}';
        }
    }

    public static class Assignment {

        public static final Assignment EMPTY = new Assignment();

        private final Set<TaskId> activeTasks;

        private final Set<TaskId> standbyTasks;

        private final Set<TaskId> warmupTasks;

        private Assignment() {
            this.activeTasks = Set.of();
            this.standbyTasks = Set.of();
            this.warmupTasks = Set.of();
        }

        public Assignment(final Set<TaskId> activeTasks,
                          final Set<TaskId> standbyTasks,
                          final Set<TaskId> warmupTasks) {
            this.activeTasks = Set.copyOf(Objects.requireNonNull(activeTasks, "Active tasks cannot be null"));
            this.standbyTasks = Set.copyOf(Objects.requireNonNull(standbyTasks, "Standby tasks cannot be null"));
            this.warmupTasks = Set.copyOf(Objects.requireNonNull(warmupTasks, "Warmup tasks cannot be null"));
        }

        public Set<TaskId> activeTasks() {
            return activeTasks;
        }

        public Set<TaskId> standbyTasks() {
            return standbyTasks;
        }

        public Set<TaskId> warmupTasks() {
            return warmupTasks;
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            final Assignment that = (Assignment) o;
            return Objects.equals(activeTasks, that.activeTasks)
                && Objects.equals(standbyTasks, that.standbyTasks)
                && Objects.equals(warmupTasks, that.warmupTasks);
        }

        @Override
        public int hashCode() {
            return Objects.hash(activeTasks, standbyTasks, warmupTasks);
        }

        public Assignment copy() {
            return new Assignment(activeTasks, standbyTasks, warmupTasks);
        }

        @Override
        public String toString() {
            return "Assignment{" +
                "activeTasks=" + activeTasks +
                ", standbyTasks=" + standbyTasks +
                ", warmupTasks=" + warmupTasks +
                '}';
        }
    }

    public static class Subtopology {

        private final Set<String> sourceTopics;
        private final Set<String> repartitionSinkTopics;
        private final Map<String, TopicInfo> stateChangelogTopics;
        private final Map<String, TopicInfo> repartitionSourceTopics;
        private final Collection<Set<String>> copartitionGroups;

        public Subtopology(final Set<String> sourceTopics,
                           final Set<String> repartitionSinkTopics,
                           final Map<String, TopicInfo> repartitionSourceTopics,
                           final Map<String, TopicInfo> stateChangelogTopics,
                           final Collection<Set<String>> copartitionGroups
        ) {
            this.sourceTopics = Set.copyOf(Objects.requireNonNull(sourceTopics, "Source topics cannot be null"));
            this.repartitionSinkTopics =
                Set.copyOf(Objects.requireNonNull(repartitionSinkTopics, "Repartition sink topics cannot be null"));
            this.repartitionSourceTopics =
                Map.copyOf(Objects.requireNonNull(repartitionSourceTopics, "Repartition source topics cannot be null"));
            this.stateChangelogTopics =
                Map.copyOf(Objects.requireNonNull(stateChangelogTopics, "State changelog topics cannot be null"));
            this.copartitionGroups =
                Collections.unmodifiableCollection(Objects.requireNonNull(
                        copartitionGroups,
                        "Co-partition groups cannot be null"
                    )
                );
        }

        public Set<String> sourceTopics() {
            return sourceTopics;
        }

        public Set<String> repartitionSinkTopics() {
            return repartitionSinkTopics;
        }

        public Map<String, TopicInfo> stateChangelogTopics() {
            return stateChangelogTopics;
        }

        public Map<String, TopicInfo> repartitionSourceTopics() {
            return repartitionSourceTopics;
        }

        public Collection<Set<String>> copartitionGroups() {
            return copartitionGroups;
        }

        @Override
        public String toString() {
            return "Subtopology{" +
                "sourceTopics=" + sourceTopics +
                ", repartitionSinkTopics=" + repartitionSinkTopics +
                ", stateChangelogTopics=" + stateChangelogTopics +
                ", repartitionSourceTopics=" + repartitionSourceTopics +
                ", copartitionGroups=" + copartitionGroups +
                '}';
        }
    }

    public static class TopicInfo {

        private final Optional<Integer> numPartitions;
        private final Optional<Short> replicationFactor;
        private final Map<String, String> topicConfigs;

        public TopicInfo(final Optional<Integer> numPartitions,
                         final Optional<Short> replicationFactor,
                         final Map<String, String> topicConfigs) {
            this.numPartitions = Objects.requireNonNull(numPartitions, "Number of partitions cannot be null");
            this.replicationFactor = Objects.requireNonNull(replicationFactor, "Replication factor cannot be null");
            this.topicConfigs =
                Map.copyOf(Objects.requireNonNull(topicConfigs, "Additional topic configs cannot be null"));
        }

        public Optional<Integer> numPartitions() {
            return numPartitions;
        }

        public Optional<Short> replicationFactor() {
            return replicationFactor;
        }

        public Map<String, String> topicConfigs() {
            return topicConfigs;
        }

        @Override
        public String toString() {
            return "TopicInfo{" +
                "numPartitions=" + numPartitions +
                ", replicationFactor=" + replicationFactor +
                ", topicConfigs=" + topicConfigs +
                '}';
        }
    }

    private final Map<String, Subtopology> subtopologies;

    private final AtomicReference<Assignment> reconciledAssignment = new AtomicReference<>(Assignment.EMPTY);

    public StreamsRebalanceData(Map<String, Subtopology> subtopologies) {
        this.subtopologies = Map.copyOf(Objects.requireNonNull(subtopologies, "Subtopologies cannot be null"));
    }

    public Map<String, Subtopology> subtopologies() {
        return subtopologies;
    }

    public void setReconciledAssignment(final Assignment assignment) {
        reconciledAssignment.set(assignment);
    }

    public Assignment reconciledAssignment() {
        return reconciledAssignment.get();
    }
}
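
To make the data flow concrete, here is a small, hypothetical sketch (not part of this commit) of how a topology description could be handed to StreamsRebalanceData and how an assignment would be exchanged through it; the topic names, configs, and wrapper class are invented for illustration.

// Hypothetical sketch only; topic names, configs, and the class name are invented.
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData;

public class StreamsRebalanceDataExample {

    public static void main(final String[] args) {
        // Metadata for a repartition topic of the group.
        final StreamsRebalanceData.TopicInfo repartitionTopic = new StreamsRebalanceData.TopicInfo(
            Optional.of(4),                         // number of partitions
            Optional.of((short) 3),                 // replication factor
            Map.of("cleanup.policy", "delete")      // additional topic configs
        );

        // One subtopology; the constructor takes source topics, repartition sink topics,
        // repartition source topics, state changelog topics, and co-partition groups.
        final StreamsRebalanceData.Subtopology subtopology = new StreamsRebalanceData.Subtopology(
            Set.of("input-topic"),
            Set.of("repartition-topic"),
            Map.of("repartition-topic", repartitionTopic),
            Map.of(),
            List.of(Set.of("input-topic"))
        );

        final StreamsRebalanceData rebalanceData =
            new StreamsRebalanceData(Map.of("0", subtopology));

        // The membership manager publishes the reconciled assignment ...
        rebalanceData.setReconciledAssignment(new StreamsRebalanceData.Assignment(
            Set.of(new StreamsRebalanceData.TaskId("0", 0)),   // active tasks
            Set.of(),                                          // standby tasks
            Set.of()                                           // warm-up tasks
        ));

        // ... and the stream thread reads it back.
        System.out.println(rebalanceData.reconciledAssignment());
    }
}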
