diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
index 04960cda69d..5ec0fbc642e 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
@@ -123,9 +123,11 @@ public Mono<ResponseEntity<Void>> deleteTopic(
         .operationName("deleteTopic")
         .build();
 
-    return accessControlService.validateAccess(context).then(
-        topicsService.deleteTopic(getCluster(clusterName), topicName).map(ResponseEntity::ok)
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    return accessControlService.validateAccess(context)
+        .then(
+            topicsService.deleteTopic(getCluster(clusterName), topicName)
+                .thenReturn(ResponseEntity.ok().build())
+        ).doOnEach(sig -> auditService.audit(context, sig));
   }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaSrMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaSrMapper.java
index ea3435d4aa1..9a5f68cbd12 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaSrMapper.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaSrMapper.java
@@ -3,18 +3,21 @@
 import com.provectus.kafka.ui.model.CompatibilityCheckResponseDTO;
 import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
 import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
+import com.provectus.kafka.ui.model.SchemaReferenceDTO;
 import com.provectus.kafka.ui.model.SchemaSubjectDTO;
 import com.provectus.kafka.ui.model.SchemaTypeDTO;
 import com.provectus.kafka.ui.service.SchemaRegistryService;
 import com.provectus.kafka.ui.sr.model.Compatibility;
 import com.provectus.kafka.ui.sr.model.CompatibilityCheckResponse;
 import com.provectus.kafka.ui.sr.model.NewSubject;
+import com.provectus.kafka.ui.sr.model.SchemaReference;
 import com.provectus.kafka.ui.sr.model.SchemaType;
+import java.util.List;
 import java.util.Optional;
 import org.mapstruct.Mapper;
 
-@Mapper(componentModel = "spring")
+@Mapper
 public interface KafkaSrMapper {
 
   default SchemaSubjectDTO toDto(SchemaRegistryService.SubjectWithCompatibilityLevel s) {
@@ -24,9 +27,12 @@ default SchemaSubjectDTO toDto(SchemaRegistryService.SubjectWithCompatibilityLev
         .subject(s.getSubject())
         .schema(s.getSchema())
         .schemaType(SchemaTypeDTO.fromValue(Optional.ofNullable(s.getSchemaType()).orElse(SchemaType.AVRO).getValue()))
+        .references(toDto(s.getReferences()))
         .compatibilityLevel(s.getCompatibility().toString());
   }
 
+  List<SchemaReferenceDTO> toDto(List<SchemaReference> references);
+
   CompatibilityCheckResponseDTO toDto(CompatibilityCheckResponse ccr);
 
   CompatibilityLevelDTO.CompatibilityEnum toDto(Compatibility compatibility);
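
Note: with componentModel = "spring" removed above, the generated KafkaSrMapper implementation is no longer registered as a Spring bean. A minimal sketch of how a caller might obtain and use the mapper instead, via MapStruct's Mappers factory; this wiring is an assumption and is not part of the diff itself.

package com.provectus.kafka.ui.mapper;

import com.provectus.kafka.ui.model.SchemaReferenceDTO;
import com.provectus.kafka.ui.sr.model.SchemaReference;
import java.util.List;
import org.mapstruct.factory.Mappers;

class KafkaSrMapperUsageSketch {

  // Hypothetical: obtain the generated implementation through MapStruct's factory
  // instead of Spring injection (one option once the Spring component model is dropped).
  static final KafkaSrMapper MAPPER = Mappers.getMapper(KafkaSrMapper.class);

  List<SchemaReferenceDTO> mapReferences() {
    // Maps Schema Registry reference models to UI DTOs via the new toDto(List) method.
    return MAPPER.toDto(
        List.of(new SchemaReference().name("MyRecord.proto").subject("some-subject-ref").version(1)));
  }
}
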
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java
index c1fb203691c..4f6d4afe517 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java
@@ -20,6 +20,7 @@
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
 import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
@@ -217,7 +218,9 @@ private String convertSchema(SchemaMetadata schema, ParsedSchema parsedSchema) {
       case AVRO -> new AvroJsonSchemaConverter()
           .convert(basePath, ((AvroSchema) parsedSchema).rawSchema())
           .toJson();
-      case JSON -> schema.getSchema();
+      case JSON ->
+        //need to use confluent JsonSchema since it includes resolved references
+        ((JsonSchema) parsedSchema).rawSchema().toString();
     };
   }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java
index f98bb5d376c..cae29ba93d6 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java
@@ -14,8 +14,7 @@
 import com.provectus.kafka.ui.sr.model.NewSubject;
 import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import com.provectus.kafka.ui.util.ReactiveFailover;
-import com.provectus.kafka.ui.util.WebClientConfigurator;
-import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.List;
 import java.util.stream.Collectors;
 import lombok.AllArgsConstructor;
@@ -92,7 +91,7 @@ public Mono<SubjectWithCompatibilityLevel> getLatestSchemaVersionBySubject(Kafka
   private Mono<SubjectWithCompatibilityLevel> getSchemaSubject(KafkaCluster cluster, String schemaName, String version) {
     return api(cluster)
-        .mono(c -> c.getSubjectVersion(schemaName, version))
+        .mono(c -> c.getSubjectVersion(schemaName, version, false))
         .zipWith(getSchemaCompatibilityInfoOrGlobal(cluster, schemaName))
         .map(t -> new SubjectWithCompatibilityLevel(t.getT1(), t.getT2()))
         .onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.error(new SchemaNotFoundException()));
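
Note: the call above now passes an explicit third argument, matching the deleted query parameter added to kafka-sr-api.yaml later in this diff. The SchemaReferencesResolver introduced in the next file chases references with deleted = true, presumably so that references pointing at soft-deleted subject versions can still be fetched. A minimal sketch of how the pieces fit together; the generated client signature (subject, version, deleted) is inferred from the spec change, and the helper class itself is hypothetical.

package com.provectus.kafka.ui.service.integration.odd;

import com.google.common.collect.ImmutableMap;
import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
import reactor.core.publisher.Mono;

class ReferenceResolutionSketch {

  // Fetches the latest version of a subject (deleted = false) and resolves its reference
  // tree into an insertion-ordered name -> schema map using the resolver defined below.
  Mono<ImmutableMap<String, String>> resolveLatest(KafkaSrClientApi client, String subject) {
    return client.getSubjectVersion(subject, "latest", false)
        .flatMap(subj -> new SchemaReferencesResolver(client).resolve(subj.getReferences()));
  }
}
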
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolver.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolver.java
new file mode 100644
index 00000000000..4ff1f8695b3
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolver.java
@@ -0,0 +1,55 @@
+package com.provectus.kafka.ui.service.integration.odd;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
+import com.provectus.kafka.ui.sr.model.SchemaReference;
+import java.util.List;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import reactor.core.publisher.Mono;
+
+// logic copied from AbstractSchemaProvider:resolveReferences
+// https://github.com/confluentinc/schema-registry/blob/fd59613e2c5adf62e36705307f420712e4c8c1ea/client/src/main/java/io/confluent/kafka/schemaregistry/AbstractSchemaProvider.java#L54
+class SchemaReferencesResolver {
+
+  private final KafkaSrClientApi client;
+
+  SchemaReferencesResolver(KafkaSrClientApi client) {
+    this.client = client;
+  }
+
+  Mono<ImmutableMap<String, String>> resolve(List<SchemaReference> refs) {
+    return resolveReferences(refs, new Resolving(ImmutableMap.of(), ImmutableSet.of()))
+        .map(Resolving::resolved);
+  }
+
+  private record Resolving(ImmutableMap<String, String> resolved, ImmutableSet<String> visited) {
+
+    Resolving visit(String name) {
+      return new Resolving(resolved, ImmutableSet.<String>builder().addAll(visited).add(name).build());
+    }
+
+    Resolving resolve(String ref, String schema) {
+      return new Resolving(ImmutableMap.<String, String>builder().putAll(resolved).put(ref, schema).build(), visited);
+    }
+  }
+
+  private Mono<Resolving> resolveReferences(@Nullable List<SchemaReference> refs, Resolving initState) {
+    Mono<Resolving> result = Mono.just(initState);
+    for (SchemaReference reference : Optional.ofNullable(refs).orElse(List.of())) {
+      result = result.flatMap(state -> {
+        if (state.visited().contains(reference.getName())) {
+          return Mono.just(state);
+        } else {
+          final var newState = state.visit(reference.getName());
+          return client.getSubjectVersion(reference.getSubject(), String.valueOf(reference.getVersion()), true)
+              .flatMap(subj ->
+                  resolveReferences(subj.getReferences(), newState)
+                      .map(withNewRefs -> withNewRefs.resolve(reference.getName(), subj.getSchema())));
+        }
+      });
+    }
+    return result;
+  }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporter.java
index 2426c538086..8f4ef2781be 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporter.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporter.java
@@ -5,6 +5,7 @@
 import com.provectus.kafka.ui.model.Statistics;
 import com.provectus.kafka.ui.service.StatisticsCache;
 import com.provectus.kafka.ui.service.integration.odd.schema.DataSetFieldsExtractors;
+import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
@@ -24,6 +25,8 @@
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
 
 @Slf4j
 @RequiredArgsConstructor
@@ -100,12 +103,20 @@ private Mono<List<DataSetField>> getTopicSchema(KafkaCluster cluster,
       return Mono.just(List.of());
     }
     String subject = topic + (isKey ? "-key" : "-value");
-    return cluster.getSchemaRegistryClient()
-        .mono(client -> client.getSubjectVersion(subject, "latest"))
-        .map(subj -> DataSetFieldsExtractors.extract(subj, topicOddrn, isKey))
+    return getSubjWithResolvedRefs(cluster, subject)
+        .map(t -> DataSetFieldsExtractors.extract(t.getT1(), t.getT2(), topicOddrn, isKey))
         .onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.just(List.of()))
        .onErrorMap(WebClientResponseException.class, err ->
            new IllegalStateException("Error retrieving subject %s".formatted(subject), err));
   }
 
+  private Mono<Tuple2<SchemaSubject, Map<String, String>>> getSubjWithResolvedRefs(KafkaCluster cluster,
+                                                                                   String subjectName) {
+    return cluster.getSchemaRegistryClient()
+        .mono(client ->
+            client.getSubjectVersion(subjectName, "latest", false)
+                .flatMap(subj -> new SchemaReferencesResolver(client).resolve(subj.getReferences())
+                    .map(resolvedRefs -> Tuples.of(subj, resolvedRefs))));
+  }
+
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractor.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractor.java
index cc799a9e109..f9423962933 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractor.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractor.java
@@ -1,7 +1,7 @@
 package com.provectus.kafka.ui.service.integration.odd.schema;
 
 import com.google.common.collect.ImmutableSet;
-import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.avro.Schema;
@@ -14,8 +14,8 @@ final class AvroExtractor {
   private AvroExtractor() {
   }
 
-  static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
-    var schema = new Schema.Parser().parse(subject.getSchema());
+  static List<DataSetField> extract(AvroSchema avroSchema, KafkaPath topicOddrn, boolean isKey) {
+    var schema = avroSchema.rawSchema();
     List<DataSetField> result = new ArrayList<>();
     result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
     extract(
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/DataSetFieldsExtractors.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/DataSetFieldsExtractors.java
index e357db30793..b9093262bd6 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/DataSetFieldsExtractors.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/DataSetFieldsExtractors.java
@@ -2,7 +2,11 @@
 import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import com.provectus.kafka.ui.sr.model.SchemaType;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import org.opendatadiscovery.client.model.DataSetField;
 import org.opendatadiscovery.client.model.DataSetFieldType;
@@ -10,12 +14,18 @@
 
 public final class DataSetFieldsExtractors {
 
-  public static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
+  public static List<DataSetField> extract(SchemaSubject subject,
+                                           Map<String, String> resolvedRefs,
+                                           KafkaPath topicOddrn,
+                                           boolean isKey) {
     SchemaType schemaType = Optional.ofNullable(subject.getSchemaType()).orElse(SchemaType.AVRO);
     return switch (schemaType) {
-      case AVRO -> AvroExtractor.extract(subject, topicOddrn, isKey);
-      case JSON -> JsonSchemaExtractor.extract(subject, topicOddrn, isKey);
-      case PROTOBUF -> ProtoExtractor.extract(subject, topicOddrn, isKey);
+      case AVRO -> AvroExtractor.extract(
+          new AvroSchema(subject.getSchema(), List.of(), resolvedRefs, null), topicOddrn, isKey);
+      case JSON -> JsonSchemaExtractor.extract(
+          new JsonSchema(subject.getSchema(), List.of(), resolvedRefs, null), topicOddrn, isKey);
+      case PROTOBUF -> ProtoExtractor.extract(
+          new ProtobufSchema(subject.getSchema(), List.of(), resolvedRefs, null, null), topicOddrn, isKey);
     };
   }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractor.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractor.java
index 06201b1ce7f..93adbdbe0cc 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractor.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractor.java
@@ -30,8 +30,8 @@ final class JsonSchemaExtractor {
   private JsonSchemaExtractor() {
   }
 
-  static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
-    Schema schema = new JsonSchema(subject.getSchema()).rawSchema();
+  static List<DataSetField> extract(JsonSchema jsonSchema, KafkaPath topicOddrn, boolean isKey) {
+    Schema schema = jsonSchema.rawSchema();
     List<DataSetField> result = new ArrayList<>();
     result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
     extract(
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractor.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractor.java
index c1316172f30..01b25ff48db 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractor.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractor.java
@@ -15,7 +15,6 @@
 import com.google.protobuf.UInt32Value;
 import com.google.protobuf.UInt64Value;
 import com.google.protobuf.Value;
-import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import java.util.ArrayList;
 import java.util.List;
@@ -42,8 +41,8 @@ final class ProtoExtractor {
   private ProtoExtractor() {
   }
 
-  static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
-    Descriptor schema = new ProtobufSchema(subject.getSchema()).toDescriptor();
+  static List<DataSetField> extract(ProtobufSchema protobufSchema, KafkaPath topicOddrn, boolean isKey) {
+    Descriptor schema = protobufSchema.toDescriptor();
     List<DataSetField> result = new ArrayList<>();
     result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
     var rootOddrn = topicOddrn.oddrn() + "/columns/" + (isKey ? "key" : "value");
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java
index 60959be0492..5fa9aee7667 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java
@@ -2,6 +2,7 @@
 import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
 import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
+import com.provectus.kafka.ui.model.SchemaReferenceDTO;
 import com.provectus.kafka.ui.model.SchemaSubjectDTO;
 import com.provectus.kafka.ui.model.SchemaSubjectsResponseDTO;
 import com.provectus.kafka.ui.model.SchemaTypeDTO;
@@ -190,6 +191,58 @@ void shouldCreateNewProtobufSchema() {
     Assertions.assertEquals(schema, actual.getSchema());
   }
 
+
+  @Test
+  void shouldCreateNewProtobufSchemaWithRefs() {
+    NewSchemaSubjectDTO requestBody = new NewSchemaSubjectDTO()
+        .schemaType(SchemaTypeDTO.PROTOBUF)
+        .subject(subject + "-ref")
+        .schema("""
+            syntax = "proto3";
+            message MyRecord {
+              int32 id = 1;
+              string name = 2;
+            }
+            """);
+
+    webTestClient
+        .post()
+        .uri("/api/clusters/{clusterName}/schemas", LOCAL)
+        .contentType(MediaType.APPLICATION_JSON)
+        .body(BodyInserters.fromPublisher(Mono.just(requestBody), NewSchemaSubjectDTO.class))
+        .exchange()
+        .expectStatus()
+        .isOk();
+
+    requestBody = new NewSchemaSubjectDTO()
+        .schemaType(SchemaTypeDTO.PROTOBUF)
+        .subject(subject)
+        .schema("""
+            syntax = "proto3";
+            import "MyRecord.proto";
+            message MyRecordWithRef {
+              int32 id = 1;
+              MyRecord my_ref = 2;
+            }
+            """)
+        .references(List.of(new SchemaReferenceDTO().name("MyRecord.proto").subject(subject + "-ref").version(1)));
+
+    SchemaSubjectDTO actual = webTestClient
+        .post()
+        .uri("/api/clusters/{clusterName}/schemas", LOCAL)
+        .contentType(MediaType.APPLICATION_JSON)
+        .body(BodyInserters.fromPublisher(Mono.just(requestBody), NewSchemaSubjectDTO.class))
+        .exchange()
+        .expectStatus()
+        .isOk()
+        .expectBody(SchemaSubjectDTO.class)
+        .returnResult()
+        .getResponseBody();
+
+    Assertions.assertNotNull(actual);
+    Assertions.assertEquals(requestBody.getReferences(), actual.getReferences());
+  }
+
   @Test
   public void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() {
     webTestClient
@@ -278,7 +331,7 @@ public void shouldOkWhenCreateNewSchemaThenGetAndUpdateItsCompatibilityLevel() {
   void shouldCreateNewSchemaWhenSubjectIncludesNonAsciiCharacters() {
     String schema =
         "{\"subject\":\"test/test\",\"schemaType\":\"JSON\",\"schema\":"
-        + "\"{\\\"type\\\": \\\"string\\\"}\"}";
+            + "\"{\\\"type\\\": \\\"string\\\"}\"}";
 
     webTestClient
         .post()
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolverTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolverTest.java
new file mode 100644
index 00000000000..d24524473a7
--- /dev/null
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolverTest.java
@@ -0,0 +1,86 @@
+package com.provectus.kafka.ui.service.integration.odd;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
+import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
+import com.provectus.kafka.ui.sr.model.SchemaReference;
+import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import java.util.List;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+class SchemaReferencesResolverTest {
+
+  private final KafkaSrClientApi srClientMock = mock(KafkaSrClientApi.class);
+
+  private final SchemaReferencesResolver schemaReferencesResolver = new SchemaReferencesResolver(srClientMock);
+
+  @Test
+  void resolvesRefsUsingSrClient() {
+    mockSrCall("sub1", 1,
+        new SchemaSubject()
+            .schema("schema1"));
+
+    mockSrCall("sub2", 1,
+        new SchemaSubject()
+            .schema("schema2")
+            .references(
+                List.of(
+                    new SchemaReference().name("ref2_1").subject("sub2_1").version(2),
+                    new SchemaReference().name("ref2_2").subject("sub1").version(1))));
+
+    mockSrCall("sub2_1", 2,
+        new SchemaSubject()
+            .schema("schema2_1")
+            .references(
+                List.of(
+                    new SchemaReference().name("ref2_1_1").subject("sub2_1_1").version(3),
+                    new SchemaReference().name("ref1").subject("should_not_be_called").version(1)
+                ))
+    );
+
+    mockSrCall("sub2_1_1", 3,
+        new SchemaSubject()
+            .schema("schema2_1_1"));
+
+    var resolvedRefsMono = schemaReferencesResolver.resolve(
+        List.of(
+            new SchemaReference().name("ref1").subject("sub1").version(1),
+            new SchemaReference().name("ref2").subject("sub2").version(1)));
+
+    StepVerifier.create(resolvedRefsMono)
+        .assertNext(refs ->
+            assertThat(refs)
+                .containsExactlyEntriesOf(
+                    // checking map should be ordered
+                    ImmutableMap.<String, String>builder()
+                        .put("ref1", "schema1")
+                        .put("ref2_1_1", "schema2_1_1")
+                        .put("ref2_1", "schema2_1")
+                        .put("ref2_2", "schema1")
+                        .put("ref2", "schema2")
+                        .build()))
+        .verifyComplete();
+  }
+
+  @Test
+  void returnsEmptyMapOnEmptyInputs() {
+    StepVerifier.create(schemaReferencesResolver.resolve(null))
+        .assertNext(map -> assertThat(map).isEmpty())
+        .verifyComplete();
+
+    StepVerifier.create(schemaReferencesResolver.resolve(List.of()))
+        .assertNext(map -> assertThat(map).isEmpty())
+        .verifyComplete();
+  }
+
+  private void mockSrCall(String subject, int version, SchemaSubject subjectToReturn) {
+    when(srClientMock.getSubjectVersion(subject, version + "", true))
+        .thenReturn(Mono.just(subjectToReturn));
+  }
+
+}
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporterTest.java
index 8673b469d6e..cb4103467be 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporterTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporterTest.java
@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.service.integration.odd;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -54,9 +55,8 @@ void init() {
 
   @Test
   void doesNotExportTopicsWhichDontFitFiltrationRule() {
-    when(schemaRegistryClientMock.getSubjectVersion(anyString(), anyString()))
+    when(schemaRegistryClientMock.getSubjectVersion(anyString(), anyString(), anyBoolean()))
         .thenReturn(Mono.error(WebClientResponseException.create(404, "NF", new HttpHeaders(), null, null, null)));
-
     stats = Statistics.empty()
         .toBuilder()
         .topicDescriptions(
@@ -85,14 +85,14 @@
   @Test
   void doesExportTopicData() {
-    when(schemaRegistryClientMock.getSubjectVersion("testTopic-value", "latest"))
+    when(schemaRegistryClientMock.getSubjectVersion("testTopic-value", "latest", false))
         .thenReturn(Mono.just(
             new SchemaSubject()
                 .schema("\"string\"")
                 .schemaType(SchemaType.AVRO)
         ));
 
-    when(schemaRegistryClientMock.getSubjectVersion("testTopic-key", "latest"))
+    when(schemaRegistryClientMock.getSubjectVersion("testTopic-key", "latest", false))
         .thenReturn(Mono.just(
             new SchemaSubject()
                 .schema("\"int\"")
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractorTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractorTest.java
index d523d7cd417..cd1baf77987 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractorTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractorTest.java
@@ -2,7 +2,7 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.opendatadiscovery.client.model.DataSetField;
@@ -15,8 +15,7 @@ class AvroExtractorTest {
   @ValueSource(booleans = {true, false})
   void test(boolean isKey) {
     var list = AvroExtractor.extract(
-        new SchemaSubject()
-            .schema("""
+        new AvroSchema("""
                 {
                     "type": "record",
                     "name": "Message",
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractorTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractorTest.java
index 7968e52e6dd..30a1e6229cc 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractorTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractorTest.java
@@ -2,7 +2,7 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
@@ -40,7 +40,7 @@ void test(boolean isKey) {
         }
         """;
     var fields = JsonSchemaExtractor.extract(
-        new SchemaSubject().schema(jsonSchema),
+        new JsonSchema(jsonSchema),
         KafkaPath.builder()
             .cluster("localhost:9092")
             .topic("someTopic")
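
Note: the extractor tests above build the Confluent schema objects from a plain schema string. When a subject has references, the already-resolved name -> schema map can be supplied through the same multi-argument constructor that DataSetFieldsExtractors uses earlier in this diff. A minimal sketch; the helper class, method names and import layout here are illustrative only.

package com.provectus.kafka.ui.service.integration.odd.schema;

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import java.util.List;
import java.util.Map;
import org.opendatadiscovery.client.model.DataSetField;
import org.opendatadiscovery.oddrn.model.KafkaPath;

class ResolvedRefsExtractionSketch {

  // Builds an AvroSchema whose references are satisfied from resolvedRefs (reference name -> schema text)
  // and runs the ODD field extraction against it.
  List<DataSetField> extractWithRefs(String rawAvroSchema,
                                     Map<String, String> resolvedRefs,
                                     KafkaPath topicOddrn,
                                     boolean isKey) {
    return AvroExtractor.extract(
        new AvroSchema(rawAvroSchema, List.of(), resolvedRefs, null), topicOddrn, isKey);
  }
}
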
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractorTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractorTest.java
index cbb97a859c2..8d6344d7cca 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractorTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractorTest.java
@@ -2,7 +2,7 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.opendatadiscovery.client.model.DataSetField;
@@ -54,8 +54,7 @@ enum SampleEnum {
         }""";
 
     var list = ProtoExtractor.extract(
-        new SchemaSubject()
-            .schema(protoSchema),
+        new ProtobufSchema(protoSchema),
         KafkaPath.builder()
             .cluster("localhost:9092")
             .topic("someTopic")
diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-sr-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-sr-api.yaml
index 84ee36b48de..0320e891ecd 100644
--- a/kafka-ui-contract/src/main/resources/swagger/kafka-sr-api.yaml
+++ b/kafka-ui-contract/src/main/resources/swagger/kafka-sr-api.yaml
@@ -77,6 +77,10 @@ paths:
         required: true
         schema:
           type: string
+      - name: deleted
+        in: query
+        schema:
+          type: boolean
     responses:
       200:
         description: OK
@@ -317,6 +321,10 @@ components:
           type: string
         schemaType:
           $ref: '#/components/schemas/SchemaType'
+        references:
+          type: array
+          items:
+            $ref: '#/components/schemas/SchemaReference'
       required:
         - id
         - subject
diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
index e15425a45ec..f9ed233bc1d 100644
--- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
+++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
@@ -2916,6 +2916,10 @@ components:
           type: string
         schemaType:
           $ref: '#/components/schemas/SchemaType'
+        references:
+          type: array
+          items:
+            $ref: '#/components/schemas/SchemaReference'
       required:
         - id
         - subject
@@ -2933,13 +2937,30 @@ components:
         schema:
           type: string
         schemaType:
-          $ref: '#/components/schemas/SchemaType'
-          # upon updating a schema, the type of existing schema can't be changed
+          $ref: '#/components/schemas/SchemaType' # upon updating a schema, the type of existing schema can't be changed
+        references:
+          type: array
+          items:
+            $ref: '#/components/schemas/SchemaReference'
       required:
         - subject
         - schema
         - schemaType
 
+    SchemaReference:
+      type: object
+      properties:
+        name:
+          type: string
+        subject:
+          type: string
+        version:
+          type: integer
+      required:
+        - name
+        - subject
+        - version
+
     CompatibilityLevel:
       type: object
       properties: