diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java index 8906e9e85..7377911fc 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java @@ -579,7 +579,7 @@ public Params build() { final var rs3Host = responsiveConfig.getString(RS3_HOSTNAME_CONFIG); final var rs3Port = responsiveConfig.getInt(RS3_PORT_CONFIG); final var rs3Connector = new GrpcRS3Client.Connector(time, rs3Host, rs3Port); - rs3Connector.retryTimeoutMs(responsiveConfig.getInt(RS3_RETRY_TIMEOUT_CONFIG)); + rs3Connector.retryTimeoutMs(responsiveConfig.getLong(RS3_RETRY_TIMEOUT_CONFIG)); rs3Connector.useTls(responsiveConfig.getBoolean(RS3_TLS_ENABLED_CONFIG)); sessionClients = new SessionClients( diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java index fd7875479..861a73412 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java @@ -137,7 +137,7 @@ public class ResponsiveConfig extends AbstractConfig { public static final String RS3_RETRY_TIMEOUT_CONFIG = "responsive.rs3.retry.timeout.ms"; private static final String RS3_RETRY_TIMEOUT_DOC = "Timeout in milliseconds for retries when RS3 endpoint is unavailable"; - public static final int RS3_RETRY_TIMEOUT_DEFAULT = 30000; + public static final long RS3_RETRY_TIMEOUT_DEFAULT = 30000; // ------------------ ScyllaDB specific configurations ---------------------- diff --git a/kafka-client/src/test/java/dev/responsive/kafka/api/config/ResponsiveConfigTest.java b/kafka-client/src/test/java/dev/responsive/kafka/api/config/ResponsiveConfigTest.java index 7fe9538fc..79eabb885 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/api/config/ResponsiveConfigTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/api/config/ResponsiveConfigTest.java @@ -25,4 +25,21 @@ public void testRs3TableMapping() { assertEquals(expectedMapping, config.getMap(ResponsiveConfig.RS3_LOGICAL_STORE_MAPPING_CONFIG)); } + @Test + public void testRs3RetryTimeoutConfig() { + var props = new Properties(); + var config = ResponsiveConfig.responsiveConfig(props); + assertEquals( + ResponsiveConfig.RS3_RETRY_TIMEOUT_DEFAULT, + config.getLong(ResponsiveConfig.RS3_RETRY_TIMEOUT_CONFIG) + ); + + props.setProperty(ResponsiveConfig.RS3_RETRY_TIMEOUT_CONFIG, "10"); + var reconfig = ResponsiveConfig.responsiveConfig(props); + assertEquals( + 10L, + reconfig.getLong(ResponsiveConfig.RS3_RETRY_TIMEOUT_CONFIG) + ); + } + } \ No newline at end of file