Skip to content

Commit

Permalink
Config wizard BE: Add remaining cluster properties to wizard API (#3523)
Browse files Browse the repository at this point in the history
* Important @value annotated properties moved to typed classes
---------

Co-authored-by: iliax <[email protected]>
Co-authored-by: Roman Zabaluev <[email protected]>
Co-authored-by: VladSenyuta <[email protected]>
  • Loading branch information
4 people authored and Kamila Alekbaeva committed Apr 17, 2023
1 parent 19c0b29 commit 16305b2
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class ClustersProperties {

String internalTopicPrefix;

Integer adminClientTimeout;

PollingProperties polling = new PollingProperties();

@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import lombok.AllArgsConstructor;
import org.openapitools.jackson.nullable.JsonNullableModule;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.boot.autoconfigure.web.reactive.WebFluxProperties;
import org.springframework.context.ApplicationContext;
Expand All @@ -15,8 +14,6 @@
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.jmx.export.MBeanExporter;
import org.springframework.util.StringUtils;
import org.springframework.util.unit.DataSize;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;

@Configuration
Expand Down Expand Up @@ -52,14 +49,7 @@ public MBeanExporter exporter() {
}

@Bean
public WebClient webClient(
@Value("${webclient.max-in-memory-buffer-size:20MB}") DataSize maxBuffSize) {
return WebClient.builder()
.codecs(c -> c.defaultCodecs().maxInMemorySize((int) maxBuffSize.toBytes()))
.build();
}

@Bean
// will be used by webflux json mapping
public JsonNullableModule jsonNullableModule() {
return new JsonNullableModule();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.provectus.kafka.ui.config;

import com.provectus.kafka.ui.exception.ValidationException;
import java.beans.Transient;
import javax.annotation.PostConstruct;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.unit.DataSize;

@Configuration
@ConfigurationProperties("webclient")
@Data
public class WebclientProperties {

String maxInMemoryBufferSize;

@PostConstruct
public void validate() {
validateAndSetDefaultBufferSize();
}

private void validateAndSetDefaultBufferSize() {
if (maxInMemoryBufferSize != null) {
try {
DataSize.parse(maxInMemoryBufferSize);
} catch (Exception e) {
throw new ValidationException("Invalid format for webclient.maxInMemoryBufferSize");
}
}
}

}
Original file line number Diff line number Diff line change
@@ -1,33 +1,36 @@
package com.provectus.kafka.ui.service;

import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.util.SslPropertiesUtil;
import java.io.Closeable;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

@Service
@RequiredArgsConstructor
@Slf4j
public class AdminClientServiceImpl implements AdminClientService, Closeable {

private static final int DEFAULT_CLIENT_TIMEOUT_MS = 30_000;

private static final AtomicLong CLIENT_ID_SEQ = new AtomicLong();

private final Map<String, ReactiveAdminClient> adminClientCache = new ConcurrentHashMap<>();
@Setter // used in tests
@Value("${kafka.admin-client-timeout:30000}")
private int clientTimeout;
private final int clientTimeout;

public AdminClientServiceImpl(ClustersProperties clustersProperties) {
this.clientTimeout = Optional.ofNullable(clustersProperties.getAdminClientTimeout())
.orElse(DEFAULT_CLIENT_TIMEOUT_MS);
}

@Override
public Mono<ReactiveAdminClient> get(KafkaCluster cluster) {
Expand All @@ -42,7 +45,7 @@ private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
properties.putAll(cluster.getProperties());
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
properties.putIfAbsent(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
properties.putIfAbsent(
AdminClientConfig.CLIENT_ID_CONFIG,
"kafka-ui-admin-" + Instant.now().getEpochSecond() + "-" + CLIENT_ID_SEQ.incrementAndGet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.provectus.kafka.ui.client.RetryingKafkaConnectClient;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.config.WebclientProperties;
import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
import com.provectus.kafka.ui.emitter.PollingSettings;
import com.provectus.kafka.ui.model.ApplicationPropertyValidationDTO;
Expand All @@ -22,9 +23,7 @@
import java.util.Properties;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.unit.DataSize;
import org.springframework.web.reactive.function.client.WebClient;
Expand All @@ -34,12 +33,18 @@
import reactor.util.function.Tuples;

@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaClusterFactory {

@Value("${webclient.max-in-memory-buffer-size:20MB}")
private DataSize maxBuffSize;
private static final DataSize DEFAULT_WEBCLIENT_BUFFER = DataSize.parse("20MB");

private final DataSize webClientMaxBuffSize;

public KafkaClusterFactory(WebclientProperties webclientProperties) {
this.webClientMaxBuffSize = Optional.ofNullable(webclientProperties.getMaxInMemoryBufferSize())
.map(DataSize::parse)
.orElse(DEFAULT_WEBCLIENT_BUFFER);
}

public KafkaCluster create(ClustersProperties properties,
ClustersProperties.Cluster clusterProperties) {
Expand Down Expand Up @@ -140,7 +145,7 @@ private ReactiveFailover<KafkaConnectClientApi> connectClient(ClustersProperties
url -> new RetryingKafkaConnectClient(
connectCluster.toBuilder().address(url).build(),
cluster.getSsl(),
maxBuffSize
webClientMaxBuffSize
),
ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
"No alive connect instances available",
Expand All @@ -158,7 +163,7 @@ private ReactiveFailover<KafkaSrClientApi> schemaRegistryClient(ClustersProperti
WebClient webClient = new WebClientConfigurator()
.configureSsl(clusterProperties.getSsl(), clusterProperties.getSchemaRegistrySsl())
.configureBasicAuth(auth.getUsername(), auth.getPassword())
.configureBufferSize(maxBuffSize)
.configureBufferSize(webClientMaxBuffSize)
.build();
return ReactiveFailover.create(
parseUrlList(clusterProperties.getSchemaRegistry()),
Expand All @@ -181,7 +186,7 @@ private ReactiveFailover<KsqlApiClient> ksqlClient(ClustersProperties.Cluster cl
clusterProperties.getKsqldbServerAuth(),
clusterProperties.getSsl(),
clusterProperties.getKsqldbServerSsl(),
maxBuffSize
webClientMaxBuffSize
),
ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
"No live ksqldb instances available",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.config.WebclientProperties;
import com.provectus.kafka.ui.config.auth.OAuthProperties;
import com.provectus.kafka.ui.config.auth.RoleBasedAccessControlProperties;
import com.provectus.kafka.ui.exception.FileUploadException;
Expand Down Expand Up @@ -97,6 +98,7 @@ public PropertiesStructure getCurrentProperties() {
.type(ctx.getEnvironment().getProperty("auth.type"))
.oauth2(getNullableBean(OAuthProperties.class))
.build())
.webclient(getNullableBean(WebclientProperties.class))
.build();
}

Expand Down Expand Up @@ -204,6 +206,7 @@ public static class PropertiesStructure {
private ClustersProperties kafka;
private RoleBasedAccessControlProperties rbac;
private Auth auth;
private WebclientProperties webclient;

@Data
@Builder
Expand All @@ -222,6 +225,9 @@ public void initAndValidate() {
Optional.ofNullable(auth)
.flatMap(a -> Optional.ofNullable(a.oauth2))
.ifPresent(OAuthProperties::validate);

Optional.ofNullable(webclient)
.ifPresent(WebclientProperties::validate);
}
}

Expand Down
10 changes: 10 additions & 0 deletions kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3467,6 +3467,12 @@ components:
type: array
items:
$ref: '#/components/schemas/Action'
webclient:
type: object
properties:
maxInMemoryBufferSize:
type: string
description: "examples: 20, 12KB, 5MB"
kafka:
type: object
properties:
Expand All @@ -3479,6 +3485,10 @@ components:
type: integer
noDataEmptyPolls:
type: integer
adminClientTimeout:
type: integer
internalTopicPrefix:
type: string
clusters:
type: array
items:
Expand Down

0 comments on commit 16305b2

Please sign in to comment.