diff --git a/etc/checkstyle/checkstyle.xml b/etc/checkstyle/checkstyle.xml index 8a34fe7a1ba..34521b489d0 100644 --- a/etc/checkstyle/checkstyle.xml +++ b/etc/checkstyle/checkstyle.xml @@ -240,6 +240,7 @@ + diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java index 04efdae3c37..16332e53e13 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java @@ -5,11 +5,15 @@ import java.util.Map; import java.util.Properties; import lombok.Data; +import lombok.Getter; +import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Configuration; @Configuration @ConfigurationProperties("kafka") +@EnableConfigurationProperties @Data public class ClustersProperties { @@ -36,6 +40,7 @@ public static class Cluster { Properties properties; boolean readOnly = false; boolean disableLogDirsCollection = false; + List streamApplications = new ArrayList<>(); } @Data @@ -49,4 +54,11 @@ public static class SchemaRegistryAuth { String username; String password; } + + @Getter + @Setter + public static class StreamApplication { + private String applicationId; + private String topologyUrl; + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/StreamTopologyController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/StreamTopologyController.java new file mode 100644 index 00000000000..070aa41110c --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/StreamTopologyController.java @@ -0,0 +1,31 @@ +package com.provectus.kafka.ui.controller; + +import com.provectus.kafka.ui.api.StreamTopologiesApi; +import com.provectus.kafka.ui.model.ProcessorTopologyDTO; +import com.provectus.kafka.ui.model.StreamApplicationsDTO; +import com.provectus.kafka.ui.service.StreamTopologyService; +import lombok.RequiredArgsConstructor; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Mono; + +@RestController +@RequiredArgsConstructor +public class StreamTopologyController implements StreamTopologiesApi { + private final StreamTopologyService topologyService; + + @Override + public Mono> getStreamApplications( + String clusterName, ServerWebExchange exchange) { + return Mono.just(ResponseEntity.ok(topologyService.getTopologyApplications(clusterName))); + } + + @Override + public Mono> getStreamTopology(String clusterName, + String applicationId, + ServerWebExchange exchange) { + return topologyService.getStreamTopology(clusterName, applicationId) + .map(ResponseEntity::ok); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java index 87147389048..33e8aa2eeb8 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java @@ -23,7 +23,8 @@ public enum ErrorCode { KSQLDB_NOT_FOUND(4011, HttpStatus.NOT_FOUND), DIR_NOT_FOUND(4012, HttpStatus.BAD_REQUEST), TOPIC_OR_PARTITION_NOT_FOUND(4013, HttpStatus.BAD_REQUEST), - INVALID_REQUEST(4014, HttpStatus.BAD_REQUEST); + INVALID_REQUEST(4014, HttpStatus.BAD_REQUEST), + INVALID_STREAM_TOPOLOGY_STRING(4015, HttpStatus.UNPROCESSABLE_ENTITY); static { // codes uniqueness check diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/InvalidStreamTopologyString.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/InvalidStreamTopologyString.java new file mode 100644 index 00000000000..68fe615b05f --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/InvalidStreamTopologyString.java @@ -0,0 +1,11 @@ +package com.provectus.kafka.ui.exception; + +public class InvalidStreamTopologyString extends RuntimeException { + public InvalidStreamTopologyString() { + super("Invalid stream topology string"); + } + + public InvalidStreamTopologyString(String message) { + super(message); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/StreamTopologyParsingException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/StreamTopologyParsingException.java new file mode 100644 index 00000000000..a7e36c01013 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/StreamTopologyParsingException.java @@ -0,0 +1,20 @@ +package com.provectus.kafka.ui.exception; + +public class StreamTopologyParsingException extends CustomBaseException { + public StreamTopologyParsingException() { + super("Stream topology string is invalid"); + } + + public StreamTopologyParsingException(String message) { + super(message); + } + + public StreamTopologyParsingException(String message, Throwable cause) { + super(message, cause); + } + + @Override + public ErrorCode getErrorCode() { + return ErrorCode.INVALID_STREAM_TOPOLOGY_STRING; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StreamTopologyService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StreamTopologyService.java new file mode 100644 index 00000000000..b4003976ffc --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StreamTopologyService.java @@ -0,0 +1,90 @@ +package com.provectus.kafka.ui.service; + +import static com.provectus.kafka.ui.config.ClustersProperties.StreamApplication; + +import com.provectus.kafka.ui.config.ClustersProperties; +import com.provectus.kafka.ui.exception.ClusterNotFoundException; +import com.provectus.kafka.ui.exception.InvalidStreamTopologyString; +import com.provectus.kafka.ui.exception.NotFoundException; +import com.provectus.kafka.ui.exception.StreamTopologyParsingException; +import com.provectus.kafka.ui.model.ProcessorTopologyDTO; +import com.provectus.kafka.ui.model.StreamApplicationsDTO; +import com.provectus.kafka.ui.service.topology.parser.StreamTopologyParser; +import java.util.ArrayList; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import lombok.extern.log4j.Log4j2; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.springframework.stereotype.Service; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; + +@Log4j2 +@Service +public class StreamTopologyService { + private final Map> clusterStreamApps; + private final WebClient webClient; + private final StreamTopologyParser topologyParser; + + public StreamTopologyService(ClustersProperties clustersProperties, WebClient webClient, + StreamTopologyParser topologyParser) { + this.clusterStreamApps = getClusterToStreamAppsMap(clustersProperties); + this.webClient = webClient; + this.topologyParser = topologyParser; + } + + public StreamApplicationsDTO getTopologyApplications(String clusterName) { + final var streamApplications = new StreamApplicationsDTO(); + final var applicationIds = Optional.ofNullable(clusterStreamApps.get(clusterName)) + .map(Map::keySet) + .map(ArrayList::new) + .orElseThrow(ClusterNotFoundException::new); + return streamApplications.applicationIds(applicationIds); + } + + public Mono getStreamTopology(String clusterName, String applicationId) { + return Optional.ofNullable(clusterStreamApps.get(clusterName)) + .map(apps -> apps.get(applicationId)) + .map(this::getTopologyString) + .map(topologyMono -> + topologyMono.map(s -> parseTopologyString(s, clusterName, applicationId)) + ) + .orElseThrow(() -> new NotFoundException("Stream application not found")); + } + + private ProcessorTopologyDTO parseTopologyString(String topologyString, String clusterName, + String applicationId) { + try { + return topologyParser.parse(topologyString); + } catch (InvalidStreamTopologyString e) { + log.error("cannot parse stream topology", e); + throw new StreamTopologyParsingException(String + .format("cannot parse stream topology , ", + clusterName, applicationId)); + } + } + + private Mono getTopologyString(String topologyUrl) { + return webClient.get() + .uri(topologyUrl) + .retrieve() + .bodyToMono(String.class) + .doOnError(log::error); + } + + private Map> getClusterToStreamAppsMap( + ClustersProperties clustersProperties) { + return clustersProperties.getClusters().stream() + .map(cluster -> ImmutablePair.of(cluster.getName(), getAppToEndpointMap(cluster))) + .collect(Collectors.toMap(ImmutablePair::getKey, ImmutablePair::getValue)); + } + + private Map getAppToEndpointMap(ClustersProperties.Cluster cluster) { + return cluster.getStreamApplications().stream() + .collect(Collectors.toMap( + StreamApplication::getApplicationId, + StreamApplication::getTopologyUrl) + ); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/topology/parser/StreamTopologyParser.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/topology/parser/StreamTopologyParser.java new file mode 100644 index 00000000000..95d9db535b2 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/topology/parser/StreamTopologyParser.java @@ -0,0 +1,233 @@ +package com.provectus.kafka.ui.service.topology.parser; + +import static com.provectus.kafka.ui.service.topology.parser.StreamTopologyParser.TopologyLiterals.NEXT; +import static com.provectus.kafka.ui.service.topology.parser.StreamTopologyParser.TopologyLiterals.PROCESSOR; +import static com.provectus.kafka.ui.service.topology.parser.StreamTopologyParser.TopologyLiterals.SINK; +import static com.provectus.kafka.ui.service.topology.parser.StreamTopologyParser.TopologyLiterals.SOURCE; +import static com.provectus.kafka.ui.service.topology.parser.StreamTopologyParser.TopologyLiterals.SUB_TOPOLOGY; +import static com.provectus.kafka.ui.service.topology.parser.StreamTopologyParser.TopologyLiterals.TOPIC; + +import com.provectus.kafka.ui.exception.InvalidStreamTopologyString; +import com.provectus.kafka.ui.model.GraphNodeDTO; +import com.provectus.kafka.ui.model.GraphNodeTypeDTO; +import com.provectus.kafka.ui.model.ProcessorNodeDTO; +import com.provectus.kafka.ui.model.ProcessorTopologyDTO; +import com.provectus.kafka.ui.model.SinkProcessorNodeDTO; +import com.provectus.kafka.ui.model.SourceProcessorNodeDTO; +import com.provectus.kafka.ui.model.SubTopologyNodeDTO; +import com.provectus.kafka.ui.model.TopicNodeDTO; +import com.provectus.kafka.ui.model.TopologyGraphDTO; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; + +@Component +@RequiredArgsConstructor +@Log4j2 +public class StreamTopologyParser { + private final StreamTopologyParserHelper parserHelper; + + public ProcessorTopologyDTO parse(final String topologyString) { + if (StringUtils.isEmpty(topologyString)) { + throw new InvalidStreamTopologyString("topology string is empty"); + } + + final var topologyLines = topologyString.lines() + .skip(1) + .map(String::strip) + .collect(Collectors.toList()); + + if (topologyLines.isEmpty()) { + throw new InvalidStreamTopologyString("topology string contains only one line"); + } + + final var processorTopology = new ProcessorTopologyDTO(); + processorTopology.setProcessorsNumber(0); + processorTopology.setTopicsNumber(0); + processorTopology.setTopology(getTopologyGraph()); + SubTopologyNodeDTO subTopologyNode = null; + + for (String line : topologyLines) { + if (line.contains(SUB_TOPOLOGY.value)) { + subTopologyNode = parseSubTopology(line); + subTopologyNode.setSubTopology(getTopologyGraph()); + + } else { + if (subTopologyNode == null) { + throw new InvalidStreamTopologyString("cannot find subTopology"); + } + if (line.contains(NEXT.value)) { + putAdjacencyOfLastNode( + subTopologyNode.getSubTopology().getAdjacency(), + parserHelper.parseArrayOrThrow(line, NEXT.value)); + } else { + final var finalSubTopologyNode = subTopologyNode; + parseSubTopologyNode(line) + .ifPresent(res -> putParsedNode(processorTopology, finalSubTopologyNode, res)); + } + } + } + + return processorTopology; + } + + private void putAdjacencyOfLastNode(Map> adjacency, + List nextReferences) { + int count = 1; + for (var entry : adjacency.entrySet()) { + if (count == adjacency.size()) { + entry.getValue().addAll(nextReferences); + return; + } + count++; + } + throw new InvalidStreamTopologyString("cannot find node for adjacency"); + } + + private TopologyGraphDTO getTopologyGraph() { + final var topologyGraph = new TopologyGraphDTO(); + topologyGraph.setAdjacency(new LinkedHashMap<>()); + topologyGraph.setNodes(new LinkedHashMap<>()); + return topologyGraph; + } + + private void putParsedNode(ProcessorTopologyDTO processorTopology, + SubTopologyNodeDTO subTopologyNode, + GraphNodeDTO node) { + final var topologyGraph = processorTopology.getTopology(); + final var subTopologyGraph = subTopologyNode.getSubTopology(); + subTopologyGraph.putNodesItem(node.getName(), node); + subTopologyGraph.getAdjacency().putIfAbsent(node.getName(), new ArrayList<>()); + + switch (node.getType()) { + case SOURCE_PROCESSOR: + processorTopology.setProcessorsNumber(processorTopology.getProcessorsNumber() + 1); + var source = (SourceProcessorNodeDTO) node; + source.getTopics() + .forEach(topic -> { + putTopicNode(processorTopology, topologyGraph, topic); + topologyGraph.getAdjacency().get(topic).add(subTopologyNode.getName()); + } + ); + break; + case PROCESSOR: + case SINK_PROCESSOR: + processorTopology.setProcessorsNumber(processorTopology.getProcessorsNumber() + 1); + if (!topologyGraph.getNodes().containsKey(subTopologyNode.getName())) { + topologyGraph.putNodesItem(subTopologyNode.getName(), subTopologyNode); + topologyGraph.getAdjacency() + .putIfAbsent(subTopologyNode.getName(), new ArrayList<>()); + } + + if (GraphNodeTypeDTO.SINK_PROCESSOR == node.getType()) { + var sink = (SinkProcessorNodeDTO) node; + + putTopicNode(processorTopology, topologyGraph, sink.getTopic()); + topologyGraph.getAdjacency().get(subTopologyNode.getName()).add(sink.getTopic()); + } + break; + default: + log.warn("unknown topology node type"); + break; + } + } + + private void putTopicNode(ProcessorTopologyDTO processorTopology, + TopologyGraphDTO topologyGraph, + String topic) { + final var topicNode = new TopicNodeDTO(); + topicNode.setName(topic); + topicNode.setType(GraphNodeTypeDTO.TOPIC); + + if (!topologyGraph.getNodes().containsKey(topicNode.getName())) { + processorTopology.setTopicsNumber(processorTopology.getTopicsNumber() + 1); + topologyGraph.putNodesItem(topicNode.getName(), topicNode); + topologyGraph.getAdjacency().putIfAbsent(topic, new ArrayList<>()); + } + } + + private SubTopologyNodeDTO parseSubTopology(String topologyLine) { + var parsedName = + parserHelper.parseOrThrow(topologyLine, SUB_TOPOLOGY.value); + + final var subTopologyNode = new SubTopologyNodeDTO(); + subTopologyNode.setName(parsedName.value); + subTopologyNode.setType(GraphNodeTypeDTO.SUB_TOPOLOGY); + return subTopologyNode; + } + + private Optional parseSubTopologyNode(String topologyLine) { + if (topologyLine.contains(SOURCE.value)) { + return Optional.of(parseSource(topologyLine)); + } else if (topologyLine.contains(PROCESSOR.value)) { + return Optional.of(parseProcessor(topologyLine)); + } else if (topologyLine.contains(SINK.value)) { + return Optional.of(parseSink(topologyLine)); + } else { + return Optional.empty(); + } + } + + private GraphNodeDTO parseSource(String topologyLine) { + final var parsedSourceName = + parserHelper.parseOrThrow(topologyLine, SOURCE.value, 0, "("); + final var parsedTopics = + parserHelper.parseArrayOrThrow(topologyLine, "[", parsedSourceName.endIndex, "]"); + + final var sourceProcessorNode = new SourceProcessorNodeDTO(); + sourceProcessorNode.setName(parsedSourceName.value); + sourceProcessorNode.setType(GraphNodeTypeDTO.SOURCE_PROCESSOR); + sourceProcessorNode.setTopics(parsedTopics); + return sourceProcessorNode; + } + + private GraphNodeDTO parseProcessor(String topologyLine) { + final var parsedProcessorName = + parserHelper.parseOrThrow(topologyLine, PROCESSOR.value, 0, "("); + final var parsedStores = + parserHelper.parseArrayOrThrow(topologyLine, "[", parsedProcessorName.endIndex, "]"); + + final var processorNode = new ProcessorNodeDTO(); + processorNode.setName(parsedProcessorName.value); + processorNode.setType(GraphNodeTypeDTO.PROCESSOR); + processorNode.setStores(parsedStores); + + return processorNode; + } + + private GraphNodeDTO parseSink(String topologyLine) { + final var parsedSinkName = + parserHelper.parseOrThrow(topologyLine, SINK.value, 0, "("); + final var parsedTopic = + parserHelper.parseOrThrow(topologyLine, TOPIC.value, parsedSinkName.endIndex, ")"); + + final var sinkNode = new SinkProcessorNodeDTO(); + sinkNode.setName(parsedSinkName.value); + sinkNode.setType(GraphNodeTypeDTO.SINK_PROCESSOR); + sinkNode.setTopic(parsedTopic.value); + + return sinkNode; + } + + enum TopologyLiterals { + SUB_TOPOLOGY("Sub-topology:"), + SOURCE("Source:"), + PROCESSOR("Processor:"), + SINK("Sink:"), + NEXT("-->"), + PREVIOUS("<--"), + TOPIC("topic:"); + public final String value; + + TopologyLiterals(String value) { + this.value = value; + } + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/topology/parser/StreamTopologyParserHelper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/topology/parser/StreamTopologyParserHelper.java new file mode 100644 index 00000000000..7a78c0d2ddf --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/topology/parser/StreamTopologyParserHelper.java @@ -0,0 +1,77 @@ +package com.provectus.kafka.ui.service.topology.parser; + +import com.provectus.kafka.ui.exception.InvalidStreamTopologyString; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; + +@Component +public class StreamTopologyParserHelper { + ParsingRes parseOrThrow(String s, String after, int fromIndex, String before) { + final int beginIndex = indexAfterStringOrThrow(s, after, fromIndex); + final int endIndex = indexOfOrThrow(s, before, beginIndex); + final var result = s.substring(beginIndex, endIndex).strip(); + return ParsingRes.of(result, endIndex); + } + + ParsingRes parseOrThrow(String s, String after) { + final int beginIndex = indexAfterStringOrThrow(s, after, 0); + final var result = s.substring(beginIndex).strip(); + return ParsingRes.of(result, s.length()); + } + + List parseArrayOrThrow(String s, String after, int fromIndex, String before) { + final int listBegin = indexAfterStringOrThrow(s, after, fromIndex); + final int listEnd = indexOfOrThrow(s, before, listBegin); + return getArrayResult(s, listBegin, listEnd); + } + + List parseArrayOrThrow(String s, String after) { + final int listBegin = indexAfterStringOrThrow(s, after, 0); + final int listEnd = s.length(); + return getArrayResult(s, listBegin, listEnd); + } + + private List getArrayResult(String s, int listBegin, int listEnd) { + final var parsedList = + Arrays.stream(s.substring(listBegin, listEnd).split(",")) + .filter(StringUtils::hasText) + .map(String::strip) + .collect(Collectors.toList()); + return parsedList; + } + + int indexAfterStringOrThrow(String s, String str, int fromIndex) { + final int index = indexOfOrThrow(s, str, fromIndex); + return index + str.length(); + } + + private int indexOfOrThrow(String s, String str, int fromIndex) { + final int index = s.indexOf(str, fromIndex); + if (fromIndex == -1 || fromIndex >= s.length() || index == -1) { + throw new InvalidStreamTopologyString( + String.format("cannot find string %s in topology string", str)); + } + return index; + } + + static class ParsingRes { + T value; + int endIndex; + + private ParsingRes() { + } + + static ParsingRes of(T res, Integer endIndex) { + if (endIndex == null) { + throw new IllegalArgumentException("endIndex cannot be null"); + } + final var parsingRes = new ParsingRes(); + parsingRes.value = res; + parsingRes.endIndex = endIndex; + return parsingRes; + } + } +} diff --git a/kafka-ui-api/src/main/resources/application-local.yml b/kafka-ui-api/src/main/resources/application-local.yml index 93a5745c388..7df032396b2 100644 --- a/kafka-ui-api/src/main/resources/application-local.yml +++ b/kafka-ui-api/src/main/resources/application-local.yml @@ -9,6 +9,9 @@ kafka: - name: first address: http://localhost:8083 jmxPort: 9997 + streamApplications: + - applicationId: + topologyUrl: # - # name: secondLocal # bootstrapServers: localhost:9093 diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/StreamTopologyParserTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/StreamTopologyParserTest.java new file mode 100644 index 00000000000..a8942d2bb67 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/StreamTopologyParserTest.java @@ -0,0 +1,199 @@ +package com.provectus.kafka.ui.service; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.provectus.kafka.ui.exception.InvalidStreamTopologyString; +import com.provectus.kafka.ui.model.GraphNodeDTO; +import com.provectus.kafka.ui.model.GraphNodeTypeDTO; +import com.provectus.kafka.ui.model.ProcessorNodeDTO; +import com.provectus.kafka.ui.model.ProcessorTopologyDTO; +import com.provectus.kafka.ui.model.SinkProcessorNodeDTO; +import com.provectus.kafka.ui.model.SourceProcessorNodeDTO; +import com.provectus.kafka.ui.model.SubTopologyNodeDTO; +import com.provectus.kafka.ui.model.TopicNodeDTO; +import com.provectus.kafka.ui.model.TopologyGraphDTO; +import com.provectus.kafka.ui.service.topology.parser.StreamTopologyParser; +import com.provectus.kafka.ui.service.topology.parser.StreamTopologyParserHelper; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import org.junit.jupiter.api.Test; + +class StreamTopologyParserTest { + private final StreamTopologyParser parser = + new StreamTopologyParser(new StreamTopologyParserHelper()); + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Test + void edgeCasesTest() { + assertThrows(InvalidStreamTopologyString.class, () -> parser.parse("")); + assertThrows(InvalidStreamTopologyString.class, () -> parser.parse(null)); + assertThrows(InvalidStreamTopologyString.class, () -> parser.parse("invalid topology")); + assertThrows(InvalidStreamTopologyString.class, + () -> parser.parse("Topologies:\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [inputTopic])\n" + + " --> KSTREAM-KEY-SELECT-0000000001\n" + + " Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])\n" + + " --> count-repartition-filter\n" + + " <-- KSTREAM-SOURCE-0000000000")); + assertThrows(InvalidStreamTopologyString.class, + () -> parser.parse( + "Topologies:\n" + + " Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])\n" + + " --> KSTREAM-KEY-SELECT-0000000002-repartition-filter\n" + + " <-- KSTREAM-SOURCE-0000000000\n" + + " Processor: KSTREAM-KEY-SELECT-0000000002-repartition-filter (stores: [])\n" + + " --> KSTREAM-KEY-SELECT-0000000002-repartition-sink\n" + + " <-- KSTREAM-KEY-SELECT-0000000002")); + } + + @Test + void parseTest() throws URISyntaxException, IOException { + var topologyPath = Paths + .get(getClass().getClassLoader().getResource("test/topologies/test topology.txt").toURI()); + var topology = Files.readString(topologyPath); + + final var expected = getExpectedTestTopology(); + final var actual = parser.parse(topology); + + final var prettyObjectWriter = objectMapper.writerWithDefaultPrettyPrinter(); + assertEquals(prettyObjectWriter.writeValueAsString(expected), + prettyObjectWriter.writeValueAsString(actual)); + } + + private ProcessorTopologyDTO getExpectedTestTopology() { + // init topology objects + final ProcessorTopologyDTO processorTopology = new ProcessorTopologyDTO(); + processorTopology.setTopicsNumber(4); + processorTopology.setProcessorsNumber(11); + + final TopologyGraphDTO topologyGraph = getTopologyGraph(); + processorTopology.setTopology(topologyGraph); + + final SubTopologyNodeDTO subTopologyNode = new SubTopologyNodeDTO(); + subTopologyNode.setName("0"); + subTopologyNode.setType(GraphNodeTypeDTO.SUB_TOPOLOGY); + + final TopologyGraphDTO subTopologyGraph = getTopologyGraph(); + subTopologyNode.setSubTopology(subTopologyGraph); + + final SubTopologyNodeDTO subTopologyNode1 = new SubTopologyNodeDTO(); + subTopologyNode1.setName("1"); + subTopologyNode1.setType(GraphNodeTypeDTO.SUB_TOPOLOGY); + + final TopologyGraphDTO subTopologyGraph1 = getTopologyGraph(); + subTopologyNode1.setSubTopology(subTopologyGraph1); + + //init node objects + // sub topology 0 nodes + final TopicNodeDTO topicNode = getTopicNode("inputTopic"); + final var sub0_sourceProcessor = + getSourceProcessorNode("KSTREAM-SOURCE-0000000000", List.of(topicNode.getName())); + final var sub0_processor1 = + getProcessorNode("KSTREAM-KEY-SELECT-0000000001", List.of()); + final var sub0_processor2 = + getProcessorNode("count-repartition-filter", List.of()); + final var sub0_sinkTopic = getTopicNode("count-repartition"); + final var sub0_sinkProcessor = + getSinkProcessorNode("count-repartition-sink", sub0_sinkTopic.getName()); + + // sub topology 1 nodes + final TopicNodeDTO topicNode1 = getTopicNode("count-repartition"); + final SourceProcessorNodeDTO sub1_sourceProcessor = + getSourceProcessorNode("count-repartition-source", List.of(topicNode1.getName())); + final ProcessorNodeDTO sub1_processor1 = + getProcessorNode("KSTREAM-AGGREGATE-0000000002", List.of("count-store")); + final ProcessorNodeDTO sub1_processor2 = + getProcessorNode("KSTREAM-AGGREGATE-0000000008", List.of("windowed-count-store")); + final ProcessorNodeDTO sub1_processor3 = + getProcessorNode("KTABLE-TOSTREAM-0000000006", List.of()); + final ProcessorNodeDTO sub1_processor4 = + getProcessorNode("KTABLE-TOSTREAM-0000000012", List.of()); + final TopicNodeDTO sub1_sinkTopic1 = getTopicNode("count-topic"); + final TopicNodeDTO sub1_sinkTopic2 = getTopicNode("windowed-count"); + final SinkProcessorNodeDTO sub1_sink = + getSinkProcessorNode("KSTREAM-SINK-0000000007", sub1_sinkTopic1.getName()); + final SinkProcessorNodeDTO sub2_sink = + getSinkProcessorNode("KSTREAM-SINK-0000000013", sub1_sinkTopic2.getName()); + + + // establish sub topology 0 node connections + putNode(subTopologyGraph, sub0_sourceProcessor, List.of(sub0_processor1.getName())); + putNode(subTopologyGraph, sub0_processor1, List.of(sub0_processor2.getName())); + putNode(subTopologyGraph, sub0_processor2, List.of(sub0_sinkProcessor.getName())); + putNode(subTopologyGraph, sub0_sinkProcessor, List.of()); + + // establish sub topology 1 node connections + putNode(subTopologyGraph1, sub1_sourceProcessor, + List.of(sub1_processor1.getName(), sub1_processor2.getName())); + putNode(subTopologyGraph1, sub1_processor1, List.of(sub1_processor3.getName())); + putNode(subTopologyGraph1, sub1_processor2, List.of(sub1_processor4.getName())); + putNode(subTopologyGraph1, sub1_processor3, List.of(sub1_sink.getName())); + putNode(subTopologyGraph1, sub1_processor4, List.of(sub2_sink.getName())); + putNode(subTopologyGraph1, sub1_sink, List.of()); + putNode(subTopologyGraph1, sub2_sink, List.of()); + + // establish outer topology node connections + putNode(topologyGraph, topicNode, List.of(subTopologyNode.getName())); + putNode(topologyGraph, subTopologyNode, List.of(sub0_sinkTopic.getName())); + putNode(topologyGraph, sub0_sinkTopic, List.of(subTopologyNode1.getName())); + putNode(topologyGraph, subTopologyNode1, + List.of(sub1_sinkTopic1.getName(), sub1_sinkTopic2.getName())); + putNode(topologyGraph, sub1_sinkTopic1, List.of()); + putNode(topologyGraph, sub1_sinkTopic2, List.of()); + + return processorTopology; + } + + private void putNode(TopologyGraphDTO subTopologyGraph, + GraphNodeDTO node, + List adjItems) { + subTopologyGraph.putNodesItem(node.getName(), node); + subTopologyGraph.getAdjacency().putIfAbsent(node.getName(), new ArrayList<>()); + subTopologyGraph.getAdjacency().get(node.getName()).addAll(adjItems); + } + + private TopologyGraphDTO getTopologyGraph() { + final TopologyGraphDTO topologyGraph = new TopologyGraphDTO(); + topologyGraph.setNodes(new LinkedHashMap<>()); + topologyGraph.setAdjacency(new LinkedHashMap<>()); + return topologyGraph; + } + + private TopicNodeDTO getTopicNode(String name) { + final TopicNodeDTO topicNode = new TopicNodeDTO(); + topicNode.setName(name); + topicNode.setType(GraphNodeTypeDTO.TOPIC); + return topicNode; + } + + private SourceProcessorNodeDTO getSourceProcessorNode(String name, List topics) { + final var sourceProcessorNode = new SourceProcessorNodeDTO(); + sourceProcessorNode.setName(name); + sourceProcessorNode.setType(GraphNodeTypeDTO.SOURCE_PROCESSOR); + sourceProcessorNode.setTopics(topics); + return sourceProcessorNode; + } + + private ProcessorNodeDTO getProcessorNode(String name, List stores) { + final var processorNode = new ProcessorNodeDTO(); + processorNode.setName(name); + processorNode.setType(GraphNodeTypeDTO.PROCESSOR); + processorNode.setStores(stores); + return processorNode; + } + + private SinkProcessorNodeDTO getSinkProcessorNode(String name, String topic) { + final var sinkNode = new SinkProcessorNodeDTO(); + sinkNode.setName(name); + sinkNode.setType(GraphNodeTypeDTO.SINK_PROCESSOR); + sinkNode.setTopic(topic); + return sinkNode; + } +} \ No newline at end of file diff --git a/kafka-ui-api/src/test/resources/test/topologies/test topology.txt b/kafka-ui-api/src/test/resources/test/topologies/test topology.txt new file mode 100644 index 00000000000..781a769723d --- /dev/null +++ b/kafka-ui-api/src/test/resources/test/topologies/test topology.txt @@ -0,0 +1,35 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [inputTopic]) + --> KSTREAM-KEY-SELECT-0000000001 + Processor: KSTREAM-KEY-SELECT-0000000001 (stores: []) + --> count-repartition-filter + <-- KSTREAM-SOURCE-0000000000 + Processor: count-repartition-filter (stores: []) + --> count-repartition-sink + <-- KSTREAM-KEY-SELECT-0000000001 + Sink: count-repartition-sink (topic: count-repartition) + <-- count-repartition-filter + + Sub-topology: 1 + Source: count-repartition-source (topics: [count-repartition]) + --> KSTREAM-AGGREGATE-0000000002, KSTREAM-AGGREGATE-0000000008 + Processor: KSTREAM-AGGREGATE-0000000002 (stores: [count-store]) + --> KTABLE-TOSTREAM-0000000006 + <-- count-repartition-source + Processor: KSTREAM-AGGREGATE-0000000008 (stores: [windowed-count-store]) + --> KTABLE-TOSTREAM-0000000012 + <-- count-repartition-source + Processor: KTABLE-TOSTREAM-0000000006 (stores: []) + --> KSTREAM-SINK-0000000007 + <-- KSTREAM-AGGREGATE-0000000002 + Processor: KTABLE-TOSTREAM-0000000012 (stores: []) + --> KSTREAM-SINK-0000000013 + <-- KSTREAM-AGGREGATE-0000000008 + Sink: KSTREAM-SINK-0000000007 (topic: count-topic) + <-- KTABLE-TOSTREAM-0000000006 + Sink: KSTREAM-SINK-0000000013 (topic: windowed-count) + <-- KTABLE-TOSTREAM-0000000012 + + + diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index cb882ef078a..a69c74ac105 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1456,6 +1456,51 @@ paths: 404: description: Not found + /api/clusters/{clusterName}/streamtopologies: + get: + tags: + - StreamTopologies + summary: get stream applications + operationId: getStreamApplications + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/StreamApplications' + + /api/clusters/{clusterName}/streamtopologies/{applicationId}: + get: + tags: + - StreamTopologies + summary: get stream topology + operationId: getStreamTopology + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: applicationId + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/ProcessorTopology' + components: schemas: ErrorResponse: @@ -2567,4 +2612,100 @@ components: value: type: string source: - $ref: '#/components/schemas/ConfigSource' \ No newline at end of file + $ref: '#/components/schemas/ConfigSource' + + GraphNodeType: + type: string + enum: + - TOPIC + - SOURCE_PROCESSOR + - PROCESSOR + - SINK_PROCESSOR + - SUB_TOPOLOGY + + GraphNode: + type: object + properties: + name: + type: string + type: + $ref: '#/components/schemas/GraphNodeType' + + TopicNode: + allOf: + - $ref: '#/components/schemas/GraphNode' + - type: object + properties: + inputMessagesNumber: + type: integer + outputMessagesNumber: + type: integer + lag: + type: integer + + ProcessorNode: + allOf: + - $ref: '#/components/schemas/GraphNode' + - type: object + properties: + stores: + type: array + items: + type: string + + SourceProcessorNode: + allOf: + - $ref: '#/components/schemas/GraphNode' + - type: object + properties: + topics: + type: array + items: + type: string + + SinkProcessorNode: + allOf: + - $ref: '#/components/schemas/GraphNode' + - type: object + properties: + topic: + type: string + + TopologyGraph: + type: object + properties: + adjacency: + type: object + additionalProperties: + type: array + items: + type: string + nodes: + additionalProperties: + $ref: '#/components/schemas/GraphNode' + + SubTopologyNode: + allOf: + - $ref: '#/components/schemas/GraphNode' + - type: object + properties: + subTopology: + $ref: '#/components/schemas/TopologyGraph' + + ProcessorTopology: + type: object + properties: + processorsNumber: + type: integer + topicsNumber: + type: integer + topology: + $ref: '#/components/schemas/TopologyGraph' + + StreamApplications: + type: object + properties: + applicationIds: + type: array + items: + type: string \ No newline at end of file