diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/TopologyTaskInfo.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/TopologyTaskInfo.java
new file mode 100644
index 000000000..c0ed37e7d
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/TopologyTaskInfo.java
@@ -0,0 +1,114 @@
+package dev.responsive.kafka.internal.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.TopologyDescription;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * Immutable mapping between the source-topic partitions of a Streams topology
+ * and the task IDs that consume them (one task per subtopology/partition pair).
+ *
+ * <p>NOTE(review): generic type parameters in this file were reconstructed from
+ * usage; the stored patch had angle-bracket contents stripped.
+ */
+public class TopologyTaskInfo {
+  private final Map<TopicPartition, TaskId> tasksByPartition;
+  private final Map<TaskId, List<TopicPartition>> partitionsByTask;
+  // todo: add info about internal topics
+
+  @VisibleForTesting
+  TopologyTaskInfo(
+      final Map<TopicPartition, TaskId> tasksByPartition,
+      final Map<TaskId, List<TopicPartition>> partitionsByTask
+  ) {
+    // defensive immutable copies so callers cannot mutate our state
+    this.tasksByPartition = Map.copyOf(tasksByPartition);
+    this.partitionsByTask = Map.copyOf(partitionsByTask);
+  }
+
+  public Map<TopicPartition, TaskId> tasksByPartition() {
+    return tasksByPartition;
+  }
+
+  public Map<TaskId, List<TopicPartition>> partitionsByTask() {
+    return partitionsByTask;
+  }
+
+  /**
+   * Builds the partition/task mappings for {@code topology}, using {@code admin}
+   * to look up partition counts of each subtopology's source topics.
+   *
+   * @throws TopologyTaskInfoException if the topology uses topic patterns,
+   *         internal (sink-and-source) topics, or co-partitioned source topics
+   *         with differing partition counts
+   */
+  public static TopologyTaskInfo forTopology(
+      final TopologyDescription topology,
+      final Admin admin
+  ) {
+    final Map<TopicPartition, TaskId> tasksByPartition = new HashMap<>();
+    final Map<TaskId, List<TopicPartition>> partitionsByTask = new HashMap<>();
+    final Set<String> sinkTopics = sinkTopics(topology);
+    for (final var st : topology.subtopologies()) {
+      final Set<String> topics = new HashSet<>();
+      for (final TopologyDescription.Node node : st.nodes()) {
+        if (node instanceof TopologyDescription.Source) {
+          topics.addAll(((TopologyDescription.Source) node).topicSet());
+          // pattern-subscribed sources have no fixed topic set to map
+          if (((TopologyDescription.Source) node).topicPattern() != null) {
+            throw new TopologyTaskInfoException(
+                "topic patterns are not supported for snapshots");
+          }
+        }
+      }
+      // a source topic that is also a sink topic is internal to the topology
+      // (e.g. a repartition topic), which snapshots do not support
+      if (!Sets.intersection(topics, sinkTopics).isEmpty()) {
+        throw new TopologyTaskInfoException(
+            "internal topics are not supported for snapshots"
+        );
+      }
+      final Map<String, TopicDescription> descriptions;
+      try {
+        descriptions = admin.describeTopics(topics).allTopicNames().get();
+      } catch (final ExecutionException | InterruptedException e) {
+        // NOTE(review): on InterruptedException the interrupt flag should be
+        // restored via Thread.currentThread().interrupt() before rethrowing
+        throw new RuntimeException(e);
+      }
+      // all source topics of a subtopology must be co-partitioned
+      final Set<Integer> partitionCounts = descriptions.values().stream()
+          .map(d -> d.partitions().size())
+          .collect(Collectors.toSet());
+      if (partitionCounts.size() != 1) {
+        throw new TopologyTaskInfoException(
+            "unexpected topics with different partition counts");
+      }
+      final int nPartitions = partitionCounts.iterator().next();
+      for (int i = 0; i < nPartitions; i++) {
+        final var taskId = new TaskId(st.id(), i);
+        partitionsByTask.put(taskId, new ArrayList<>(nPartitions));
+        for (final var topic : topics) {
+          final var tp = new TopicPartition(topic, i);
+          tasksByPartition.put(tp, taskId);
+          partitionsByTask.get(taskId).add(tp);
+        }
+      }
+    }
+    return new TopologyTaskInfo(tasksByPartition, partitionsByTask);
+  }
+
+  // collects all explicitly-named sink topics; used to reject internal topics
+  private static Set<String> sinkTopics(TopologyDescription topology) {
+    final Set<String> sinkTopics = new HashSet<>();
+    for (final var st : topology.subtopologies()) {
+      for (final TopologyDescription.Node node : st.nodes()) {
+        if (node instanceof TopologyDescription.Sink) {
+          final String sinkTopic = ((TopologyDescription.Sink) node).topic();
+          if (sinkTopic == null) {
+            throw new TopologyTaskInfoException("non-explicit sink topics not yet supported");
+          }
+          sinkTopics.add(sinkTopic);
+        }
+      }
+    }
+    return sinkTopics;
+  }
+
+  public static class TopologyTaskInfoException extends RuntimeException {
+    private static final long serialVersionUID = 0L;
+
+    public
+    TopologyTaskInfoException(final String message) {
+      super(message);
+    }
+  }
+}
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/TopologyTaskInfoTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/TopologyTaskInfoTest.java
new file mode 100644
index 000000000..17ebdd70b
--- /dev/null
+++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/TopologyTaskInfoTest.java
@@ -0,0 +1,120 @@
+package dev.responsive.kafka.internal.utils;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyDescription;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class TopologyTaskInfoTest {
+  @Mock
+  private Admin admin;
+
+  @SuppressWarnings("unchecked")
+  @BeforeEach
+  public void setup() {
+    // stub describeTopics to report two partitions for every requested topic
+    when(admin.describeTopics(any(Collection.class))).thenAnswer(
+        i -> {
+          final var topics = i.<Collection<String>>getArgument(0);
+          final Map<String, KafkaFuture<TopicDescription>> futures = topics.stream()
+              .collect(Collectors.toMap(
+                  t -> t,
+                  t -> KafkaFuture.completedFuture(new TopicDescription(
+                      t,
+                      false,
+                      List.of(
+                          new TopicPartitionInfo(0, null, List.of(), List.of()),
+                          new TopicPartitionInfo(1, null, List.of(), List.of())
+                      )
+                  ))
+              ));
+          return new TestDescribeTopicsResult(futures);
+        }
+    );
+  }
+
+  @Test
+  public void shouldComputeTaskAndPartitionMappings() {
+    // given:
+    final StreamsBuilder builder = new StreamsBuilder();
+    builder.stream("source", Consumed.with(Serdes.Long(), Serdes.Long()))
+        .groupByKey(Grouped.as("groupBy"))
+        .count(Named.as("count"))
+        .toStream(Named.as("toStream"))
+        .to("sink");
+    final TopologyDescription description = builder.build().describe();
+
+    // when:
+    final var tti = TopologyTaskInfo.forTopology(description, admin);
+
+    // then:
+    assertThat(
+        tti.partitionsByTask(),
+        is(
+            Map.of(
+                new TaskId(0, 0), List.of(new TopicPartition("source", 0)),
+                new TaskId(0, 1), List.of(new TopicPartition("source", 1))
+            )
+        )
+    );
+    assertThat(
+        tti.tasksByPartition(),
+        is(
+            Map.of(
+                new TopicPartition("source", 0), new TaskId(0, 0),
+                new TopicPartition("source", 1), new TaskId(0, 1)
+            )
+        )
+    );
+  }
+
+  @Test
+  public void shouldThrowIfRepartition() {
+    // given: groupBy on the value forces a repartition (internal) topic
+    final StreamsBuilder builder = new StreamsBuilder();
+    builder.stream("source", Consumed.with(Serdes.Long(), Serdes.Long()))
+        .groupBy((k, v) -> v, Grouped.as("groupBy"))
+        .count(Named.as("count"))
+        .toStream(Named.as("toStream"))
+        .to("sink");
+    final TopologyDescription description = builder.build().describe();
+
+    // when/then:
+    assertThrows(
+        TopologyTaskInfo.TopologyTaskInfoException.class,
+        () -> TopologyTaskInfo.forTopology(description, admin)
+    );
+  }
+
+  private static class TestDescribeTopicsResult extends DescribeTopicsResult {
+    protected TestDescribeTopicsResult(
+        final Map<String, KafkaFuture<TopicDescription>> nameFutures
+    ) {
+      super(null, nameFutures);
+    }
+  }
+}
diff --git
a/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/TopologyTaskInfoUtils.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/TopologyTaskInfoUtils.java
new file mode 100644
index 000000000..428e01a5a
--- /dev/null
+++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/TopologyTaskInfoUtils.java
@@ -0,0 +1,18 @@
+package dev.responsive.kafka.internal.utils;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TaskId;
+
+/** Test-only factory exposing the package-private TopologyTaskInfo constructor. */
+public final class TopologyTaskInfoUtils {
+  public static TopologyTaskInfo createWith(
+      final Map<TopicPartition, TaskId> tasksByPartition,
+      final Map<TaskId, List<TopicPartition>> partitionsByTask
+  ) {
+    return new TopologyTaskInfo(tasksByPartition, partitionsByTask);
+  }
+
+  // utility class: no instances
+  private TopologyTaskInfoUtils() {
+  }
+}