diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java index 33763ed9b89..b3b3c857de4 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java @@ -35,22 +35,30 @@ public class ClustersProperties { public static class Cluster { String name; String bootstrapServers; + + TruststoreConfig ssl; + String schemaRegistry; SchemaRegistryAuth schemaRegistryAuth; KeystoreConfig schemaRegistrySsl; + String ksqldbServer; KsqldbServerAuth ksqldbServerAuth; KeystoreConfig ksqldbServerSsl; + List kafkaConnect; - MetricsConfigData metrics; - Map properties; - boolean readOnly = false; + List serde; String defaultKeySerde; String defaultValueSerde; - List masking; + + MetricsConfigData metrics; + Map properties; + boolean readOnly = false; Long pollingThrottleRate; - TruststoreConfig ssl; + + List masking; + AuditProperties audit; } @@ -101,6 +109,16 @@ public static class SchemaRegistryAuth { public static class TruststoreConfig { String truststoreLocation; String truststorePassword; + boolean verifySsl = true; + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + @ToString(exclude = {"keystorePassword"}) + public static class KeystoreConfig { + String keystoreLocation; + String keystorePassword; } @Data @@ -120,15 +138,6 @@ public static class KsqldbServerAuth { String password; } - @Data - @NoArgsConstructor - @AllArgsConstructor - @ToString(exclude = {"keystorePassword"}) - public static class KeystoreConfig { - String keystoreLocation; - String keystorePassword; - } - @Data public static class Masking { Type type; @@ -178,6 +187,7 @@ private void flattenClusterProperties() { } } + @SuppressWarnings("unchecked") private Map flattenClusterProperties(@Nullable String prefix, @Nullable Map propertiesMap) { Map flattened = new HashMap<>(); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java index 1bd4d7e33e8..9695bf2dbd0 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java @@ -2,7 +2,7 @@ import com.provectus.kafka.ui.config.ClustersProperties; import com.provectus.kafka.ui.model.KafkaCluster; -import com.provectus.kafka.ui.util.SslPropertiesUtil; +import com.provectus.kafka.ui.util.KafkaClientSslPropertiesUtil; import java.io.Closeable; import java.time.Instant; import java.util.Map; @@ -42,7 +42,7 @@ public Mono get(KafkaCluster cluster) { private Mono createAdminClient(KafkaCluster cluster) { return Mono.fromSupplier(() -> { Properties properties = new Properties(); - SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties); + KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties); properties.putAll(cluster.getProperties()); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); properties.putIfAbsent(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java index 9764664d6a2..ccb2a18691f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java @@ -10,7 +10,7 @@ import com.provectus.kafka.ui.model.SortOrderDTO; import com.provectus.kafka.ui.service.rbac.AccessControlService; import com.provectus.kafka.ui.util.ApplicationMetrics; -import com.provectus.kafka.ui.util.SslPropertiesUtil; +import com.provectus.kafka.ui.util.KafkaClientSslPropertiesUtil; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -254,7 +254,7 @@ public EnhancedConsumer createConsumer(KafkaCluster cluster) { public EnhancedConsumer createConsumer(KafkaCluster cluster, Map properties) { Properties props = new Properties(); - SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props); + KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props); props.putAll(cluster.getProperties()); props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-consumer-" + System.currentTimeMillis()); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java index dcc122ba282..7ab4b60b5b4 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java @@ -20,7 +20,7 @@ import com.provectus.kafka.ui.model.TopicMessageEventDTO; import com.provectus.kafka.ui.serde.api.Serde; import com.provectus.kafka.ui.serdes.ProducerRecordCreator; -import com.provectus.kafka.ui.util.SslPropertiesUtil; +import com.provectus.kafka.ui.util.KafkaClientSslPropertiesUtil; import java.time.Instant; import java.time.OffsetDateTime; import java.time.ZoneOffset; @@ -191,7 +191,7 @@ private Mono sendMessageImpl(KafkaCluster cluster, public static KafkaProducer createProducer(KafkaCluster cluster, Map additionalProps) { Properties properties = new Properties(); - SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties); + KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties); properties.putAll(cluster.getProperties()); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java index e8f4954bf0a..4673ee1b037 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java @@ -130,8 +130,8 @@ private Flux executeSelect(String ksql, Map s * Some version of ksqldb (?..0.24) can cut off json streaming without respect proper array ending like

* [{"header":{"queryId":"...","schema":"..."}}, ] * which will cause json parsing error and will be propagated to UI. - * This is a know issue(https://github.com/confluentinc/ksql/issues/8746), but we don't know when it will be fixed. - * To workaround this we need to check DecodingException err msg. + * This is a know issue(...), but we don't know when it will be fixed. + * To work around this we need to check DecodingException err msg. */ private boolean isUnexpectedJsonArrayEndCharException(Throwable th) { return th instanceof DecodingException diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaClientSslPropertiesUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaClientSslPropertiesUtil.java new file mode 100644 index 00000000000..23d2e8340b5 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaClientSslPropertiesUtil.java @@ -0,0 +1,34 @@ +package com.provectus.kafka.ui.util; + +import com.provectus.kafka.ui.config.ClustersProperties; +import java.util.Properties; +import javax.annotation.Nullable; +import org.apache.kafka.common.config.SslConfigs; + +public final class KafkaClientSslPropertiesUtil { + + private KafkaClientSslPropertiesUtil() { + } + + public static void addKafkaSslProperties(@Nullable ClustersProperties.TruststoreConfig truststoreConfig, + Properties sink) { + if (truststoreConfig == null) { + return; + } + + if (truststoreConfig.getTruststoreLocation() == null) { + return; + } + + sink.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.getTruststoreLocation()); + + if (truststoreConfig.getTruststorePassword() != null) { + sink.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.getTruststorePassword()); + } + + if (!truststoreConfig.isVerifySsl()) { + sink.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + } + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaServicesValidation.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaServicesValidation.java index 4b8af81f851..914a3ce92dd 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaServicesValidation.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaServicesValidation.java @@ -65,7 +65,7 @@ public static Mono validateClusterConnection(S @Nullable TruststoreConfig ssl) { Properties properties = new Properties(); - SslPropertiesUtil.addKafkaSslProperties(ssl, properties); + KafkaClientSslPropertiesUtil.addKafkaSslProperties(ssl, properties); properties.putAll(clusterProps); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // editing properties to make validation faster diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/SslPropertiesUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/SslPropertiesUtil.java deleted file mode 100644 index 4d157fbcb5f..00000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/SslPropertiesUtil.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.provectus.kafka.ui.util; - -import com.provectus.kafka.ui.config.ClustersProperties; -import java.util.Properties; -import javax.annotation.Nullable; -import org.apache.kafka.common.config.SslConfigs; - -public final class SslPropertiesUtil { - - private SslPropertiesUtil() { - } - - public static void addKafkaSslProperties(@Nullable ClustersProperties.TruststoreConfig truststoreConfig, - Properties sink) { - if (truststoreConfig != null && truststoreConfig.getTruststoreLocation() != null) { - sink.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.getTruststoreLocation()); - if (truststoreConfig.getTruststorePassword() != null) { - sink.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.getTruststorePassword()); - } - } - } - -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/WebClientConfigurator.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/WebClientConfigurator.java index c5aca5ad716..6bc0b004e39 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/WebClientConfigurator.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/WebClientConfigurator.java @@ -7,6 +7,7 @@ import com.provectus.kafka.ui.exception.ValidationException; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import java.io.FileInputStream; import java.security.KeyStore; import java.util.function.Consumer; @@ -45,6 +46,10 @@ private static ObjectMapper defaultOM() { public WebClientConfigurator configureSsl(@Nullable ClustersProperties.TruststoreConfig truststoreConfig, @Nullable ClustersProperties.KeystoreConfig keystoreConfig) { + if (truststoreConfig != null && !truststoreConfig.isVerifySsl()) { + return configureNoSsl(); + } + return configureSsl( keystoreConfig != null ? keystoreConfig.getKeystoreLocation() : null, keystoreConfig != null ? keystoreConfig.getKeystorePassword() : null, @@ -97,6 +102,17 @@ private WebClientConfigurator configureSsl( return this; } + @SneakyThrows + public WebClientConfigurator configureNoSsl() { + var contextBuilder = SslContextBuilder.forClient(); + contextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE); + + SslContext context = contextBuilder.build(); + + httpClient = httpClient.secure(t -> t.sslContext(context)); + return this; + } + public WebClientConfigurator configureBasicAuth(@Nullable String username, @Nullable String password) { if (username != null && password != null) { builder.defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth(username, password)); diff --git a/kafka-ui-api/src/main/resources/application-local.yml b/kafka-ui-api/src/main/resources/application-local.yml index 7848f1fdc49..149e414acf2 100644 --- a/kafka-ui-api/src/main/resources/application-local.yml +++ b/kafka-ui-api/src/main/resources/application-local.yml @@ -136,7 +136,7 @@ rbac: actions: all - resource: connect - value: "*" + value: ".*" actions: all - resource: ksql