From 432c027c0b48469e903b290c87e58febeab7a25c Mon Sep 17 00:00:00 2001 From: iliax Date: Fri, 18 Nov 2022 12:31:45 +0400 Subject: [PATCH 1/4] wip --- .../kafka/ui/controller/TopicsController.java | 46 +++++ .../kafka/ui/service/ReactiveAdminClient.java | 5 + .../service/reassign/ReassignementsStore.java | 4 + .../ui/service/reassign/ReassignmentPlan.java | 11 ++ .../service/reassign/ReassignmentPlanner.java | 151 ++++++++++++++++ .../service/reassign/ReassignmentService.java | 78 +++++++++ .../reassign/ReassignmentJsonDtoTest.java | 55 ++++++ .../main/resources/swagger/kafka-ui-api.yaml | 164 ++++++++++++++++++ 8 files changed, 514 insertions(+) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignementsStore.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentPlan.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentPlanner.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentService.java create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/reassign/ReassignmentJsonDtoTest.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java index ccfe898fdcb..0bea32a875c 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java @@ -4,10 +4,13 @@ import com.provectus.kafka.ui.api.TopicsApi; import com.provectus.kafka.ui.mapper.ClusterMapper; +import com.provectus.kafka.ui.model.GeneratePartitionsReassignmentCommandDTO; +import com.provectus.kafka.ui.model.InProgressReassignmentDTO; import com.provectus.kafka.ui.model.InternalTopic; import com.provectus.kafka.ui.model.InternalTopicConfig; import com.provectus.kafka.ui.model.PartitionsIncreaseDTO; import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO; +import com.provectus.kafka.ui.model.ReassignPartitionsCommandDTO; import com.provectus.kafka.ui.model.ReplicationFactorChangeDTO; import com.provectus.kafka.ui.model.ReplicationFactorChangeResponseDTO; import com.provectus.kafka.ui.model.SortOrderDTO; @@ -21,8 +24,10 @@ import com.provectus.kafka.ui.model.TopicsResponseDTO; import com.provectus.kafka.ui.service.TopicsService; import com.provectus.kafka.ui.service.analyze.TopicAnalysisService; +import com.provectus.kafka.ui.service.reassign.ReassignmentService; import java.util.Comparator; import java.util.List; +import java.util.stream.Collectors; import javax.validation.Valid; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -43,6 +48,7 @@ public class TopicsController extends AbstractController implements TopicsApi { private final TopicsService topicsService; private final TopicAnalysisService topicAnalysisService; + private final ReassignmentService reassignmentService; private final ClusterMapper clusterMapper; @Override @@ -209,4 +215,44 @@ public Mono> getTopicAnalysis(String clusterNam .orElseGet(() -> ResponseEntity.notFound().build()) ); } + + + @Override + public Mono> generatePartitionAssignment(String clusterName, + Mono generatePartitionsReassignmentCommandDTO, + ServerWebExchange exchange) { + return generatePartitionsReassignmentCommandDTO + .flatMap(generateDto -> + reassignmentService.generate( + getCluster(clusterName), + generateDto.getTopics().stream().map(t -> t.getTopic()).collect(Collectors.toSet()), + generateDto.getBrokerIds())) + .map(ResponseEntity::ok); + } + + @Override + public Mono> getCurrentPartitionAssignment(String clusterName, + Mono generatePartitionsReassignmentCommandDTO, + ServerWebExchange exchange) { + return generatePartitionsReassignmentCommandDTO + .flatMap(generateDto -> + reassignmentService.getCurrentAssignment( + getCluster(clusterName), + generateDto.getTopics().stream().map(t -> t.getTopic()).collect(Collectors.toSet()))) + .map(ResponseEntity::ok); + } + + @Override + public Mono> executePartitionAssignment(String clusterName, + Mono reassignPartitionsCommandDTO, + ServerWebExchange exchange) { + return null; + } + + @Override + public Mono> getInProgressAssignments(String clusterName, + ServerWebExchange exchange) { + return reassignmentService.getInProgressAssignments(getCluster(clusterName)) + .map(ResponseEntity::ok); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index 2504473b171..b01e9367283 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -48,6 +48,7 @@ import org.apache.kafka.clients.admin.NewPartitions; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.admin.PartitionReassignment; import org.apache.kafka.clients.admin.RecordsToDelete; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -357,6 +358,10 @@ public Mono alterPartitionReassignments( return toMono(client.alterPartitionReassignments(reassignments).all()); } + public Mono> listPartitionReassignments() { + return toMono(client.listPartitionReassignments().reassignments()); + } + public Mono createPartitions(Map newPartitionsMap) { return toMono(client.createPartitions(newPartitionsMap).all()); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignementsStore.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignementsStore.java new file mode 100644 index 00000000000..84b9769f668 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignementsStore.java @@ -0,0 +1,4 @@ +package com.provectus.kafka.ui.service.reassign; + +public class ReassignementsStore { +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentPlan.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentPlan.java new file mode 100644 index 00000000000..031a891ee02 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentPlan.java @@ -0,0 +1,11 @@ +package com.provectus.kafka.ui.service.reassign; + +import com.provectus.kafka.ui.service.ReactiveAdminClient; +import java.util.Map; +import java.util.Optional; +import org.apache.kafka.clients.admin.NewPartitionReassignment; +import org.apache.kafka.common.TopicPartition; + +public record ReassignmentPlan(Map> reassignments) { + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentPlanner.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentPlanner.java new file mode 100644 index 00000000000..8674ea58e34 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentPlanner.java @@ -0,0 +1,151 @@ +package com.provectus.kafka.ui.service.reassign; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.provectus.kafka.ui.exception.ValidationException; +import com.provectus.kafka.ui.model.PartitionReassignmentDTO; +import com.provectus.kafka.ui.model.ReassignPartitionsCommandDTO; +import com.provectus.kafka.ui.service.ReactiveAdminClient; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.random.RandomGenerator; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; + +@Slf4j +@RequiredArgsConstructor +public class ReassignmentPlanner { + + record BrokerMetadata(int id, Optional rack) { + } + + private final ReactiveAdminClient adminClient; + + public Mono generatePartitionReassignment(Set topics, + List brokerIds, + boolean rackAware) { + return Mono.zip(currentAssignments(adminClient, topics), brokerMetadata(brokerIds)).map(t -> + createSuggestedReassignment( + calculateAssignment(t.getT1(), t.getT2(), rackAware))); + } + + private static ReassignPartitionsCommandDTO createSuggestedReassignment( + Map> assignment) { + var dto = new ReassignPartitionsCommandDTO().version(1); + assignment.forEach((tp, replicas) -> + dto.addPartitionsItem( + new PartitionReassignmentDTO() + .topic(tp.topic()) + .partition(tp.partition()) + .replicas(replicas) + .logDirs(replicas.stream().map(r -> "any").toList()))); + return dto; + } + + // [ topic -> [tp -> list of replicas] ] + public static Mono>>> currentAssignments(ReactiveAdminClient ac, Set topics) { + return ac.describeTopics(topics) + .map(topicToDescriptionMap -> + topicToDescriptionMap.entrySet().stream() + .map(e -> + Tuples.of( + e.getKey(), + e.getValue().partitions().stream() + .map(p -> + Tuples.of( + new TopicPartition(e.getKey(), p.partition()), + p.replicas().stream().map(Node::id).toList() + )).collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2)) + )) + .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2)) + ); + } + + private Mono> brokerMetadata(List brokerIds) { + return adminClient.describeCluster() + .map(description -> description.getNodes().stream() + .filter(n -> brokerIds.contains(n.id())) + .map(n -> new BrokerMetadata(n.id(), Optional.ofNullable(n.rack()))) + .toList()); + } + + @VisibleForTesting + static Map> calculateAssignment( + Map>> currentAssignments, + List brokerMetadata, + boolean rackAware) { + if (rackAware && brokerMetadata.stream().anyMatch(m -> m.rack().isEmpty())) { + throw new ValidationException("Not all brokers have rack information for replica rack aware assignment"); + } + return rackAware + ? calculateAssignmentRackAware(currentAssignments, brokerMetadata) + : calculateAssignmentRackUnaware(currentAssignments, brokerMetadata); + } + + private static Map> calculateAssignmentRackAware( + Map>> currentAssignments, + List brokerMetadata) { + log.warn("Rack-aware assignment calculation is not implemented yet, falling back to usual calculation"); + return calculateAssignmentRackUnaware(currentAssignments, brokerMetadata); + } + + private static Map> calculateAssignmentRackUnaware( + Map>> currentAssignments, + List brokerMetadata) { + Map> result = new LinkedHashMap<>(); + currentAssignments.forEach((topic, currentAssignment) -> { + result.putAll( + assignReplicasToBrokersRackUnaware( + topic, + currentAssignment.size(), + currentAssignment.entrySet().iterator().next().getValue().size(), + brokerMetadata.stream().map(BrokerMetadata::id).collect(Collectors.toList()), + ThreadLocalRandom.current() + ) + ); + }); + return result; + } + + static Map> assignReplicasToBrokersRackUnaware( + String topic, + int nPartitions, + int replicationFactor, + List brokerList, + RandomGenerator rand) { + var result = new LinkedHashMap>(); + int startIndex = rand.nextInt(brokerList.size()); + int currentPartitionId = 0; + int nextReplicaShift = rand.nextInt(brokerList.size()); + for (int i = 0; i < nPartitions; i++) { + if (currentPartitionId > 0 && (currentPartitionId % brokerList.size() == 0)) { + nextReplicaShift += 1; + } + int firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size(); + var replicaBuffer = Lists.newArrayList(brokerList.get(firstReplicaIndex)); + for (int j = 0; j < replicationFactor - 1; j++) { + replicaBuffer.add(brokerList.get(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size()))); + } + result.put(new TopicPartition(topic, currentPartitionId), replicaBuffer); + currentPartitionId += 1; + } + return result; + } + + private static int replicaIndex(int firstReplicaIndex, int secondReplicaShift, int replicaIndex, int nBrokers) { + var shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1); + return (firstReplicaIndex + shift) % nBrokers; + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentService.java new file mode 100644 index 00000000000..445caa97e89 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentService.java @@ -0,0 +1,78 @@ +package com.provectus.kafka.ui.service.reassign; + +import com.provectus.kafka.ui.model.InProgressPartitionReassignmentDTO; +import com.provectus.kafka.ui.model.InProgressReassignmentDTO; +import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.model.PartitionReassignmentDTO; +import com.provectus.kafka.ui.model.ReassignPartitionsCommandDTO; +import com.provectus.kafka.ui.service.AdminClientService; +import com.provectus.kafka.ui.service.ReactiveAdminClient; +import java.util.List; +import java.util.Map; +import java.util.Set; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.admin.PartitionReassignment; +import org.apache.kafka.common.TopicPartition; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; + +@Service +@RequiredArgsConstructor +public class ReassignmentService { + + private final AdminClientService adminClientService; + + public Mono generate(KafkaCluster cluster, + Set topics, + List brokerIds) { + return adminClientService.get(cluster) + .map(ReassignmentPlanner::new) + .flatMap(planner -> planner.generatePartitionReassignment(topics, brokerIds, false)); + } + + + public Mono getCurrentAssignment(KafkaCluster cluster, + Set topics) { + return adminClientService.get(cluster) + .flatMap(ac -> ReassignmentPlanner.currentAssignments(ac, topics)) + .map(this::map); + } + + public Mono getInProgressAssignments(KafkaCluster cluster) { + return adminClientService.get(cluster) + .flatMap(ReactiveAdminClient::listPartitionReassignments) + .map(this::mapInProgressReassignments); + } + + private InProgressReassignmentDTO mapInProgressReassignments(Map reassignments) { + return new InProgressReassignmentDTO() + .partitions( + reassignments.entrySet().stream() + .map(e -> new InProgressPartitionReassignmentDTO() + .topic(e.getKey().topic()) + .partition(e.getKey().partition()) + .currentReplicas(e.getValue().replicas()) + .addingReplicas(e.getValue().addingReplicas()) + .removingReplicas(e.getValue().removingReplicas()) + ) + .toList() + ); + } + + private ReassignPartitionsCommandDTO map(Map>> assignment) { + return new ReassignPartitionsCommandDTO() + .version(1) + .partitions( + assignment.values().stream() + .flatMap(m -> m.entrySet().stream()) + .map(p -> new PartitionReassignmentDTO() + .topic(p.getKey().topic()) + .partition(p.getKey().partition()) + .replicas(p.getValue()) + .logDirs(p.getValue().stream().map(r -> "any").toList()) + ) + .toList() + ); + } + +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/reassign/ReassignmentJsonDtoTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/reassign/ReassignmentJsonDtoTest.java new file mode 100644 index 00000000000..9a490416472 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/reassign/ReassignmentJsonDtoTest.java @@ -0,0 +1,55 @@ +package com.provectus.kafka.ui.service.reassign; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.provectus.kafka.ui.model.ReassignPartitionsCommandDTO; +import java.util.List; +import org.junit.jupiter.api.Test; + +class ReassignmentJsonDtoTest { + + @Test + void canBeCreatedFromJsonString() { + var parsed = ReassignmentJsonDto.fromJson( + "{" + + " \"version\": 1, " + + " \"partitions\":" + + " [" + + " {" + + " \"topic\": \"my-topic\"," + + " \"partition\": 0, " + + " \"replicas\":" + + " [ " + + " 0, " + + " 1, " + + " 2 " + + " ], " + + " \"log_dirs\": " + + " [ " + + " \"any\", " + + " \"/user/share/kafka/p0\"," + + " \"any\"" + + " ]" + + " }" + + " ]" + + "}" + ); + assertThat(parsed).isEqualTo( + ReassignPartitionsCommandDTO.builder() + .version(1) + .partitions( + List.of( + ReassignmentJsonDto.PartitionAssignmentDto.builder() + .topic("my-topic") + .partition(0) + .replicas(List.of(0, 1, 2)) + .logDirs(List.of("any", "/user/share/kafka/p0", "any")) + .build() + ) + ) + .build() + ); + } + + +} diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 1034fc32025..5a5e969d33e 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1729,6 +1729,94 @@ paths: $ref: '#/components/schemas/PartitionsIncreaseResponse' 404: description: Not found + + /api/clusters/{clusterName}/partitionsreaassignments/generate: + post: + tags: + - Topics + operationId: generatePartitionAssignment + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/GeneratePartitionsReassignmentCommand' + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/ReassignPartitionsCommand' + + /api/clusters/{clusterName}/partitionsreaassignments/current: + post: + tags: + - Topics + operationId: getCurrentPartitionAssignment + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/GeneratePartitionsReassignmentCommand' + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/ReassignPartitionsCommand' + + /api/clusters/{clusterName}/partitionsreaassignments/execute: + post: + tags: + - Topics + operationId: executePartitionAssignment + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/ReassignPartitionsCommand' + responses: + 200: + description: OK + + /api/clusters/{clusterName}/partitionsreaassignments/inprogress: + get: + tags: + - Topics + operationId: getInProgressAssignments + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/InProgressReassignment' + /api/info/timestampformat: get: tags: @@ -3149,3 +3237,79 @@ components: - COMPACT - COMPACT_DELETE - UNKNOWN + + ReassignPartitionsCommand: + type: object + properties: + version: + type: integer + format: int32 + partitions: + type: array + items: + $ref: "#/components/schemas/PartitionReassignment" + + PartitionReassignment: + type: object + properties: + topic: + type: string + partition: + type: integer + replicas: + type: array + items: + type: integer + log_dirs: + type: array + items: + type: string + + GeneratePartitionsReassignmentCommand: + type: object + properties: + version: + type: integer + format: int32 + broker_ids: + type: array + items: + type: integer + format: int32 + topics: + type: array + items: + type: object + properties: + topic: + type: string + + InProgressReassignment: + type: object + properties: + partitions: + type: array + items: + $ref: "#/components/schemas/InProgressPartitionReassignment" + + InProgressPartitionReassignment: + type: object + properties: + topic: + type: string + partition: + type: integer + format: int32 + currentReplicas: + type: array + items: + type: integer + format: int32 + addingReplicas: + items: + type: integer + format: int32 + removingReplicas: + items: + type: integer + format: int32 From 14bd3086e905b53dde13b1bcb81c759feb2c9931 Mon Sep 17 00:00:00 2001 From: iliax Date: Wed, 23 Nov 2022 23:33:26 +0400 Subject: [PATCH 2/4] Partitions-reassignment endpoints implemented --- .../kafka/ui/controller/TopicsController.java | 40 +++- .../service/reassign/ReassignementsStore.java | 4 - .../reassign/ReassignmentOperations.java | 188 +++++++++++++++ .../ui/service/reassign/ReassignmentPlan.java | 11 - .../service/reassign/ReassignmentPlanner.java | 151 ------------ .../service/reassign/ReassignmentService.java | 80 +++++-- .../service/TopicsServicePaginationTest.java | 3 +- .../reassign/ReassignmentJsonDtoTest.java | 55 ----- .../reassign/ReassignmentOperationsTest.java | 224 ++++++++++++++++++ .../main/resources/swagger/kafka-ui-api.yaml | 42 +++- 10 files changed, 544 insertions(+), 254 deletions(-) delete mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignementsStore.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentOperations.java delete mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentPlan.java delete mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentPlanner.java delete mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/reassign/ReassignmentJsonDtoTest.java create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/reassign/ReassignmentOperationsTest.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java index 0bea32a875c..d63d248db3f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java @@ -8,6 +8,7 @@ import com.provectus.kafka.ui.model.InProgressReassignmentDTO; import com.provectus.kafka.ui.model.InternalTopic; import com.provectus.kafka.ui.model.InternalTopicConfig; +import com.provectus.kafka.ui.model.PartitionReassignmentCancellationDTO; import com.provectus.kafka.ui.model.PartitionsIncreaseDTO; import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO; import com.provectus.kafka.ui.model.ReassignPartitionsCommandDTO; @@ -32,6 +33,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.common.TopicPartition; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RestController; @@ -218,10 +220,11 @@ public Mono> getTopicAnalysis(String clusterNam @Override - public Mono> generatePartitionAssignment(String clusterName, - Mono generatePartitionsReassignmentCommandDTO, - ServerWebExchange exchange) { - return generatePartitionsReassignmentCommandDTO + public Mono> + generatePartitionAssignment(String clusterName, + Mono reassignCmdDto, + ServerWebExchange exchange) { + return reassignCmdDto .flatMap(generateDto -> reassignmentService.generate( getCluster(clusterName), @@ -232,21 +235,24 @@ public Mono> generatePartitionAssig @Override public Mono> getCurrentPartitionAssignment(String clusterName, - Mono generatePartitionsReassignmentCommandDTO, + Flux topicsList, ServerWebExchange exchange) { - return generatePartitionsReassignmentCommandDTO - .flatMap(generateDto -> + return topicsList + .collect(Collectors.toSet()) + .flatMap(topics -> reassignmentService.getCurrentAssignment( getCluster(clusterName), - generateDto.getTopics().stream().map(t -> t.getTopic()).collect(Collectors.toSet()))) + topics)) .map(ResponseEntity::ok); } @Override public Mono> executePartitionAssignment(String clusterName, - Mono reassignPartitionsCommandDTO, + Mono cmdDto, ServerWebExchange exchange) { - return null; + return cmdDto + .flatMap(cmd -> reassignmentService.executeReassignment(getCluster(clusterName), cmd)) + .thenReturn(ResponseEntity.ok().build()); } @Override @@ -255,4 +261,18 @@ public Mono> getInProgressAssignments( return reassignmentService.getInProgressAssignments(getCluster(clusterName)) .map(ResponseEntity::ok); } + + @Override + public Mono> cancelPartitionAssignment(String clusterName, + Mono cancelDto, + ServerWebExchange exchange) { + return cancelDto + .flatMap(dto -> + reassignmentService.cancelReassignment( + getCluster(clusterName), + dto.getPartitions().stream() + .map(p -> new TopicPartition(p.getTopic(), p.getPartition())) + .collect(Collectors.toSet()))) + .thenReturn(ResponseEntity.ok().build()); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignementsStore.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignementsStore.java deleted file mode 100644 index 84b9769f668..00000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignementsStore.java +++ /dev/null @@ -1,4 +0,0 @@ -package com.provectus.kafka.ui.service.reassign; - -public class ReassignementsStore { -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentOperations.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentOperations.java new file mode 100644 index 00000000000..d4b5a588fc8 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentOperations.java @@ -0,0 +1,188 @@ +package com.provectus.kafka.ui.service.reassign; + +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.provectus.kafka.ui.exception.ValidationException; +import com.provectus.kafka.ui.service.ReactiveAdminClient; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.random.RandomGenerator; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.NewPartitionReassignment; +import org.apache.kafka.clients.admin.PartitionReassignment; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; + +@Slf4j +@RequiredArgsConstructor +class ReassignmentOperations { + + public static final int SUPPORTED_CMD_VERSION = 1; + public static final String LOG_DIR_NOT_SET_STRING = "any"; + + private record BrokerMetadata(int id, Optional rack) {} + + private final ReactiveAdminClient adminClient; + + Mono>> generatePartitionReassignment(Set topics, + List brokerIds, + boolean rackAware) { + return Mono.zip(getCurrentAssignment(topics), getBrokersMetadata(brokerIds)) + .map(t -> calculateAssignment(t.getT1(), t.getT2(), rackAware)); + } + + // [ topic -> [partition -> list of replica ids] ] + Mono>> getCurrentAssignment(Set topics) { + return adminClient.describeTopics(topics) + .map(topicToDescriptionMap -> topicToDescriptionMap.entrySet().stream() + .flatMap((Map.Entry e) -> + e.getValue().partitions().stream() + .map(p -> + Tuples.of( + new TopicPartition(e.getKey(), p.partition()), + p.replicas().stream().map(Node::id).toList()))) + .collect(toMap(Tuple2::getT1, Tuple2::getT2))); + } + + Mono validateAndExecute(List>> reassignment, Runnable preExecute) { + return validateCmd(reassignment) + .doOnNext(r -> preExecute.run()) + .flatMap(adminClient::alterPartitionReassignments); + } + + private Mono>> validateCmd( + List>> reassignment) { + if (reassignment.isEmpty()) { + throw new ValidationException("Partition reassignment list cannot be empty"); + } + if (reassignment.stream().map(Tuple2::getT2).anyMatch(List::isEmpty)) { + throw new ValidationException("Partition replica list cannot be empty"); + } + if (reassignment.stream().map(Tuple2::getT1).distinct().count() < reassignment.size()) { + throw new ValidationException("Partition reassignment contains duplicate topic partitions"); + } + return adminClient.describeCluster() + .doOnNext(description -> { + var knownIds = description.getNodes().stream().map(Node::id).toList(); + var unknownIds = reassignment.stream() + .flatMap(t -> t.getT2().stream()) + .filter(id -> !knownIds.contains(id)) + .toList(); + if (!unknownIds.isEmpty()) { + throw new ValidationException("Unknown broker ids: " + unknownIds); + } + }) + .thenReturn(reassignment.stream() + .collect(toMap(Tuple2::getT1, t -> Optional.of(new NewPartitionReassignment(t.getT2()))))); + } + + private Mono> getBrokersMetadata(List brokerIds) { + return adminClient.describeCluster() + .map(description -> description.getNodes().stream() + .filter(n -> brokerIds.contains(n.id())) + .map(n -> new BrokerMetadata(n.id(), Optional.ofNullable(n.rack()))) + .toList()); + } + + private static Map> calculateAssignment( + Map> currentAssignments, + List brokerMetadata, + boolean rackAware) { + Map>> perTopic = currentAssignments.entrySet().stream() + .collect(groupingBy(e -> e.getKey().topic(), toMap(Map.Entry::getKey, Map.Entry::getValue))); + return rackAware + ? calculateAssignmentRackAware(perTopic, brokerMetadata) + : calculateAssignmentRackUnaware(perTopic, brokerMetadata); + } + + private static Map> calculateAssignmentRackAware( + Map>> currentAssignments, + List brokerMetadata) { + if (brokerMetadata.stream().anyMatch(m -> m.rack().isEmpty())) { + throw new ValidationException("Not all brokers have rack information for replica rack aware assignment"); + } + log.warn("Rack-aware assignment calculation is not implemented yet, falling back to usual calculation"); + return calculateAssignmentRackUnaware(currentAssignments, brokerMetadata); + } + + private static Map> calculateAssignmentRackUnaware( + Map>> currentAssignments, + List brokerMetadata) { + Map> result = new LinkedHashMap<>(); + currentAssignments.forEach((topic, currentAssignment) -> { + result.putAll( + assignReplicasToBrokersRackUnaware( + topic, + currentAssignment.size(), + currentAssignment.entrySet().iterator().next().getValue().size(), + brokerMetadata.stream().map(BrokerMetadata::id).collect(toList()), + ThreadLocalRandom.current() + ) + ); + }); + return result; + } + + // implementation copied from https://github.com/apache/kafka/blob/1874f2388cffa7a1e866cbe4aff8b92c9d953b41/core/src/main/scala/kafka/admin/AdminUtils.scala#L125 + @VisibleForTesting + static Map> assignReplicasToBrokersRackUnaware( + String topic, + int numPartitions, + int replicationFactor, + List brokerList, + RandomGenerator rand) { + var result = new LinkedHashMap>(); + int startIndex = rand.nextInt(brokerList.size()); + int currentPartitionId = 0; + int nextReplicaShift = startIndex; + for (int i = 0; i < numPartitions; i++) { + if (currentPartitionId > 0 && (currentPartitionId % brokerList.size() == 0)) { + nextReplicaShift += 1; + } + int firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size(); + var replicaBuffer = Lists.newArrayList(brokerList.get(firstReplicaIndex)); + for (int j = 0; j < replicationFactor - 1; j++) { + replicaBuffer.add(brokerList.get(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size()))); + } + result.put(new TopicPartition(topic, currentPartitionId), replicaBuffer); + currentPartitionId += 1; + } + return result; + } + + private static int replicaIndex(int firstReplicaIndex, int secondReplicaShift, int replicaIndex, int numBrokers) { + var shift = 1 + (secondReplicaShift + replicaIndex) % (numBrokers - 1); + return (firstReplicaIndex + shift) % numBrokers; + } + + Mono cancelReassignment(Set partitions) { + return adminClient.listPartitionReassignments() + .map(reassignments -> reassignments.entrySet().stream() + .filter(e -> partitions.contains(e.getKey())) + .filter(e -> { + PartitionReassignment reassignment = e.getValue(); + return !reassignment.addingReplicas().isEmpty() + || !reassignment.removingReplicas().isEmpty(); + }) + .map(Map.Entry::getKey) + .collect(Collectors.toSet())) + .flatMap(tps -> tps.isEmpty() + ? Mono.empty() + : adminClient.alterPartitionReassignments(tps.stream().collect(toMap(p -> p, p -> Optional.empty())))); + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentPlan.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentPlan.java deleted file mode 100644 index 031a891ee02..00000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentPlan.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.provectus.kafka.ui.service.reassign; - -import com.provectus.kafka.ui.service.ReactiveAdminClient; -import java.util.Map; -import java.util.Optional; -import org.apache.kafka.clients.admin.NewPartitionReassignment; -import org.apache.kafka.common.TopicPartition; - -public record ReassignmentPlan(Map> reassignments) { - -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentPlanner.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentPlanner.java deleted file mode 100644 index 8674ea58e34..00000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentPlanner.java +++ /dev/null @@ -1,151 +0,0 @@ -package com.provectus.kafka.ui.service.reassign; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; -import com.provectus.kafka.ui.exception.ValidationException; -import com.provectus.kafka.ui.model.PartitionReassignmentDTO; -import com.provectus.kafka.ui.model.ReassignPartitionsCommandDTO; -import com.provectus.kafka.ui.service.ReactiveAdminClient; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ThreadLocalRandom; -import java.util.random.RandomGenerator; -import java.util.stream.Collectors; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.TopicPartition; -import reactor.core.publisher.Mono; -import reactor.util.function.Tuple2; -import reactor.util.function.Tuples; - -@Slf4j -@RequiredArgsConstructor -public class ReassignmentPlanner { - - record BrokerMetadata(int id, Optional rack) { - } - - private final ReactiveAdminClient adminClient; - - public Mono generatePartitionReassignment(Set topics, - List brokerIds, - boolean rackAware) { - return Mono.zip(currentAssignments(adminClient, topics), brokerMetadata(brokerIds)).map(t -> - createSuggestedReassignment( - calculateAssignment(t.getT1(), t.getT2(), rackAware))); - } - - private static ReassignPartitionsCommandDTO createSuggestedReassignment( - Map> assignment) { - var dto = new ReassignPartitionsCommandDTO().version(1); - assignment.forEach((tp, replicas) -> - dto.addPartitionsItem( - new PartitionReassignmentDTO() - .topic(tp.topic()) - .partition(tp.partition()) - .replicas(replicas) - .logDirs(replicas.stream().map(r -> "any").toList()))); - return dto; - } - - // [ topic -> [tp -> list of replicas] ] - public static Mono>>> currentAssignments(ReactiveAdminClient ac, Set topics) { - return ac.describeTopics(topics) - .map(topicToDescriptionMap -> - topicToDescriptionMap.entrySet().stream() - .map(e -> - Tuples.of( - e.getKey(), - e.getValue().partitions().stream() - .map(p -> - Tuples.of( - new TopicPartition(e.getKey(), p.partition()), - p.replicas().stream().map(Node::id).toList() - )).collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2)) - )) - .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2)) - ); - } - - private Mono> brokerMetadata(List brokerIds) { - return adminClient.describeCluster() - .map(description -> description.getNodes().stream() - .filter(n -> brokerIds.contains(n.id())) - .map(n -> new BrokerMetadata(n.id(), Optional.ofNullable(n.rack()))) - .toList()); - } - - @VisibleForTesting - static Map> calculateAssignment( - Map>> currentAssignments, - List brokerMetadata, - boolean rackAware) { - if (rackAware && brokerMetadata.stream().anyMatch(m -> m.rack().isEmpty())) { - throw new ValidationException("Not all brokers have rack information for replica rack aware assignment"); - } - return rackAware - ? calculateAssignmentRackAware(currentAssignments, brokerMetadata) - : calculateAssignmentRackUnaware(currentAssignments, brokerMetadata); - } - - private static Map> calculateAssignmentRackAware( - Map>> currentAssignments, - List brokerMetadata) { - log.warn("Rack-aware assignment calculation is not implemented yet, falling back to usual calculation"); - return calculateAssignmentRackUnaware(currentAssignments, brokerMetadata); - } - - private static Map> calculateAssignmentRackUnaware( - Map>> currentAssignments, - List brokerMetadata) { - Map> result = new LinkedHashMap<>(); - currentAssignments.forEach((topic, currentAssignment) -> { - result.putAll( - assignReplicasToBrokersRackUnaware( - topic, - currentAssignment.size(), - currentAssignment.entrySet().iterator().next().getValue().size(), - brokerMetadata.stream().map(BrokerMetadata::id).collect(Collectors.toList()), - ThreadLocalRandom.current() - ) - ); - }); - return result; - } - - static Map> assignReplicasToBrokersRackUnaware( - String topic, - int nPartitions, - int replicationFactor, - List brokerList, - RandomGenerator rand) { - var result = new LinkedHashMap>(); - int startIndex = rand.nextInt(brokerList.size()); - int currentPartitionId = 0; - int nextReplicaShift = rand.nextInt(brokerList.size()); - for (int i = 0; i < nPartitions; i++) { - if (currentPartitionId > 0 && (currentPartitionId % brokerList.size() == 0)) { - nextReplicaShift += 1; - } - int firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size(); - var replicaBuffer = Lists.newArrayList(brokerList.get(firstReplicaIndex)); - for (int j = 0; j < replicationFactor - 1; j++) { - replicaBuffer.add(brokerList.get(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size()))); - } - result.put(new TopicPartition(topic, currentPartitionId), replicaBuffer); - currentPartitionId += 1; - } - return result; - } - - private static int replicaIndex(int firstReplicaIndex, int secondReplicaShift, int replicaIndex, int nBrokers) { - var shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1); - return (firstReplicaIndex + shift) % nBrokers; - } - -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentService.java index 445caa97e89..31fcd50b0b5 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentService.java @@ -1,5 +1,10 @@ package com.provectus.kafka.ui.service.reassign; +import static com.provectus.kafka.ui.service.reassign.ReassignmentOperations.LOG_DIR_NOT_SET_STRING; +import static com.provectus.kafka.ui.service.reassign.ReassignmentOperations.SUPPORTED_CMD_VERSION; + +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.model.InProgressPartitionReassignmentDTO; import com.provectus.kafka.ui.model.InProgressReassignmentDTO; import com.provectus.kafka.ui.model.KafkaCluster; @@ -11,11 +16,16 @@ import java.util.Map; import java.util.Set; import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.PartitionReassignment; import org.apache.kafka.common.TopicPartition; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; +@Slf4j @Service @RequiredArgsConstructor public class ReassignmentService { @@ -26,25 +36,55 @@ public Mono generate(KafkaCluster cluster, Set topics, List brokerIds) { return adminClientService.get(cluster) - .map(ReassignmentPlanner::new) - .flatMap(planner -> planner.generatePartitionReassignment(topics, brokerIds, false)); + .map(ReassignmentOperations::new) + .flatMap(planner -> planner.generatePartitionReassignment(topics, brokerIds, false)) + .map(ReassignmentService::mapToReassignmentDto); + } + + public Mono executeReassignment(KafkaCluster cluster, ReassignPartitionsCommandDTO cmd) { + return adminClientService.get(cluster) + .map(ReassignmentOperations::new) + .flatMap(ops -> { + if (!cmd.getPartitions().stream() + .flatMap(p -> p.getLogDirs().stream()) + .allMatch(logDir -> logDir.equalsIgnoreCase(LOG_DIR_NOT_SET_STRING))) { + return Mono.error(new ValidationException("Log dir altering is not supported")); + } + List>> reassignment = cmd.getPartitions().stream() + .map(p -> Tuples.of(new TopicPartition(p.getTopic(), p.getPartition()), p.getReplicas())) + .toList(); + return ops.validateAndExecute(reassignment, () -> logRequestedAssignment(cmd)); + }); } + @SneakyThrows + private void logRequestedAssignment(ReassignPartitionsCommandDTO cmd) { + log.info("Executing partitions reassignment: \n{}", + new JsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(cmd)); + } + + public Mono cancelReassignment(KafkaCluster cluster, Set partitions) { + return adminClientService.get(cluster) + .map(ReassignmentOperations::new) + .flatMap(ops -> ops.cancelReassignment(partitions)); + } public Mono getCurrentAssignment(KafkaCluster cluster, Set topics) { return adminClientService.get(cluster) - .flatMap(ac -> ReassignmentPlanner.currentAssignments(ac, topics)) - .map(this::map); + .map(ReassignmentOperations::new) + .flatMap(ops -> ops.getCurrentAssignment(topics)) + .map(ReassignmentService::mapToReassignmentDto); } public Mono getInProgressAssignments(KafkaCluster cluster) { return adminClientService.get(cluster) .flatMap(ReactiveAdminClient::listPartitionReassignments) - .map(this::mapInProgressReassignments); + .map(ReassignmentService::mapToInProgressReassignmentsDto); } - private InProgressReassignmentDTO mapInProgressReassignments(Map reassignments) { + private static InProgressReassignmentDTO mapToInProgressReassignmentsDto( + Map reassignments) { return new InProgressReassignmentDTO() .partitions( reassignments.entrySet().stream() @@ -59,20 +99,20 @@ private InProgressReassignmentDTO mapInProgressReassignments(Map>> assignment) { - return new ReassignPartitionsCommandDTO() - .version(1) - .partitions( - assignment.values().stream() - .flatMap(m -> m.entrySet().stream()) - .map(p -> new PartitionReassignmentDTO() - .topic(p.getKey().topic()) - .partition(p.getKey().partition()) - .replicas(p.getValue()) - .logDirs(p.getValue().stream().map(r -> "any").toList()) - ) - .toList() - ); + private static ReassignPartitionsCommandDTO mapToReassignmentDto( + Map> assignment) { + return new ReassignPartitionsCommandDTO() + .version(SUPPORTED_CMD_VERSION) + .partitions(assignment.entrySet().stream().map(ReassignmentService::mapPartitionAssignmentDto).toList()); + } + + private static PartitionReassignmentDTO mapPartitionAssignmentDto(Map.Entry> + partitionsAssignment) { + return new PartitionReassignmentDTO() + .topic(partitionsAssignment.getKey().topic()) + .partition(partitionsAssignment.getKey().partition()) + .replicas(partitionsAssignment.getValue()) + .logDirs(partitionsAssignment.getValue().stream().map(r -> LOG_DIR_NOT_SET_STRING).toList()); } } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java index cc057db0b0d..f1f6ab2abdc 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java @@ -19,6 +19,7 @@ import com.provectus.kafka.ui.model.TopicColumnsToSortDTO; import com.provectus.kafka.ui.model.TopicDTO; import com.provectus.kafka.ui.service.analyze.TopicAnalysisService; +import com.provectus.kafka.ui.service.reassign.ReassignmentService; import java.util.ArrayList; import java.util.Comparator; import java.util.List; @@ -43,7 +44,7 @@ class TopicsServicePaginationTest { private final ClusterMapper clusterMapper = new ClusterMapperImpl(); private final TopicsController topicsController = new TopicsController( - topicsService, mock(TopicAnalysisService.class), clusterMapper); + topicsService, mock(TopicAnalysisService.class), mock(ReassignmentService.class), clusterMapper); private void init(Map topicsInCache) { diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/reassign/ReassignmentJsonDtoTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/reassign/ReassignmentJsonDtoTest.java deleted file mode 100644 index 9a490416472..00000000000 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/reassign/ReassignmentJsonDtoTest.java +++ /dev/null @@ -1,55 +0,0 @@ -package com.provectus.kafka.ui.service.reassign; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.provectus.kafka.ui.model.ReassignPartitionsCommandDTO; -import java.util.List; -import org.junit.jupiter.api.Test; - -class ReassignmentJsonDtoTest { - - @Test - void canBeCreatedFromJsonString() { - var parsed = ReassignmentJsonDto.fromJson( - "{" + - " \"version\": 1, " + - " \"partitions\":" + - " [" + - " {" + - " \"topic\": \"my-topic\"," + - " \"partition\": 0, " + - " \"replicas\":" + - " [ " + - " 0, " + - " 1, " + - " 2 " + - " ], " + - " \"log_dirs\": " + - " [ " + - " \"any\", " + - " \"/user/share/kafka/p0\"," + - " \"any\"" + - " ]" + - " }" + - " ]" + - "}" - ); - assertThat(parsed).isEqualTo( - ReassignPartitionsCommandDTO.builder() - .version(1) - .partitions( - List.of( - ReassignmentJsonDto.PartitionAssignmentDto.builder() - .topic("my-topic") - .partition(0) - .replicas(List.of(0, 1, 2)) - .logDirs(List.of("any", "/user/share/kafka/p0", "any")) - .build() - ) - ) - .build() - ); - } - - -} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/reassign/ReassignmentOperationsTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/reassign/ReassignmentOperationsTest.java new file mode 100644 index 00000000000..df09f7a6e45 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/reassign/ReassignmentOperationsTest.java @@ -0,0 +1,224 @@ +package com.provectus.kafka.ui.service.reassign; + +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.toMap; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.collect.Lists; +import com.provectus.kafka.ui.service.ReactiveAdminClient; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import java.util.random.RandomGenerator; +import java.util.stream.IntStream; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testcontainers.utility.DockerImageName; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; + +class ReassignmentOperationsTest { + + private static final String CONFLUENTINC_VERSION = "7.3.0"; + + private static final int KAFKA_1_ID = 1; + private static final int KAFKA_2_ID = 2; + + private static final Network NETWORK = Network.newNetwork(); + private static GenericContainer ZK; + private static KafkaContainer KAFKA_1; + private static KafkaContainer KAFKA_2; + + private static AdminClient ADMIN_CLIENT; + + ReassignmentOperations ops = new ReassignmentOperations(ReactiveAdminClient.create(ADMIN_CLIENT).block()); + + @BeforeAll + static void init() { + ZK = new GenericContainer( + DockerImageName.parse("confluentinc/cp-zookeeper").withTag(CONFLUENTINC_VERSION)) + .withEnv("ZOOKEEPER_CLIENT_PORT", "2181") + .withNetworkAliases("zookeeper") + .withNetwork(NETWORK); + KAFKA_1 = new KafkaContainer( + DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENTINC_VERSION)) + .withNetwork(NETWORK) + .withEnv("KAFKA_BROKER_ID", KAFKA_1_ID + "") + .withExternalZookeeper("zookeeper:2181"); + KAFKA_2 = new KafkaContainer( + DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENTINC_VERSION)) + .withNetwork(NETWORK) + .withEnv("KAFKA_BROKER_ID", KAFKA_2_ID + "") + .withExternalZookeeper("zookeeper:2181"); + + ZK.start(); + KAFKA_1.start(); + KAFKA_2.start(); + + Properties p = new Properties(); + p.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_1.getBootstrapServers()); + ADMIN_CLIENT = AdminClient.create(p); + } + + @AfterAll + static void tearDown() { + ADMIN_CLIENT.close(); + KAFKA_1.stop(); + KAFKA_2.stop(); + ZK.stop(); + NETWORK.close(); + } + + @Test + void testGeneratePartitionReassignment() throws Exception { + String testTopic1 = "test_" + UUID.randomUUID(); + String testTopic2 = "test_" + UUID.randomUUID(); + ADMIN_CLIENT.createTopics( + List.of( + new NewTopic(testTopic1, 5, (short) 1), + new NewTopic(testTopic2, 3, (short) 2)) + ).all().get(); + + Map> assignment = + ops.generatePartitionReassignment( + Set.of(testTopic1, testTopic2), List.of(KAFKA_1_ID, KAFKA_2_ID), false).block(); + + var perTopicAssignments = assignment.entrySet().stream() + .collect(groupingBy(e -> e.getKey().topic(), toMap(Map.Entry::getKey, Map.Entry::getValue))); + + verifyAssignment(testTopic1, 5, 1, perTopicAssignments.get(testTopic1)); + verifyAssignment(testTopic2, 3, 2, perTopicAssignments.get(testTopic2)); + } + + @Test + void testGetCurrentAssignment() throws Exception { + String testTopic1 = "test_" + UUID.randomUUID(); + String testTopic2 = "test_" + UUID.randomUUID(); + ADMIN_CLIENT.createTopics( + List.of( + new NewTopic(testTopic1, 2, (short) 2), + new NewTopic(testTopic2, 2, (short) 2)) + ).all().get(); + + Map> currentAssignment = + ops.getCurrentAssignment(Set.of(testTopic1, testTopic2)).block(); + + assertThat(currentAssignment.entrySet().stream()) + .hasSize(4) + .allSatisfy(e -> { + TopicPartition partition = e.getKey(); + List replicas = e.getValue(); + assertThat(partition.topic()).isIn(List.of(testTopic1, testTopic2)); + assertThat(replicas).hasSize(2).containsExactlyInAnyOrder(KAFKA_1_ID, KAFKA_2_ID); + }); + } + + @Test + void testValidateAndExecute() throws Exception { + String testTopic1 = "test_" + UUID.randomUUID(); + String testTopic2 = "test_" + UUID.randomUUID(); + ADMIN_CLIENT.createTopics( + List.of( + new NewTopic(testTopic1, 2, (short) 2), + new NewTopic(testTopic2, 2, (short) 2)) + ).all().get(); + + Map> currentAssignment = + ops.getCurrentAssignment(Set.of(testTopic1, testTopic2)).block(); + + Map> desiredAssignment = currentAssignment.entrySet().stream() + .map(e -> Tuples.of(e.getKey(), Lists.reverse(e.getValue()))) //reversing replicas list + .collect(toMap(Tuple2::getT1, Tuple2::getT2)); + + ops.validateAndExecute( + desiredAssignment.entrySet().stream().map(e -> Tuples.of(e.getKey(), e.getValue())).toList(), () -> {}).block(); + + Awaitility.await() + .pollInSameThread() + .atMost(Duration.ofSeconds(10)) + .until(() -> ADMIN_CLIENT.listPartitionReassignments().reassignments().get().isEmpty()); + + Map> actualAssignment = + ops.getCurrentAssignment(Set.of(testTopic1, testTopic2)).block(); + + assertThat(actualAssignment).containsExactlyInAnyOrderEntriesOf(desiredAssignment); + } + + //test case copied from https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala#L198 + @Test + void testAssignReplicasToBrokersRackUnaware() { + RandomGenerator rand = mock(RandomGenerator.class); + when(rand.nextInt(anyInt())).thenReturn(0); + + var assignment = ReassignmentOperations.assignReplicasToBrokersRackUnaware( + "test", + 10, + 3, + List.of(0, 1, 2, 3, 4), + rand + ); + assertThat(assignment) + .containsExactlyInAnyOrderEntriesOf( + Map.of( + new TopicPartition("test", 0), List.of(0, 1, 2), + new TopicPartition("test", 1), List.of(1, 2, 3), + new TopicPartition("test", 2), List.of(2, 3, 4), + new TopicPartition("test", 3), List.of(3, 4, 0), + new TopicPartition("test", 4), List.of(4, 0, 1), + new TopicPartition("test", 5), List.of(0, 2, 3), + new TopicPartition("test", 6), List.of(1, 3, 4), + new TopicPartition("test", 7), List.of(2, 4, 0), + new TopicPartition("test", 8), List.of(3, 0, 1), + new TopicPartition("test", 9), List.of(4, 1, 2) + ) + ); + } + + @ParameterizedTest + @CsvSource({ + "10, 3, 1", + "10, 3, 3", + "1, 10, 1", + "1, 10, 10", + }) + void testAssignReplicasToBrokersRackUnawareWithRealRandom(int partitions, int brokersCnt, int replicationF) { + var assignment = ReassignmentOperations.assignReplicasToBrokersRackUnaware( + "test", + partitions, + replicationF, + IntStream.range(0, brokersCnt).boxed().toList(), + ThreadLocalRandom.current() + ); + verifyAssignment("test", partitions, replicationF, assignment); + } + + private void verifyAssignment(String topic, int numParts, int replicationFactor, + Map> assignmentToCheck) { + assertThat(assignmentToCheck.keySet()) + .containsExactlyInAnyOrderElementsOf( + IntStream.range(0, numParts).mapToObj(i -> new TopicPartition(topic, i)).toList()); + + assertThat(assignmentToCheck.values().stream()) + .allMatch(replicas -> + replicas.stream().distinct().count() == replicas.size() && replicas.size() == replicationFactor); + } + +} diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 5a5e969d33e..bb697d53f12 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1764,12 +1764,14 @@ paths: in: path required: true schema: - type: string + type: string requestBody: content: application/json: schema: - $ref: '#/components/schemas/GeneratePartitionsReassignmentCommand' + type: array + items: + type: string responses: 200: description: OK @@ -1798,6 +1800,26 @@ paths: 200: description: OK + /api/clusters/{clusterName}/partitionsreaassignments/cancel: + delete: + tags: + - Topics + operationId: cancelPartitionAssignment + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/PartitionReassignmentCancellation' + responses: + 200: + description: OK + /api/clusters/{clusterName}/partitionsreaassignments/inprogress: get: tags: @@ -3240,6 +3262,7 @@ components: ReassignPartitionsCommand: type: object + description: "NOTE! This format used by kafka-reassign-partitions.sh command-line utility and should not be changed" properties: version: type: integer @@ -3251,6 +3274,7 @@ components: PartitionReassignment: type: object + description: "NOTE! This format used by kafka-reassign-partitions.sh command-line utility and should not be changed" properties: topic: type: string @@ -3313,3 +3337,17 @@ components: items: type: integer format: int32 + + PartitionReassignmentCancellation: + type: object + properties: + partitions: + type: array + items: + type: object + properties: + topic: + type: string + partition: + type: integer + format: int32 From 4cf8f7a7db2e1c57bfae5bbe371a85d55151158c Mon Sep 17 00:00:00 2001 From: iliax Date: Wed, 23 Nov 2022 23:51:20 +0400 Subject: [PATCH 3/4] minot fix --- .../service/reassign/ReassignmentOperationsTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/reassign/ReassignmentOperationsTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/reassign/ReassignmentOperationsTest.java index df09f7a6e45..a8e3a2a566e 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/reassign/ReassignmentOperationsTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/reassign/ReassignmentOperationsTest.java @@ -154,12 +154,12 @@ void testValidateAndExecute() throws Exception { Awaitility.await() .pollInSameThread() .atMost(Duration.ofSeconds(10)) - .until(() -> ADMIN_CLIENT.listPartitionReassignments().reassignments().get().isEmpty()); + .untilAsserted(() -> { + Map> actualAssignment = + ops.getCurrentAssignment(Set.of(testTopic1, testTopic2)).block(); - Map> actualAssignment = - ops.getCurrentAssignment(Set.of(testTopic1, testTopic2)).block(); - - assertThat(actualAssignment).containsExactlyInAnyOrderEntriesOf(desiredAssignment); + assertThat(actualAssignment).containsExactlyInAnyOrderEntriesOf(desiredAssignment); + }); } //test case copied from https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala#L198 From c1e13ef0f8ea72a80c9d32dbbce68d3d9ee364ee Mon Sep 17 00:00:00 2001 From: iliax Date: Mon, 19 Dec 2022 23:15:13 +0400 Subject: [PATCH 4/4] endpoints improvement --- .../kafka/ui/controller/TopicsController.java | 16 +++++------ .../main/resources/swagger/kafka-ui-api.yaml | 27 +++++++++---------- 2 files changed, 18 insertions(+), 25 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java index d63d248db3f..c663ec84ad4 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java @@ -27,6 +27,7 @@ import com.provectus.kafka.ui.service.analyze.TopicAnalysisService; import com.provectus.kafka.ui.service.reassign.ReassignmentService; import java.util.Comparator; +import java.util.HashSet; import java.util.List; import java.util.stream.Collectors; import javax.validation.Valid; @@ -235,14 +236,9 @@ public Mono> getTopicAnalysis(String clusterNam @Override public Mono> getCurrentPartitionAssignment(String clusterName, - Flux topicsList, + List topics, ServerWebExchange exchange) { - return topicsList - .collect(Collectors.toSet()) - .flatMap(topics -> - reassignmentService.getCurrentAssignment( - getCluster(clusterName), - topics)) + return reassignmentService.getCurrentAssignment(getCluster(clusterName), new HashSet<>(topics)) .map(ResponseEntity::ok); } @@ -263,9 +259,9 @@ public Mono> getInProgressAssignments( } @Override - public Mono> cancelPartitionAssignment(String clusterName, - Mono cancelDto, - ServerWebExchange exchange) { + public Mono> cancelRunningPartitionAssignment(String clusterName, + Mono cancelDto, + ServerWebExchange exchange) { return cancelDto .flatMap(dto -> reassignmentService.cancelReassignment( diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index bb697d53f12..489757f35a0 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1730,7 +1730,7 @@ paths: 404: description: Not found - /api/clusters/{clusterName}/partitionsreaassignments/generate: + /api/clusters/{clusterName}/partitionsreaassignments/suggestions: post: tags: - Topics @@ -1755,7 +1755,7 @@ paths: $ref: '#/components/schemas/ReassignPartitionsCommand' /api/clusters/{clusterName}/partitionsreaassignments/current: - post: + get: tags: - Topics operationId: getCurrentPartitionAssignment @@ -1765,13 +1765,14 @@ paths: required: true schema: type: string - requestBody: - content: - application/json: - schema: - type: array - items: - type: string + - name: topics + required: true + in: query + description: topic names for which assignments should be returned + schema: + type: array + items: + type: string responses: 200: description: OK @@ -1780,7 +1781,7 @@ paths: schema: $ref: '#/components/schemas/ReassignPartitionsCommand' - /api/clusters/{clusterName}/partitionsreaassignments/execute: + /api/clusters/{clusterName}/partitionsreaassignments/running: post: tags: - Topics @@ -1799,12 +1800,10 @@ paths: responses: 200: description: OK - - /api/clusters/{clusterName}/partitionsreaassignments/cancel: delete: tags: - Topics - operationId: cancelPartitionAssignment + operationId: cancelRunningPartitionAssignment parameters: - name: clusterName in: path @@ -1819,8 +1818,6 @@ paths: responses: 200: description: OK - - /api/clusters/{clusterName}/partitionsreaassignments/inprogress: get: tags: - Topics