Skip to content

Commit 6a19c0e

Browse files
committed
topology visualizer contract (#205)
1 parent 97e9db0 commit 6a19c0e

File tree

2 files changed

+256
-0
lines changed

2 files changed

+256
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package com.provectus.kafka.ui.controller;
2+
3+
import com.provectus.kafka.ui.api.StreamTopologyApi;
4+
import com.provectus.kafka.ui.model.GlobalTopology;
5+
import com.provectus.kafka.ui.model.ProcessorNode;
6+
import com.provectus.kafka.ui.model.ProcessorTopologyNode;
7+
import com.provectus.kafka.ui.model.SinkProcessorNode;
8+
import com.provectus.kafka.ui.model.SourceProcessorNode;
9+
import com.provectus.kafka.ui.model.SubTopologyNode;
10+
import com.provectus.kafka.ui.model.TopicNode;
11+
import com.provectus.kafka.ui.model.TopologyGraph;
12+
import com.provectus.kafka.ui.model.TopologyNode;
13+
import java.util.LinkedHashMap;
14+
import java.util.List;
15+
import org.springframework.http.HttpStatus;
16+
import org.springframework.http.ResponseEntity;
17+
import org.springframework.web.bind.annotation.ResponseStatus;
18+
import org.springframework.web.bind.annotation.RestController;
19+
import org.springframework.web.server.ServerWebExchange;
20+
import reactor.core.publisher.Mono;
21+
22+
@RestController
23+
public class StreamTopologyController implements StreamTopologyApi {
24+
@Override
25+
@ResponseStatus(HttpStatus.NOT_IMPLEMENTED)
26+
public Mono<ResponseEntity<GlobalTopology>> getStreamTopology(
27+
ServerWebExchange exchange) {
28+
return Mono.empty();
29+
}
30+
31+
@Override
32+
public Mono<ResponseEntity<GlobalTopology>> getDummyStreamTopology(ServerWebExchange exchange) {
33+
return Mono.just(getDummyTopology())
34+
.map(ResponseEntity::ok);
35+
}
36+
37+
private static GlobalTopology getDummyTopology() {
38+
final GlobalTopology globalTopology = new GlobalTopology();
39+
final TopologyGraph topology = new TopologyGraph();
40+
globalTopology.setTopology(topology);
41+
42+
final TopologyNode sourceConnector = new TopologyNode();
43+
sourceConnector.setName("source connector");
44+
sourceConnector.setType(TopologyNode.TypeEnum.SOURCE_CONNECTOR);
45+
46+
final TopologyNode sourceTopic = new TopicNode();
47+
sourceTopic.setName("conversation-meta");
48+
sourceTopic.setType(TopologyNode.TypeEnum.TOPIC);
49+
50+
final ProcessorTopologyNode topologyNode = new ProcessorTopologyNode();
51+
topologyNode.setName("topology 1");
52+
topologyNode.setType(TopologyNode.TypeEnum.PROCESSOR_TOPOLOGY);
53+
54+
final TopologyNode sinkTopic = new TopicNode();
55+
sinkTopic.setName("streams-count-resolved");
56+
sinkTopic.setType(TopologyNode.TypeEnum.TOPIC);
57+
58+
final TopologyNode sinkConnector = new ProcessorTopologyNode();
59+
sinkConnector.setName("sink connector");
60+
sinkConnector.setType(TopologyNode.TypeEnum.SINK_CONNECTOR);
61+
62+
topology.setAdjacency(
63+
new LinkedHashMap<>() {{
64+
put(sourceConnector.getName(), List.of(sourceTopic.getName()));
65+
put(sourceTopic.getName(), List.of(topologyNode.getName()));
66+
put(topologyNode.getName(), List.of(sinkTopic.getName()));
67+
put(sinkTopic.getName(), List.of(sinkConnector.getName()));
68+
put(sinkConnector.getName(), List.of());
69+
}}
70+
);
71+
topology
72+
.setNodes(List.of(sourceConnector, sourceTopic, topologyNode, sinkTopic, sinkConnector));
73+
74+
final SubTopologyNode subTopologyNode =
75+
getSubTopologyNode("sub-topology 0", "KSTREAM-SOURCE-0000000000",
76+
"KSTREAM-TRANSFORM-0000000001", "KSTREAM-SINK-000000002", "conversation-meta",
77+
"count-resolved-repartition");
78+
79+
final TopologyNode interTopic = new TopicNode();
80+
interTopic.setName("count-resolved-repartition");
81+
interTopic.setType(TopologyNode.TypeEnum.TOPIC);
82+
83+
final SubTopologyNode subTopologyNode1 =
84+
getSubTopologyNode("sub-topology 1", "KSTREAM-SOURCE-000000003",
85+
"KSTREAM-TRANSFORM-000000004", "KSTREAM-SINK-000000005", "count-resolved-repartition",
86+
"streams-count-resolved");
87+
88+
topologyNode.setTopology(List.of(subTopologyNode, interTopic, subTopologyNode1));
89+
90+
return globalTopology;
91+
}
92+
93+
private static SubTopologyNode getSubTopologyNode(String subTopologyName, String sourceProcessor,
94+
String transformProcessor, String sinkProcessor,
95+
String sourceTopic, String sinkTopic) {
96+
final SubTopologyNode subTopologyNode = new SubTopologyNode();
97+
subTopologyNode.setName(subTopologyName);
98+
subTopologyNode.setType(TopologyNode.TypeEnum.SUB_TOPOLOGY);
99+
100+
final TopologyGraph subTopology = new TopologyGraph();
101+
102+
final SourceProcessorNode topology0Processor0 = new SourceProcessorNode();
103+
topology0Processor0.setName(sourceProcessor);
104+
topology0Processor0.setType(TopologyNode.TypeEnum.SOURCE_PROCESSOR);
105+
topology0Processor0.addTopicsItem(sourceTopic);
106+
107+
final ProcessorNode topology0Processor1 = new ProcessorNode();
108+
topology0Processor1.setName(transformProcessor);
109+
topology0Processor1.setType(TopologyNode.TypeEnum.PROCESSOR);
110+
topology0Processor1.addStoresItem("store");
111+
112+
final SinkProcessorNode topology0Processor2 = new SinkProcessorNode();
113+
topology0Processor2.setName(sinkProcessor);
114+
topology0Processor2.setType(TopologyNode.TypeEnum.SINK_PROCESSOR);
115+
topology0Processor2.setTopic(sinkTopic);
116+
117+
subTopology.setAdjacency(
118+
new LinkedHashMap<>() {{
119+
put(topology0Processor0.getName(), List.of(topology0Processor1.getName()));
120+
put(topology0Processor1.getName(), List.of(topology0Processor2.getName()));
121+
put(topology0Processor2.getName(), List.of());
122+
}}
123+
124+
);
125+
subTopology.setNodes(List.of(topology0Processor0, topology0Processor1, topology0Processor2));
126+
subTopologyNode.setSubTopology(subTopology);
127+
return subTopologyNode;
128+
}
129+
}

kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

+127
Original file line numberDiff line numberDiff line change
@@ -1225,6 +1225,34 @@ paths:
12251225
404:
12261226
description: Not found
12271227

1228+
/api/streamtopology:
1229+
get:
1230+
tags:
1231+
- StreamTopology
1232+
summary: get stream topology
1233+
operationId: getStreamTopology
1234+
responses:
1235+
200:
1236+
description: OK
1237+
content:
1238+
application/json:
1239+
schema:
1240+
$ref: '#/components/schemas/GlobalTopology'
1241+
1242+
/api/streamtopology/dummy:
1243+
get:
1244+
tags:
1245+
- StreamTopology
1246+
summary: get dummy stream topology
1247+
operationId: getDummyStreamTopology
1248+
responses:
1249+
200:
1250+
description: OK
1251+
content:
1252+
application/json:
1253+
schema:
1254+
$ref: '#/components/schemas/GlobalTopology'
1255+
12281256
components:
12291257
schemas:
12301258
ErrorResponse:
@@ -2027,3 +2055,102 @@ components:
20272055
required:
20282056
- totalPartitionsCount
20292057
- topicName
2058+
2059+
TopologyNode:
2060+
type: object
2061+
properties:
2062+
name:
2063+
type: string
2064+
type:
2065+
type: string
2066+
enum:
2067+
- TOPIC
2068+
- SOURCE_CONNECTOR
2069+
- SINK_CONNECTOR
2070+
- SOURCE_PROCESSOR
2071+
- PROCESSOR
2072+
- SINK_PROCESSOR
2073+
- PROCESSOR_TOPOLOGY
2074+
- SUB_TOPOLOGY
2075+
2076+
TopicNode:
2077+
allOf:
2078+
- $ref: '#/components/schemas/TopologyNode'
2079+
- type: object
2080+
properties:
2081+
inputMessagesNumber:
2082+
type: integer
2083+
outputMessagesNumber:
2084+
type: integer
2085+
lag:
2086+
type: integer
2087+
2088+
ProcessorNode:
2089+
allOf:
2090+
- $ref: '#/components/schemas/TopologyNode'
2091+
- type: object
2092+
properties:
2093+
stores:
2094+
type: array
2095+
items:
2096+
type: string
2097+
2098+
SourceProcessorNode:
2099+
allOf:
2100+
- $ref: '#/components/schemas/TopologyNode'
2101+
- type: object
2102+
properties:
2103+
topics:
2104+
type: array
2105+
items:
2106+
type: string
2107+
2108+
SinkProcessorNode:
2109+
allOf:
2110+
- $ref: '#/components/schemas/TopologyNode'
2111+
- type: object
2112+
properties:
2113+
topic:
2114+
type: string
2115+
2116+
TopologyGraph:
2117+
type: object
2118+
properties:
2119+
adjacency:
2120+
type: object
2121+
additionalProperties:
2122+
type: array
2123+
items:
2124+
type: string
2125+
nodes:
2126+
type: array
2127+
items:
2128+
$ref: '#/components/schemas/TopologyNode'
2129+
2130+
SubTopologyNode:
2131+
allOf:
2132+
- $ref: '#/components/schemas/TopologyNode'
2133+
- type: object
2134+
properties:
2135+
subTopology:
2136+
$ref: '#/components/schemas/TopologyGraph'
2137+
2138+
ProcessorTopologyNode:
2139+
allOf:
2140+
- $ref: '#/components/schemas/TopologyNode'
2141+
- type: object
2142+
properties:
2143+
processorsNumber:
2144+
type: integer
2145+
topicsNumber:
2146+
type: integer
2147+
topology:
2148+
type: array
2149+
items:
2150+
$ref: '#/components/schemas/TopologyNode'
2151+
2152+
GlobalTopology:
2153+
type: object
2154+
properties:
2155+
topology:
2156+
$ref: '#/components/schemas/TopologyGraph'

0 commit comments

Comments
 (0)