diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java index 15436c1cd8b2..24b60b57116d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java @@ -27,6 +27,8 @@ public class ClustersProperties { String internalTopicPrefix; + Integer adminClientTimeout; + PollingProperties polling = new PollingProperties(); @Data diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/Config.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/Config.java index 37495b50291b..2ad0538c0ec7 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/Config.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/Config.java @@ -5,7 +5,6 @@ import lombok.AllArgsConstructor; import org.openapitools.jackson.nullable.JsonNullableModule; import org.springframework.beans.factory.ObjectProvider; -import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.web.ServerProperties; import org.springframework.boot.autoconfigure.web.reactive.WebFluxProperties; import org.springframework.context.ApplicationContext; @@ -15,8 +14,6 @@ import org.springframework.http.server.reactive.HttpHandler; import org.springframework.jmx.export.MBeanExporter; import org.springframework.util.StringUtils; -import org.springframework.util.unit.DataSize; -import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.server.adapter.WebHttpHandlerBuilder; @Configuration @@ -52,14 +49,7 @@ public MBeanExporter exporter() { } @Bean - public WebClient webClient( - @Value("${webclient.max-in-memory-buffer-size:20MB}") DataSize maxBuffSize) { - return WebClient.builder() - .codecs(c -> c.defaultCodecs().maxInMemorySize((int) maxBuffSize.toBytes())) - .build(); - } - - @Bean + // will be used by webflux json mapping public JsonNullableModule jsonNullableModule() { return new JsonNullableModule(); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/WebclientProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/WebclientProperties.java new file mode 100644 index 000000000000..ad7732612d48 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/WebclientProperties.java @@ -0,0 +1,33 @@ +package com.provectus.kafka.ui.config; + +import com.provectus.kafka.ui.exception.ValidationException; +import java.beans.Transient; +import javax.annotation.PostConstruct; +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; +import org.springframework.util.unit.DataSize; + +@Configuration +@ConfigurationProperties("webclient") +@Data +public class WebclientProperties { + + String maxInMemoryBufferSize; + + @PostConstruct + public void validate() { + validateAndSetDefaultBufferSize(); + } + + private void validateAndSetDefaultBufferSize() { + if (maxInMemoryBufferSize != null) { + try { + DataSize.parse(maxInMemoryBufferSize); + } catch (Exception e) { + throw new ValidationException("Invalid format for webclient.maxInMemoryBufferSize"); + } + } + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java index 886b67b9282b..1bd4d7e33e8d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java @@ -1,33 +1,36 @@ package com.provectus.kafka.ui.service; +import com.provectus.kafka.ui.config.ClustersProperties; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.util.SslPropertiesUtil; import java.io.Closeable; import java.time.Instant; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import lombok.RequiredArgsConstructor; -import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; @Service -@RequiredArgsConstructor @Slf4j public class AdminClientServiceImpl implements AdminClientService, Closeable { + private static final int DEFAULT_CLIENT_TIMEOUT_MS = 30_000; + private static final AtomicLong CLIENT_ID_SEQ = new AtomicLong(); private final Map adminClientCache = new ConcurrentHashMap<>(); - @Setter // used in tests - @Value("${kafka.admin-client-timeout:30000}") - private int clientTimeout; + private final int clientTimeout; + + public AdminClientServiceImpl(ClustersProperties clustersProperties) { + this.clientTimeout = Optional.ofNullable(clustersProperties.getAdminClientTimeout()) + .orElse(DEFAULT_CLIENT_TIMEOUT_MS); + } @Override public Mono get(KafkaCluster cluster) { @@ -42,7 +45,7 @@ private Mono createAdminClient(KafkaCluster cluster) { SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties); properties.putAll(cluster.getProperties()); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); - properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout); + properties.putIfAbsent(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout); properties.putIfAbsent( AdminClientConfig.CLIENT_ID_CONFIG, "kafka-ui-admin-" + Instant.now().getEpochSecond() + "-" + CLIENT_ID_SEQ.incrementAndGet() diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java index 357a548a6373..964b25473d32 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java @@ -2,6 +2,7 @@ import com.provectus.kafka.ui.client.RetryingKafkaConnectClient; import com.provectus.kafka.ui.config.ClustersProperties; +import com.provectus.kafka.ui.config.WebclientProperties; import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi; import com.provectus.kafka.ui.emitter.PollingSettings; import com.provectus.kafka.ui.model.ApplicationPropertyValidationDTO; @@ -22,9 +23,7 @@ import java.util.Properties; import java.util.stream.Stream; import javax.annotation.Nullable; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.util.unit.DataSize; import org.springframework.web.reactive.function.client.WebClient; @@ -34,12 +33,18 @@ import reactor.util.function.Tuples; @Service -@RequiredArgsConstructor @Slf4j public class KafkaClusterFactory { - @Value("${webclient.max-in-memory-buffer-size:20MB}") - private DataSize maxBuffSize; + private static final DataSize DEFAULT_WEBCLIENT_BUFFER = DataSize.parse("20MB"); + + private final DataSize webClientMaxBuffSize; + + public KafkaClusterFactory(WebclientProperties webclientProperties) { + this.webClientMaxBuffSize = Optional.ofNullable(webclientProperties.getMaxInMemoryBufferSize()) + .map(DataSize::parse) + .orElse(DEFAULT_WEBCLIENT_BUFFER); + } public KafkaCluster create(ClustersProperties properties, ClustersProperties.Cluster clusterProperties) { @@ -140,7 +145,7 @@ private ReactiveFailover connectClient(ClustersProperties url -> new RetryingKafkaConnectClient( connectCluster.toBuilder().address(url).build(), cluster.getSsl(), - maxBuffSize + webClientMaxBuffSize ), ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER, "No alive connect instances available", @@ -158,7 +163,7 @@ private ReactiveFailover schemaRegistryClient(ClustersProperti WebClient webClient = new WebClientConfigurator() .configureSsl(clusterProperties.getSsl(), clusterProperties.getSchemaRegistrySsl()) .configureBasicAuth(auth.getUsername(), auth.getPassword()) - .configureBufferSize(maxBuffSize) + .configureBufferSize(webClientMaxBuffSize) .build(); return ReactiveFailover.create( parseUrlList(clusterProperties.getSchemaRegistry()), @@ -181,7 +186,7 @@ private ReactiveFailover ksqlClient(ClustersProperties.Cluster cl clusterProperties.getKsqldbServerAuth(), clusterProperties.getSsl(), clusterProperties.getKsqldbServerSsl(), - maxBuffSize + webClientMaxBuffSize ), ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER, "No live ksqldb instances available", diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/DynamicConfigOperations.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/DynamicConfigOperations.java index 2e1b32d3f1b3..75c6d25f959e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/DynamicConfigOperations.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/DynamicConfigOperations.java @@ -2,6 +2,7 @@ import com.provectus.kafka.ui.config.ClustersProperties; +import com.provectus.kafka.ui.config.WebclientProperties; import com.provectus.kafka.ui.config.auth.OAuthProperties; import com.provectus.kafka.ui.config.auth.RoleBasedAccessControlProperties; import com.provectus.kafka.ui.exception.FileUploadException; @@ -97,6 +98,7 @@ public PropertiesStructure getCurrentProperties() { .type(ctx.getEnvironment().getProperty("auth.type")) .oauth2(getNullableBean(OAuthProperties.class)) .build()) + .webclient(getNullableBean(WebclientProperties.class)) .build(); } @@ -204,6 +206,7 @@ public static class PropertiesStructure { private ClustersProperties kafka; private RoleBasedAccessControlProperties rbac; private Auth auth; + private WebclientProperties webclient; @Data @Builder @@ -222,6 +225,9 @@ public void initAndValidate() { Optional.ofNullable(auth) .flatMap(a -> Optional.ofNullable(a.oauth2)) .ifPresent(OAuthProperties::validate); + + Optional.ofNullable(webclient) + .ifPresent(WebclientProperties::validate); } } diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 7b6fd3c1131b..aef724446625 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -3467,6 +3467,12 @@ components: type: array items: $ref: '#/components/schemas/Action' + webclient: + type: object + properties: + maxInMemoryBufferSize: + type: string + description: "examples: 20, 12KB, 5MB" kafka: type: object properties: @@ -3479,6 +3485,10 @@ components: type: integer noDataEmptyPolls: type: integer + adminClientTimeout: + type: integer + internalTopicPrefix: + type: string clusters: type: array items: