Skip to content

Feature/topology visualizer contract #205 #639

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
6a19c0e
topology visualizer contract (#205)
Flyur Jul 6, 2021
87460ba
Merge branch 'master' into feature/topology_visualizer_contract
Flyur Jul 6, 2021
2269f48
map init refactoring (#205)
Flyur Jul 6, 2021
6917274
small refactoring (#205)
Flyur Jul 7, 2021
19b981f
represented processor topology as a graph, fixed demo endpoint (#205)
Flyur Jul 9, 2021
d1e551b
dummy endpoint removal, temp topology model simplification, endpoint …
Flyur Jul 10, 2021
56880ed
Merge branch 'master' into feature/topology_visualizer_contract
Flyur Jul 10, 2021
14e3be0
api schema rename
Flyur Jul 10, 2021
c518e2d
stream topologies endpoint, application id type changed to string
Flyur Jul 12, 2021
fbfbba5
stream topology parser
Flyur Aug 15, 2021
4d20253
StreamTopologyParserHelper class for util methods
Flyur Aug 15, 2021
17ef2b0
small refactoring
Flyur Aug 15, 2021
ab4e778
small refactoring
Flyur Aug 15, 2021
9192fde
getTopologyApplications fix
Flyur Aug 15, 2021
c7b697c
line breaks handling fix
Flyur Aug 15, 2021
7e7ba8d
hasNext handling refactoring
Flyur Aug 16, 2021
0da94ce
small refactoring
Flyur Aug 16, 2021
1fcae4b
test topology parsing test
Flyur Aug 16, 2021
75d5a8d
refactoring - line based string iteration
Flyur Aug 18, 2021
9abf039
Merge branch 'master' into feature/topology_visualizer_contract
Flyur Aug 18, 2021
bb0cc1e
streamApplications property default value
Flyur Aug 18, 2021
b33fa87
Merge branch 'master' into feature/topology_visualizer_contract
Oct 13, 2021
df19d79
Add DTO suffix as allowed abbreviations in checkstyle.xml
Oct 13, 2021
1ea6fea
Fix model names and imports for Stream Lineage entities
Oct 13, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions etc/checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@
<property name="tokens"
value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, ANNOTATION_DEF, ANNOTATION_FIELD_DEF,
PARAMETER_DEF, VARIABLE_DEF, METHOD_DEF"/>
<property name="allowedAbbreviations" value="DTO"/>
</module>
<module name="OverloadMethodsDeclarationOrder"/>
<module name="VariableDeclarationUsageDistance"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@
import java.util.Map;
import java.util.Properties;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties("kafka")
@EnableConfigurationProperties
@Data
public class ClustersProperties {

Expand All @@ -36,6 +40,7 @@ public static class Cluster {
Properties properties;
boolean readOnly = false;
boolean disableLogDirsCollection = false;
List<StreamApplication> streamApplications = new ArrayList<>();
}

@Data
Expand All @@ -49,4 +54,11 @@ public static class SchemaRegistryAuth {
String username;
String password;
}

@Getter
@Setter
public static class StreamApplication {
private String applicationId;
private String topologyUrl;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.provectus.kafka.ui.controller;

import com.provectus.kafka.ui.api.StreamTopologiesApi;
import com.provectus.kafka.ui.model.ProcessorTopologyDTO;
import com.provectus.kafka.ui.model.StreamApplicationsDTO;
import com.provectus.kafka.ui.service.StreamTopologyService;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

@RestController
@RequiredArgsConstructor
public class StreamTopologyController implements StreamTopologiesApi {
private final StreamTopologyService topologyService;

@Override
public Mono<ResponseEntity<StreamApplicationsDTO>> getStreamApplications(
String clusterName, ServerWebExchange exchange) {
return Mono.just(ResponseEntity.ok(topologyService.getTopologyApplications(clusterName)));
}

@Override
public Mono<ResponseEntity<ProcessorTopologyDTO>> getStreamTopology(String clusterName,
String applicationId,
ServerWebExchange exchange) {
return topologyService.getStreamTopology(clusterName, applicationId)
.map(ResponseEntity::ok);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ public enum ErrorCode {
KSQLDB_NOT_FOUND(4011, HttpStatus.NOT_FOUND),
DIR_NOT_FOUND(4012, HttpStatus.BAD_REQUEST),
TOPIC_OR_PARTITION_NOT_FOUND(4013, HttpStatus.BAD_REQUEST),
INVALID_REQUEST(4014, HttpStatus.BAD_REQUEST);
INVALID_REQUEST(4014, HttpStatus.BAD_REQUEST),
INVALID_STREAM_TOPOLOGY_STRING(4015, HttpStatus.UNPROCESSABLE_ENTITY);

static {
// codes uniqueness check
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.provectus.kafka.ui.exception;

public class InvalidStreamTopologyString extends RuntimeException {
public InvalidStreamTopologyString() {
super("Invalid stream topology string");
}

public InvalidStreamTopologyString(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.provectus.kafka.ui.exception;

public class StreamTopologyParsingException extends CustomBaseException {
public StreamTopologyParsingException() {
super("Stream topology string is invalid");
}

public StreamTopologyParsingException(String message) {
super(message);
}

public StreamTopologyParsingException(String message, Throwable cause) {
super(message, cause);
}

@Override
public ErrorCode getErrorCode() {
return ErrorCode.INVALID_STREAM_TOPOLOGY_STRING;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package com.provectus.kafka.ui.service;

import static com.provectus.kafka.ui.config.ClustersProperties.StreamApplication;

import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.exception.ClusterNotFoundException;
import com.provectus.kafka.ui.exception.InvalidStreamTopologyString;
import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.exception.StreamTopologyParsingException;
import com.provectus.kafka.ui.model.ProcessorTopologyDTO;
import com.provectus.kafka.ui.model.StreamApplicationsDTO;
import com.provectus.kafka.ui.service.topology.parser.StreamTopologyParser;
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

@Log4j2
@Service
public class StreamTopologyService {
private final Map<String, Map<String, String>> clusterStreamApps;
private final WebClient webClient;
private final StreamTopologyParser topologyParser;

public StreamTopologyService(ClustersProperties clustersProperties, WebClient webClient,
StreamTopologyParser topologyParser) {
this.clusterStreamApps = getClusterToStreamAppsMap(clustersProperties);
this.webClient = webClient;
this.topologyParser = topologyParser;
}

public StreamApplicationsDTO getTopologyApplications(String clusterName) {
final var streamApplications = new StreamApplicationsDTO();
final var applicationIds = Optional.ofNullable(clusterStreamApps.get(clusterName))
.map(Map::keySet)
.map(ArrayList::new)
.orElseThrow(ClusterNotFoundException::new);
return streamApplications.applicationIds(applicationIds);
}

public Mono<ProcessorTopologyDTO> getStreamTopology(String clusterName, String applicationId) {
return Optional.ofNullable(clusterStreamApps.get(clusterName))
.map(apps -> apps.get(applicationId))
.map(this::getTopologyString)
.map(topologyMono ->
topologyMono.map(s -> parseTopologyString(s, clusterName, applicationId))
)
.orElseThrow(() -> new NotFoundException("Stream application not found"));
}

private ProcessorTopologyDTO parseTopologyString(String topologyString, String clusterName,
String applicationId) {
try {
return topologyParser.parse(topologyString);
} catch (InvalidStreamTopologyString e) {
log.error("cannot parse stream topology", e);
throw new StreamTopologyParsingException(String
.format("cannot parse stream topology <clusterName %s>, <applicationId %s>",
clusterName, applicationId));
}
}

private Mono<String> getTopologyString(String topologyUrl) {
return webClient.get()
.uri(topologyUrl)
.retrieve()
.bodyToMono(String.class)
.doOnError(log::error);
}

private Map<String, Map<String, String>> getClusterToStreamAppsMap(
ClustersProperties clustersProperties) {
return clustersProperties.getClusters().stream()
.map(cluster -> ImmutablePair.of(cluster.getName(), getAppToEndpointMap(cluster)))
.collect(Collectors.toMap(ImmutablePair::getKey, ImmutablePair::getValue));
}

private Map<String, String> getAppToEndpointMap(ClustersProperties.Cluster cluster) {
return cluster.getStreamApplications().stream()
.collect(Collectors.toMap(
StreamApplication::getApplicationId,
StreamApplication::getTopologyUrl)
);
}
}
Loading