diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupRebalanceCallbacks.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupRebalanceCallbacks.java new file mode 100644 index 0000000000000..4840e79ecebc4 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupRebalanceCallbacks.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import java.util.Optional; +import java.util.Set; + +/** + * Callbacks for handling Streams group rebalance events in Kafka Streams. + */ +public interface StreamsGroupRebalanceCallbacks { + + /** + * Called when tasks are revoked from a stream thread. + * + * @param tasks The tasks to be revoked. + * @return The exception thrown during the callback, if any. + */ + Optional onTasksRevoked(final Set tasks); + + /** + * Called when tasks are assigned from a stream thread. + * + * @param assignment The tasks assigned. + * @return The exception thrown during the callback, if any. + */ + Optional onTasksAssigned(final StreamsRebalanceData.Assignment assignment); + + /** + * Called when a stream thread loses all assigned tasks. + * + * @return The exception thrown during the callback, if any. + */ + Optional onAllTasksLost(); +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java new file mode 100644 index 0000000000000..a8670eeb1b192 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +/** + * This class holds the data that is needed to participate in the Streams rebalance protocol. + */ +public class StreamsRebalanceData { + + public static class TaskId implements Comparable { + + private final String subtopologyId; + private final int partitionId; + + public TaskId(final String subtopologyId, final int partitionId) { + this.subtopologyId = Objects.requireNonNull(subtopologyId, "Subtopology ID cannot be null"); + this.partitionId = partitionId; + } + + public int partitionId() { + return partitionId; + } + + public String subtopologyId() { + return subtopologyId; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TaskId taskId = (TaskId) o; + return partitionId == taskId.partitionId && Objects.equals(subtopologyId, taskId.subtopologyId); + } + + @Override + public int hashCode() { + return Objects.hash(subtopologyId, partitionId); + } + + @Override + public int compareTo(TaskId taskId) { + Objects.requireNonNull(taskId, "taskId cannot be null"); + return Comparator.comparing(TaskId::subtopologyId) + .thenComparingInt(TaskId::partitionId).compare(this, taskId); + } + + @Override + public String toString() { + return "TaskId{" + + "subtopologyId=" + subtopologyId + + ", partitionId=" + partitionId + + '}'; + } + } + + public static class Assignment { + + public static final Assignment EMPTY = new Assignment(); + + private final Set activeTasks; + + private final Set standbyTasks; + + private final Set warmupTasks; + + private Assignment() { + this.activeTasks = Set.of(); + this.standbyTasks = Set.of(); + this.warmupTasks = Set.of(); + } + + public Assignment(final Set activeTasks, + final Set standbyTasks, + final Set warmupTasks) { + this.activeTasks = Set.copyOf(Objects.requireNonNull(activeTasks, "Active tasks cannot be null")); + this.standbyTasks = Set.copyOf(Objects.requireNonNull(standbyTasks, "Standby tasks cannot be null")); + this.warmupTasks = Set.copyOf(Objects.requireNonNull(warmupTasks, "Warmup tasks cannot be null")); + } + + public Set activeTasks() { + return activeTasks; + } + + public Set standbyTasks() { + return standbyTasks; + } + + public Set warmupTasks() { + return warmupTasks; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final Assignment that = (Assignment) o; + return Objects.equals(activeTasks, that.activeTasks) + && Objects.equals(standbyTasks, that.standbyTasks) + && Objects.equals(warmupTasks, that.warmupTasks); + } + + @Override + public int hashCode() { + return Objects.hash(activeTasks, standbyTasks, warmupTasks); + } + + public Assignment copy() { + return new Assignment(activeTasks, standbyTasks, warmupTasks); + } + + @Override + public String toString() { + return "Assignment{" + + "activeTasks=" + activeTasks + + ", standbyTasks=" + standbyTasks + + ", warmupTasks=" + warmupTasks + + '}'; + } + } + + public static class Subtopology { + + private final Set sourceTopics; + private final Set repartitionSinkTopics; + private final Map stateChangelogTopics; + private final Map repartitionSourceTopics; + private final Collection> copartitionGroups; + + public Subtopology(final Set sourceTopics, + final Set repartitionSinkTopics, + final Map repartitionSourceTopics, + final Map stateChangelogTopics, + final Collection> copartitionGroups + ) { + this.sourceTopics = Set.copyOf(Objects.requireNonNull(sourceTopics, "Subtopology ID cannot be null")); + this.repartitionSinkTopics = + Set.copyOf(Objects.requireNonNull(repartitionSinkTopics, "Repartition sink topics cannot be null")); + this.repartitionSourceTopics = + Map.copyOf(Objects.requireNonNull(repartitionSourceTopics, "Repartition source topics cannot be null")); + this.stateChangelogTopics = + Map.copyOf(Objects.requireNonNull(stateChangelogTopics, "State changelog topics cannot be null")); + this.copartitionGroups = + Collections.unmodifiableCollection(Objects.requireNonNull( + copartitionGroups, + "Co-partition groups cannot be null" + ) + ); + } + + public Set sourceTopics() { + return sourceTopics; + } + + public Set repartitionSinkTopics() { + return repartitionSinkTopics; + } + + public Map stateChangelogTopics() { + return stateChangelogTopics; + } + + public Map repartitionSourceTopics() { + return repartitionSourceTopics; + } + + public Collection> copartitionGroups() { + return copartitionGroups; + } + + @Override + public String toString() { + return "Subtopology{" + + "sourceTopics=" + sourceTopics + + ", repartitionSinkTopics=" + repartitionSinkTopics + + ", stateChangelogTopics=" + stateChangelogTopics + + ", repartitionSourceTopics=" + repartitionSourceTopics + + ", copartitionGroups=" + copartitionGroups + + '}'; + } + } + + public static class TopicInfo { + + private final Optional numPartitions; + private final Optional replicationFactor; + private final Map topicConfigs; + + public TopicInfo(final Optional numPartitions, + final Optional replicationFactor, + final Map topicConfigs) { + this.numPartitions = Objects.requireNonNull(numPartitions, "Number of partitions cannot be null"); + this.replicationFactor = Objects.requireNonNull(replicationFactor, "Replication factor cannot be null"); + this.topicConfigs = + Map.copyOf(Objects.requireNonNull(topicConfigs, "Additional topic configs cannot be null")); + } + + public Optional numPartitions() { + return numPartitions; + } + + public Optional replicationFactor() { + return replicationFactor; + } + + public Map topicConfigs() { + return topicConfigs; + } + + @Override + public String toString() { + return "TopicInfo{" + + "numPartitions=" + numPartitions + + ", replicationFactor=" + replicationFactor + + ", topicConfigs=" + topicConfigs + + '}'; + } + } + + private final Map subtopologies; + + private final AtomicReference reconciledAssignment = new AtomicReference<>(Assignment.EMPTY); + + public StreamsRebalanceData(Map subtopologies) { + this.subtopologies = Map.copyOf(Objects.requireNonNull(subtopologies, "Subtopologies cannot be null")); + } + + public Map subtopologies() { + return subtopologies; + } + + public void setReconciledAssignment(final Assignment assignment) { + reconciledAssignment.set(assignment); + } + + public Assignment reconciledAssignment() { + return reconciledAssignment.get(); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceEventsProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceEventsProcessor.java new file mode 100644 index 0000000000000..db91e8fcece8e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceEventsProcessor.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; +import org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackCompletedEvent; +import org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackNeededEvent; +import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackCompletedEvent; +import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackNeededEvent; +import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksRevokedCallbackCompletedEvent; +import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksRevokedCallbackNeededEvent; +import org.apache.kafka.common.KafkaException; + +import java.util.LinkedList; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Processes events from the Streams rebalance protocol. + *

+ * The Streams rebalance processor receives events from the background thread of the async consumer, more precisely + * from the Streams membership manager and handles them. + * For example, events are requests for invoking the task assignment and task revocation callbacks. + * Results of the event handling are passed back to the background thread. + */ +public class StreamsRebalanceEventsProcessor { + + private final BlockingQueue onCallbackRequests = new LinkedBlockingQueue<>(); + private ApplicationEventHandler applicationEventHandler = null; + private final StreamsGroupRebalanceCallbacks rebalanceCallbacks; + private final StreamsRebalanceData streamsRebalanceData; + + /** + * Constructs the Streams rebalance processor. + * + * @param streamsRebalanceData + * @param rebalanceCallbacks + */ + public StreamsRebalanceEventsProcessor(StreamsRebalanceData streamsRebalanceData, + StreamsGroupRebalanceCallbacks rebalanceCallbacks) { + this.streamsRebalanceData = streamsRebalanceData; + this.rebalanceCallbacks = rebalanceCallbacks; + } + + /** + * Requests the invocation of the task assignment callback. + * + * @param assignment The tasks to be assigned to the member of the Streams group. + * @return A future that will be completed when the callback has been invoked. + */ + public CompletableFuture requestOnTasksAssignedCallbackInvocation(final StreamsRebalanceData.Assignment assignment) { + final StreamsOnTasksAssignedCallbackNeededEvent onTasksAssignedCallbackNeededEvent = new StreamsOnTasksAssignedCallbackNeededEvent(assignment); + onCallbackRequests.add(onTasksAssignedCallbackNeededEvent); + return onTasksAssignedCallbackNeededEvent.future(); + } + + /** + * Requests the invocation of the task revocation callback. + * + * @param activeTasksToRevoke The tasks to revoke from the member of the Streams group + * @return A future that will be completed when the callback has been invoked. + */ + public CompletableFuture requestOnTasksRevokedCallbackInvocation(final Set activeTasksToRevoke) { + final StreamsOnTasksRevokedCallbackNeededEvent onTasksRevokedCallbackNeededEvent = new StreamsOnTasksRevokedCallbackNeededEvent(activeTasksToRevoke); + onCallbackRequests.add(onTasksRevokedCallbackNeededEvent); + return onTasksRevokedCallbackNeededEvent.future(); + } + + /** + * Requests the invocation of the all tasks lost callback. + * + * @return A future that will be completed when the callback has been invoked. + */ + public CompletableFuture requestOnAllTasksLostCallbackInvocation() { + final StreamsOnAllTasksLostCallbackNeededEvent onAllTasksLostCallbackNeededEvent = new StreamsOnAllTasksLostCallbackNeededEvent(); + onCallbackRequests.add(onAllTasksLostCallbackNeededEvent); + return onAllTasksLostCallbackNeededEvent.future(); + } + + /** + * Sets the application event handler. + * + * The application handler sends the results of the callbacks to the background thread. + * + * @param applicationEventHandler The application handler. + */ + public void setApplicationEventHandler(final ApplicationEventHandler applicationEventHandler) { + this.applicationEventHandler = applicationEventHandler; + } + + private void process(final BackgroundEvent event) { + switch (event.type()) { + case ERROR: + throw ((ErrorEvent) event).error(); + + case STREAMS_ON_TASKS_REVOKED_CALLBACK_NEEDED: + processStreamsOnTasksRevokedCallbackNeededEvent((StreamsOnTasksRevokedCallbackNeededEvent) event); + break; + + case STREAMS_ON_TASKS_ASSIGNED_CALLBACK_NEEDED: + processStreamsOnTasksAssignedCallbackNeededEvent((StreamsOnTasksAssignedCallbackNeededEvent) event); + break; + + case STREAMS_ON_ALL_TASKS_LOST_CALLBACK_NEEDED: + processStreamsOnAllTasksLostCallbackNeededEvent((StreamsOnAllTasksLostCallbackNeededEvent) event); + break; + + default: + throw new IllegalArgumentException("Background event type " + event.type() + " was not expected"); + + } + } + + private void processStreamsOnTasksRevokedCallbackNeededEvent(final StreamsOnTasksRevokedCallbackNeededEvent event) { + StreamsOnTasksRevokedCallbackCompletedEvent invokedEvent = invokeOnTasksRevokedCallback(event.activeTasksToRevoke(), event.future()); + applicationEventHandler.add(invokedEvent); + if (invokedEvent.error().isPresent()) { + throw invokedEvent.error().get(); + } + } + + private void processStreamsOnTasksAssignedCallbackNeededEvent(final StreamsOnTasksAssignedCallbackNeededEvent event) { + StreamsOnTasksAssignedCallbackCompletedEvent invokedEvent = invokeOnTasksAssignedCallback(event.assignment(), event.future()); + applicationEventHandler.add(invokedEvent); + if (invokedEvent.error().isPresent()) { + throw invokedEvent.error().get(); + } + } + + private void processStreamsOnAllTasksLostCallbackNeededEvent(final StreamsOnAllTasksLostCallbackNeededEvent event) { + StreamsOnAllTasksLostCallbackCompletedEvent invokedEvent = invokeOnAllTasksLostCallback(event.future()); + applicationEventHandler.add(invokedEvent); + if (invokedEvent.error().isPresent()) { + throw invokedEvent.error().get(); + } + } + + private StreamsOnTasksRevokedCallbackCompletedEvent invokeOnTasksRevokedCallback(final Set activeTasksToRevoke, + final CompletableFuture future) { + final Optional error; + final Optional exceptionFromCallback = rebalanceCallbacks.onTasksRevoked(activeTasksToRevoke); + if (exceptionFromCallback.isPresent()) { + error = Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(), "Task revocation callback throws an error")); + } else { + error = Optional.empty(); + } + return new StreamsOnTasksRevokedCallbackCompletedEvent(future, error); + } + + private StreamsOnTasksAssignedCallbackCompletedEvent invokeOnTasksAssignedCallback(final StreamsRebalanceData.Assignment assignment, + final CompletableFuture future) { + final Optional error; + final Optional exceptionFromCallback = rebalanceCallbacks.onTasksAssigned(assignment); + if (exceptionFromCallback.isPresent()) { + error = Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(), "Task assignment callback throws an error")); + } else { + error = Optional.empty(); + streamsRebalanceData.setReconciledAssignment(assignment); + } + return new StreamsOnTasksAssignedCallbackCompletedEvent(future, error); + } + + private StreamsOnAllTasksLostCallbackCompletedEvent invokeOnAllTasksLostCallback(final CompletableFuture future) { + final Optional error; + final Optional exceptionFromCallback = rebalanceCallbacks.onAllTasksLost(); + if (exceptionFromCallback.isPresent()) { + error = Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(), "All tasks lost callback throws an error")); + } else { + error = Optional.empty(); + streamsRebalanceData.setReconciledAssignment(StreamsRebalanceData.Assignment.EMPTY); + } + return new StreamsOnAllTasksLostCallbackCompletedEvent(future, error); + } + + /** + * Processes all events received from the background thread so far. + */ + public void process() { + LinkedList events = new LinkedList<>(); + onCallbackRequests.drainTo(events); + for (BackgroundEvent event : events) { + process(event); + } + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java index dfb775f8947c1..6f0557772a4bf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java @@ -41,6 +41,9 @@ public enum Type { SHARE_ACKNOWLEDGE_ON_CLOSE, SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK_REGISTRATION, SEEK_UNVALIDATED, + STREAMS_ON_TASKS_ASSIGNED_CALLBACK_COMPLETED, + STREAMS_ON_TASKS_REVOKED_CALLBACK_COMPLETED, + STREAMS_ON_ALL_TASKS_LOST_CALLBACK_COMPLETED, } private final Type type; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java index 02fc4b4a29ba4..b2f8a3666c499 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java @@ -27,7 +27,12 @@ public abstract class BackgroundEvent { public enum Type { - ERROR, CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED, SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK + ERROR, + CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED, + SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK, + STREAMS_ON_TASKS_ASSIGNED_CALLBACK_NEEDED, + STREAMS_ON_TASKS_REVOKED_CALLBACK_NEEDED, + STREAMS_ON_ALL_TASKS_LOST_CALLBACK_NEEDED } private final Type type; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnAllTasksLostCallbackCompletedEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnAllTasksLostCallbackCompletedEvent.java new file mode 100644 index 0000000000000..b84e9d0c1386d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnAllTasksLostCallbackCompletedEvent.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.common.KafkaException; + +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +public class StreamsOnAllTasksLostCallbackCompletedEvent extends ApplicationEvent { + + private final CompletableFuture future; + private final Optional error; + + public StreamsOnAllTasksLostCallbackCompletedEvent(final CompletableFuture future, + final Optional error) { + super(Type.STREAMS_ON_ALL_TASKS_LOST_CALLBACK_COMPLETED); + this.future = Objects.requireNonNull(future); + this.error = Objects.requireNonNull(error); + } + + public CompletableFuture future() { + return future; + } + + public Optional error() { + return error; + } + + @Override + protected String toStringBase() { + return super.toStringBase() + + ", future=" + future + + ", error=" + error; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnAllTasksLostCallbackNeededEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnAllTasksLostCallbackNeededEvent.java new file mode 100644 index 0000000000000..29e1a94ec7239 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnAllTasksLostCallbackNeededEvent.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +public class StreamsOnAllTasksLostCallbackNeededEvent extends CompletableBackgroundEvent { + + public StreamsOnAllTasksLostCallbackNeededEvent() { + super(Type.STREAMS_ON_ALL_TASKS_LOST_CALLBACK_NEEDED, Long.MAX_VALUE); + } + + @Override + protected String toStringBase() { + return super.toStringBase(); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnTasksAssignedCallbackCompletedEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnTasksAssignedCallbackCompletedEvent.java new file mode 100644 index 0000000000000..96c2519bb2d33 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnTasksAssignedCallbackCompletedEvent.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.common.KafkaException; + +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +public class StreamsOnTasksAssignedCallbackCompletedEvent extends ApplicationEvent { + + private final CompletableFuture future; + private final Optional error; + + public StreamsOnTasksAssignedCallbackCompletedEvent(final CompletableFuture future, + final Optional error) { + super(Type.STREAMS_ON_TASKS_ASSIGNED_CALLBACK_COMPLETED); + this.future = Objects.requireNonNull(future); + this.error = Objects.requireNonNull(error); + } + + public CompletableFuture future() { + return future; + } + + public Optional error() { + return error; + } + + @Override + protected String toStringBase() { + return super.toStringBase() + + ", future=" + future + + ", error=" + error; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnTasksAssignedCallbackNeededEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnTasksAssignedCallbackNeededEvent.java new file mode 100644 index 0000000000000..565bf97c6b775 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnTasksAssignedCallbackNeededEvent.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData; + +import java.util.Objects; + +public class StreamsOnTasksAssignedCallbackNeededEvent extends CompletableBackgroundEvent { + + private final StreamsRebalanceData.Assignment assignment; + + public StreamsOnTasksAssignedCallbackNeededEvent(StreamsRebalanceData.Assignment assignment) { + super(Type.STREAMS_ON_TASKS_ASSIGNED_CALLBACK_NEEDED, Long.MAX_VALUE); + this.assignment = Objects.requireNonNull(assignment); + } + + public StreamsRebalanceData.Assignment assignment() { + return assignment; + } + + @Override + protected String toStringBase() { + return super.toStringBase() + + ", assignment=" + assignment; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnTasksRevokedCallbackCompletedEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnTasksRevokedCallbackCompletedEvent.java new file mode 100644 index 0000000000000..5717012ac4576 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnTasksRevokedCallbackCompletedEvent.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.common.KafkaException; + +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +public class StreamsOnTasksRevokedCallbackCompletedEvent extends ApplicationEvent { + + private final CompletableFuture future; + private final Optional error; + + public StreamsOnTasksRevokedCallbackCompletedEvent(final CompletableFuture future, + final Optional error) { + super(Type.STREAMS_ON_TASKS_REVOKED_CALLBACK_COMPLETED); + this.future = Objects.requireNonNull(future); + this.error = Objects.requireNonNull(error); + } + + public CompletableFuture future() { + return future; + } + + public Optional error() { + return error; + } + + @Override + protected String toStringBase() { + return super.toStringBase() + + ", future=" + future + + ", error=" + error; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnTasksRevokedCallbackNeededEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnTasksRevokedCallbackNeededEvent.java new file mode 100644 index 0000000000000..1e3e58a9f5b79 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnTasksRevokedCallbackNeededEvent.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData; + +import java.util.Objects; +import java.util.Set; + +public class StreamsOnTasksRevokedCallbackNeededEvent extends CompletableBackgroundEvent { + + private final Set activeTasksToRevoke; + + public StreamsOnTasksRevokedCallbackNeededEvent(final Set activeTasksToRevoke) { + super(Type.STREAMS_ON_TASKS_REVOKED_CALLBACK_NEEDED, Long.MAX_VALUE); + this.activeTasksToRevoke = Objects.requireNonNull(activeTasksToRevoke); + } + + public Set activeTasksToRevoke() { + return activeTasksToRevoke; + } + + @Override + protected String toStringBase() { + return super.toStringBase() + + ", active tasks to revoke=" + activeTasksToRevoke; + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java new file mode 100644 index 0000000000000..8a67c580f06d4 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class StreamsRebalanceDataTest { + + @Test + public void testTaskIdEqualsAndHashCode() { + final StreamsRebalanceData.TaskId task = new StreamsRebalanceData.TaskId("subtopologyId1", 1); + final StreamsRebalanceData.TaskId taskEqual = new StreamsRebalanceData.TaskId(task.subtopologyId(), task.partitionId()); + final StreamsRebalanceData.TaskId taskUnequalSubtopology = new StreamsRebalanceData.TaskId(task.subtopologyId() + "1", task.partitionId()); + final StreamsRebalanceData.TaskId taskUnequalPartition = new StreamsRebalanceData.TaskId(task.subtopologyId(), task.partitionId() + 1); + + assertEquals(task, taskEqual); + assertEquals(task.hashCode(), taskEqual.hashCode()); + assertNotEquals(task, taskUnequalSubtopology); + assertNotEquals(task.hashCode(), taskUnequalSubtopology.hashCode()); + assertNotEquals(task, taskUnequalPartition); + assertNotEquals(task.hashCode(), taskUnequalSubtopology.hashCode()); + } + + @Test + public void taskIdShouldNotAcceptNulls() { + final Exception exception = assertThrows(NullPointerException.class, () -> new StreamsRebalanceData.TaskId(null, 1)); + assertEquals("Subtopology ID cannot be null", exception.getMessage()); + } + + @Test + public void testTaskIdCompareTo() { + final StreamsRebalanceData.TaskId task = new StreamsRebalanceData.TaskId("subtopologyId1", 1); + + assertTrue(task.compareTo(new StreamsRebalanceData.TaskId(task.subtopologyId(), task.partitionId())) == 0); + assertTrue(task.compareTo(new StreamsRebalanceData.TaskId(task.subtopologyId() + "1", task.partitionId())) < 0); + assertTrue(task.compareTo(new StreamsRebalanceData.TaskId(task.subtopologyId(), task.partitionId() + 1)) < 0); + assertTrue(new StreamsRebalanceData.TaskId(task.subtopologyId() + "1", task.partitionId()).compareTo(task) > 0); + assertTrue(new StreamsRebalanceData.TaskId(task.subtopologyId(), task.partitionId() + 1).compareTo(task) > 0); + } + + @Test + public void emptyAssignmentShouldNotBeModifiable() { + final StreamsRebalanceData.Assignment emptyAssignment = StreamsRebalanceData.Assignment.EMPTY; + + assertThrows( + UnsupportedOperationException.class, + () -> emptyAssignment.activeTasks().add(new StreamsRebalanceData.TaskId("subtopologyId1", 1)) + ); + assertThrows( + UnsupportedOperationException.class, + () -> emptyAssignment.standbyTasks().add(new StreamsRebalanceData.TaskId("subtopologyId1", 1)) + ); + assertThrows( + UnsupportedOperationException.class, + () -> emptyAssignment.warmupTasks().add(new StreamsRebalanceData.TaskId("subtopologyId1", 1)) + ); + } + + @Test + public void assignmentShouldNotBeModifiable() { + final StreamsRebalanceData.Assignment assignment = new StreamsRebalanceData.Assignment( + new HashSet<>(Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 1))), + new HashSet<>(Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 2))), + new HashSet<>(Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 3))) + ); + + assertThrows( + UnsupportedOperationException.class, + () -> assignment.activeTasks().add(new StreamsRebalanceData.TaskId("subtopologyId2", 1)) + ); + assertThrows( + UnsupportedOperationException.class, + () -> assignment.standbyTasks().add(new StreamsRebalanceData.TaskId("subtopologyId2", 2)) + ); + assertThrows( + UnsupportedOperationException.class, + () -> assignment.warmupTasks().add(new StreamsRebalanceData.TaskId("subtopologyId2", 3)) + ); + } + + @Test + public void assignmentShouldNotAcceptNulls() { + final Exception exception1 = assertThrows(NullPointerException.class, () -> new StreamsRebalanceData.Assignment(null, Set.of(), Set.of())); + assertEquals("Active tasks cannot be null", exception1.getMessage()); + final Exception exception2 = assertThrows(NullPointerException.class, () -> new StreamsRebalanceData.Assignment(Set.of(), null, Set.of())); + assertEquals("Standby tasks cannot be null", exception2.getMessage()); + final Exception exception3 = assertThrows(NullPointerException.class, () -> new StreamsRebalanceData.Assignment(Set.of(), Set.of(), null)); + assertEquals("Warmup tasks cannot be null", exception3.getMessage()); + } + + @Test + public void testAssignmentEqualsAndHashCode() { + final StreamsRebalanceData.TaskId additionalTask = new StreamsRebalanceData.TaskId("subtopologyId2", 1); + final StreamsRebalanceData.Assignment assignment = new StreamsRebalanceData.Assignment( + Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 1)), + Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 2)), + Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 3)) + ); + final StreamsRebalanceData.Assignment assignmentEqual = new StreamsRebalanceData.Assignment( + assignment.activeTasks(), + assignment.standbyTasks(), + assignment.warmupTasks() + ); + Set unequalActiveTasks = new HashSet<>(assignment.activeTasks()); + unequalActiveTasks.add(additionalTask); + final StreamsRebalanceData.Assignment assignmentUnequalActiveTasks = new StreamsRebalanceData.Assignment( + unequalActiveTasks, + assignment.standbyTasks(), + assignment.warmupTasks() + ); + Set unequalStandbyTasks = new HashSet<>(assignment.standbyTasks()); + unequalStandbyTasks.add(additionalTask); + final StreamsRebalanceData.Assignment assignmentUnequalStandbyTasks = new StreamsRebalanceData.Assignment( + assignment.activeTasks(), + unequalStandbyTasks, + assignment.warmupTasks() + ); + Set unequalWarmupTasks = new HashSet<>(assignment.warmupTasks()); + unequalWarmupTasks.add(additionalTask); + final StreamsRebalanceData.Assignment assignmentUnequalWarmupTasks = new StreamsRebalanceData.Assignment( + assignment.activeTasks(), + assignment.standbyTasks(), + unequalWarmupTasks + ); + + assertEquals(assignment, assignmentEqual); + assertNotEquals(assignment, assignmentUnequalActiveTasks); + assertNotEquals(assignment, assignmentUnequalStandbyTasks); + assertNotEquals(assignment, assignmentUnequalWarmupTasks); + assertEquals(assignment.hashCode(), assignmentEqual.hashCode()); + assertNotEquals(assignment.hashCode(), assignmentUnequalActiveTasks.hashCode()); + assertNotEquals(assignment.hashCode(), assignmentUnequalStandbyTasks.hashCode()); + assertNotEquals(assignment.hashCode(), assignmentUnequalWarmupTasks.hashCode()); + } + + @Test + public void shouldCopyAssignment() { + final StreamsRebalanceData.Assignment assignment = new StreamsRebalanceData.Assignment( + Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 1)), + Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 2)), + Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 3)) + ); + + final StreamsRebalanceData.Assignment copy = assignment.copy(); + + assertEquals(assignment, copy); + assertNotSame(assignment, copy); + } + + @Test + public void shouldCopyEmptyAssignment() { + final StreamsRebalanceData.Assignment emptyAssignment = StreamsRebalanceData.Assignment.EMPTY; + + final StreamsRebalanceData.Assignment copy = emptyAssignment.copy(); + + assertEquals(emptyAssignment, copy); + assertNotSame(emptyAssignment, copy); + } + + @Test + public void subtopologyShouldNotAcceptNulls() { + final Exception exception1 = assertThrows( + NullPointerException.class, + () -> new StreamsRebalanceData.Subtopology(null, Set.of(), Map.of(), Map.of(), List.of()) + ); + assertEquals("Subtopology ID cannot be null", exception1.getMessage()); + final Exception exception2 = assertThrows( + NullPointerException.class, + () -> new StreamsRebalanceData.Subtopology(Set.of(), null, Map.of(), Map.of(), List.of()) + ); + assertEquals("Repartition sink topics cannot be null", exception2.getMessage()); + final Exception exception3 = assertThrows( + NullPointerException.class, + () -> new StreamsRebalanceData.Subtopology(Set.of(), Set.of(), null, Map.of(), List.of()) + ); + assertEquals("Repartition source topics cannot be null", exception3.getMessage()); + final Exception exception4 = assertThrows( + NullPointerException.class, + () -> new StreamsRebalanceData.Subtopology(Set.of(), Set.of(), Map.of(), null, List.of()) + ); + assertEquals("State changelog topics cannot be null", exception4.getMessage()); + final Exception exception5 = assertThrows( + NullPointerException.class, + () -> new StreamsRebalanceData.Subtopology(Set.of(), Set.of(), Map.of(), Map.of(), null) + ); + assertEquals("Co-partition groups cannot be null", exception5.getMessage()); + } + + @Test + public void subtopologyShouldNotBeModifiable() { + final StreamsRebalanceData.Subtopology subtopology = new StreamsRebalanceData.Subtopology( + new HashSet<>(Set.of("sourceTopic1")), + new HashSet<>(Set.of("repartitionSinkTopic1")), + Map.of("repartitionSourceTopic1", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.of((short) 1), Map.of())) + .entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)), + Map.of("stateChangelogTopic1", new StreamsRebalanceData.TopicInfo(Optional.of(0), Optional.of((short) 1), Map.of())) + .entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)), + new ArrayList<>(List.of(Set.of("sourceTopic1"))) + ); + + assertThrows( + UnsupportedOperationException.class, + () -> subtopology.sourceTopics().add("sourceTopic2") + ); + assertThrows( + UnsupportedOperationException.class, + () -> subtopology.repartitionSinkTopics().add("repartitionSinkTopic2") + ); + assertThrows( + UnsupportedOperationException.class, + () -> subtopology.repartitionSourceTopics().put("repartitionSourceTopic2", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.of((short) 1), Map.of())) + ); + assertThrows( + UnsupportedOperationException.class, + () -> subtopology.stateChangelogTopics().put("stateChangelogTopic2", new StreamsRebalanceData.TopicInfo(Optional.of(0), Optional.of((short) 1), Map.of())) + ); + assertThrows( + UnsupportedOperationException.class, + () -> subtopology.copartitionGroups().add(Set.of("sourceTopic2")) + ); + } + + @Test + public void topicInfoShouldNotAcceptNulls() { + final Exception exception1 = assertThrows( + NullPointerException.class, + () -> new StreamsRebalanceData.TopicInfo(null, Optional.of((short) 1), Map.of()) + ); + assertEquals("Number of partitions cannot be null", exception1.getMessage()); + final Exception exception2 = assertThrows( + NullPointerException.class, + () -> new StreamsRebalanceData.TopicInfo(Optional.of(1), null, Map.of()) + ); + assertEquals("Replication factor cannot be null", exception2.getMessage()); + final Exception exception3 = assertThrows( + NullPointerException.class, + () -> new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.of((short) 1), null) + ); + assertEquals("Additional topic configs cannot be null", exception3.getMessage()); + } + + @Test + public void streamsRebalanceDataShouldNotHaveModifiableSubtopologies() { + final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(new HashMap<>()); + + assertThrows( + UnsupportedOperationException.class, + () -> streamsRebalanceData.subtopologies().put("subtopologyId2", new StreamsRebalanceData.Subtopology( + Set.of(), + Set.of(), + Map.of(), + Map.of(), + List.of() + )) + ); + } + + @Test + public void streamsRebalanceDataShouldNotAcceptNulls() { + final Exception exception = assertThrows( + NullPointerException.class, + () -> new StreamsRebalanceData(null) + ); + assertEquals("Subtopologies cannot be null", exception.getMessage()); + } + + @Test + public void streamsRebalanceDataShouldBeConstructedWithEmptyAssignment() { + final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(new HashMap<>()); + + assertEquals(StreamsRebalanceData.Assignment.EMPTY, streamsRebalanceData.reconciledAssignment()); + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceEventsProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceEventsProcessorTest.java new file mode 100644 index 0000000000000..f30aa2718b177 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceEventsProcessorTest.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; +import org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackCompletedEvent; +import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackCompletedEvent; +import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksRevokedCallbackCompletedEvent; +import org.apache.kafka.common.KafkaException; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Collections; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class StreamsRebalanceEventsProcessorTest { + + private static final String SUBTOPOLOGY_0 = "subtopology-0"; + private static final String SUBTOPOLOGY_1 = "subtopology-1"; + + @Mock + private StreamsGroupRebalanceCallbacks rebalanceCallbacks; + + @Mock + private ApplicationEventHandler applicationEventHandler; + + @Test + public void shouldInvokeOnTasksAssignedCallback() { + final StreamsRebalanceData rebalanceData = new StreamsRebalanceData(Collections.emptyMap()); + final StreamsRebalanceEventsProcessor rebalanceEventsProcessor = + new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks); + rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler); + final Set activeTasks = Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_0, 0), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_1, 1) + ); + final Set standbyTasks = Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_1, 1), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_0, 0) + ); + final Set warmupTasks = Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_1, 2), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_0, 3) + ); + StreamsRebalanceData.Assignment assignment = + new StreamsRebalanceData.Assignment(activeTasks, standbyTasks, warmupTasks); + when(rebalanceCallbacks.onTasksAssigned(assignment)).thenReturn(Optional.empty()); + + final CompletableFuture onTasksAssignedExecuted = rebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(assignment); + + assertFalse(onTasksAssignedExecuted.isDone()); + rebalanceEventsProcessor.process(); + ArgumentCaptor streamsOnTasksAssignedCallbackCompletedCaptor = + ArgumentCaptor.forClass(StreamsOnTasksAssignedCallbackCompletedEvent.class); + verify(applicationEventHandler).add(streamsOnTasksAssignedCallbackCompletedCaptor.capture()); + StreamsOnTasksAssignedCallbackCompletedEvent streamsOnTasksAssignedCallbackCompletedEvent = + streamsOnTasksAssignedCallbackCompletedCaptor.getValue(); + assertFalse(streamsOnTasksAssignedCallbackCompletedEvent.future().isDone()); + assertTrue(streamsOnTasksAssignedCallbackCompletedEvent.error().isEmpty()); + assertEquals(assignment, rebalanceData.reconciledAssignment()); + } + + @Test + public void shouldReThrowErrorFromOnTasksAssignedCallbackAndPassErrorToBackground() { + final StreamsRebalanceData rebalanceData = new StreamsRebalanceData(Collections.emptyMap()); + final StreamsRebalanceEventsProcessor rebalanceEventsProcessor = + new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks); + rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler); + final Set activeTasks = Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_0, 0), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_1, 1) + ); + final Set standbyTasks = Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_1, 1), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_0, 0) + ); + final Set warmupTasks = Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_1, 2), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_0, 3) + ); + StreamsRebalanceData.Assignment assignment = + new StreamsRebalanceData.Assignment(activeTasks, standbyTasks, warmupTasks); + final Exception exception = new RuntimeException("Nobody expects the Spanish inquisition."); + when(rebalanceCallbacks.onTasksAssigned(assignment)).thenReturn(Optional.of(exception)); + + final CompletableFuture onTasksAssignedExecuted = rebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(assignment); + + assertFalse(onTasksAssignedExecuted.isDone()); + final Exception actualException = assertThrows(KafkaException.class, rebalanceEventsProcessor::process); + assertEquals("Task assignment callback throws an error", actualException.getMessage()); + ArgumentCaptor streamsOnTasksAssignedCallbackCompletedCaptor = + ArgumentCaptor.forClass(StreamsOnTasksAssignedCallbackCompletedEvent.class); + verify(applicationEventHandler).add(streamsOnTasksAssignedCallbackCompletedCaptor.capture()); + StreamsOnTasksAssignedCallbackCompletedEvent streamsOnTasksAssignedCallbackCompletedEvent = + streamsOnTasksAssignedCallbackCompletedCaptor.getValue(); + assertFalse(streamsOnTasksAssignedCallbackCompletedEvent.future().isDone()); + assertTrue(streamsOnTasksAssignedCallbackCompletedEvent.error().isPresent()); + assertEquals(exception, streamsOnTasksAssignedCallbackCompletedEvent.error().get().getCause()); + assertEquals(exception, actualException.getCause()); + assertEquals(StreamsRebalanceData.Assignment.EMPTY, rebalanceData.reconciledAssignment()); + } + + @Test + public void shouldInvokeOnTasksRevokedCallback() { + final StreamsRebalanceData rebalanceData = new StreamsRebalanceData(Collections.emptyMap()); + final StreamsRebalanceEventsProcessor rebalanceEventsProcessor = + new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks); + rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler); + final Set activeTasks = Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_0, 0), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_1, 1) + ); + when(rebalanceCallbacks.onTasksRevoked(activeTasks)).thenReturn(Optional.empty()); + + final CompletableFuture onTasksRevokedExecuted = rebalanceEventsProcessor.requestOnTasksRevokedCallbackInvocation(activeTasks); + + assertFalse(onTasksRevokedExecuted.isDone()); + rebalanceEventsProcessor.process(); + ArgumentCaptor streamsOnTasksRevokedCallbackCompletedCaptor = + ArgumentCaptor.forClass(StreamsOnTasksRevokedCallbackCompletedEvent.class); + verify(applicationEventHandler).add(streamsOnTasksRevokedCallbackCompletedCaptor.capture()); + StreamsOnTasksRevokedCallbackCompletedEvent streamsOnTasksRevokedCallbackCompletedEvent = + streamsOnTasksRevokedCallbackCompletedCaptor.getValue(); + assertFalse(streamsOnTasksRevokedCallbackCompletedEvent.future().isDone()); + assertTrue(streamsOnTasksRevokedCallbackCompletedEvent.error().isEmpty()); + } + + @Test + public void shouldReThrowErrorFromOnTasksRevokedCallbackAndPassErrorToBackground() { + final StreamsRebalanceData rebalanceData = new StreamsRebalanceData(Collections.emptyMap()); + final StreamsRebalanceEventsProcessor rebalanceEventsProcessor = + new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks); + rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler); + final Set activeTasks = Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_0, 0), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_1, 1) + ); + final Exception exception = new RuntimeException("Nobody expects the Spanish inquisition."); + when(rebalanceCallbacks.onTasksRevoked(activeTasks)).thenReturn(Optional.of(exception)); + + final CompletableFuture onTasksRevokedExecuted = rebalanceEventsProcessor.requestOnTasksRevokedCallbackInvocation(activeTasks); + + assertFalse(onTasksRevokedExecuted.isDone()); + final Exception actualException = assertThrows(KafkaException.class, rebalanceEventsProcessor::process); + assertEquals("Task revocation callback throws an error", actualException.getMessage()); + ArgumentCaptor streamsOnTasksRevokedCallbackCompletedCaptor = + ArgumentCaptor.forClass(StreamsOnTasksRevokedCallbackCompletedEvent.class); + verify(applicationEventHandler).add(streamsOnTasksRevokedCallbackCompletedCaptor.capture()); + StreamsOnTasksRevokedCallbackCompletedEvent streamsOnTasksRevokedCallbackCompletedEvent = + streamsOnTasksRevokedCallbackCompletedCaptor.getValue(); + assertTrue(streamsOnTasksRevokedCallbackCompletedEvent.error().isPresent()); + assertEquals(exception, streamsOnTasksRevokedCallbackCompletedEvent.error().get().getCause()); + assertEquals(exception, actualException.getCause()); + } + + @Test + public void shouldInvokeOnAllTasksLostCallback() { + final StreamsRebalanceData rebalanceData = new StreamsRebalanceData(Collections.emptyMap()); + final StreamsRebalanceEventsProcessor rebalanceEventsProcessor = + new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks); + rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler); + final Set activeTasks = Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_0, 0), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_1, 1) + ); + final Set standbyTasks = Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_1, 1), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_0, 0) + ); + final Set warmupTasks = Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_1, 2), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_0, 3) + ); + StreamsRebalanceData.Assignment assignment = + new StreamsRebalanceData.Assignment(activeTasks, standbyTasks, warmupTasks); + when(rebalanceCallbacks.onTasksAssigned(assignment)).thenReturn(Optional.empty()); + rebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(assignment); + rebalanceEventsProcessor.process(); + assertEquals(assignment, rebalanceData.reconciledAssignment()); + when(rebalanceCallbacks.onAllTasksLost()).thenReturn(Optional.empty()); + + final CompletableFuture onAllTasksLostExecuted = rebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation(); + + assertFalse(onAllTasksLostExecuted.isDone()); + rebalanceEventsProcessor.process(); + ArgumentCaptor streamsOnAllTasksLostCallbackCompletedCaptor = + ArgumentCaptor.forClass(StreamsOnAllTasksLostCallbackCompletedEvent.class); + verify(applicationEventHandler).add(streamsOnAllTasksLostCallbackCompletedCaptor.capture()); + StreamsOnAllTasksLostCallbackCompletedEvent streamsOnAllTasksLostCallbackCompletedEvent = + streamsOnAllTasksLostCallbackCompletedCaptor.getValue(); + assertFalse(streamsOnAllTasksLostCallbackCompletedEvent.future().isDone()); + assertTrue(streamsOnAllTasksLostCallbackCompletedEvent.error().isEmpty()); + assertEquals(StreamsRebalanceData.Assignment.EMPTY, rebalanceData.reconciledAssignment()); + } + + @Test + public void shouldReThrowErrorFromOnAllTasksLostCallbackAndPassErrorToBackground() { + final StreamsRebalanceData rebalanceData = new StreamsRebalanceData(Collections.emptyMap()); + final StreamsRebalanceEventsProcessor rebalanceEventsProcessor = + new StreamsRebalanceEventsProcessor(rebalanceData, rebalanceCallbacks); + rebalanceEventsProcessor.setApplicationEventHandler(applicationEventHandler); + final Set activeTasks = Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_0, 0), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_1, 1) + ); + final Set standbyTasks = Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_1, 1), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_0, 0) + ); + final Set warmupTasks = Set.of( + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_1, 2), + new StreamsRebalanceData.TaskId(SUBTOPOLOGY_0, 3) + ); + StreamsRebalanceData.Assignment assignment = + new StreamsRebalanceData.Assignment(activeTasks, standbyTasks, warmupTasks); + when(rebalanceCallbacks.onTasksAssigned(assignment)).thenReturn(Optional.empty()); + rebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(assignment); + rebalanceEventsProcessor.process(); + assertEquals(assignment, rebalanceData.reconciledAssignment()); + final Exception exception = new RuntimeException("Nobody expects the Spanish inquisition."); + when(rebalanceCallbacks.onAllTasksLost()).thenReturn(Optional.of(exception)); + + final CompletableFuture onAllTasksLostExecuted = rebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation(); + + assertFalse(onAllTasksLostExecuted.isDone()); + final Exception actualException = assertThrows(KafkaException.class, rebalanceEventsProcessor::process); + assertEquals("All tasks lost callback throws an error", actualException.getMessage()); + ArgumentCaptor streamsOnAllTasksLostCallbackCompletedCaptor = + ArgumentCaptor.forClass(StreamsOnAllTasksLostCallbackCompletedEvent.class); + verify(applicationEventHandler).add(streamsOnAllTasksLostCallbackCompletedCaptor.capture()); + StreamsOnAllTasksLostCallbackCompletedEvent streamsOnAllTasksLostCallbackCompletedEvent = + streamsOnAllTasksLostCallbackCompletedCaptor.getValue(); + assertFalse(streamsOnAllTasksLostCallbackCompletedEvent.future().isDone()); + assertTrue(streamsOnAllTasksLostCallbackCompletedEvent.error().isPresent()); + assertEquals(exception, streamsOnAllTasksLostCallbackCompletedEvent.error().get().getCause()); + assertEquals(exception, actualException.getCause()); + assertEquals(assignment, rebalanceData.reconciledAssignment()); + } +} \ No newline at end of file