diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java index 59c08a990..963da5f47 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java @@ -36,6 +36,7 @@ import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.internal.clients.ResponsiveKafkaClientSupplier; import dev.responsive.kafka.internal.config.ConfigUtils; +import dev.responsive.kafka.internal.config.ControllerSignals; import dev.responsive.kafka.internal.config.InternalSessionConfigs; import dev.responsive.kafka.internal.config.ResponsiveStreamsConfig; import dev.responsive.kafka.internal.db.CassandraClientFactory; @@ -69,6 +70,7 @@ import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.KafkaStreams; @@ -85,11 +87,12 @@ public class ResponsiveKafkaStreams extends KafkaStreams { - private static final Logger LOG = LoggerFactory.getLogger(ResponsiveKafkaStreams.class); + private final Logger log; private final ResponsiveMetrics responsiveMetrics; private final ResponsiveStateListener responsiveStateListener; private final ResponsiveRestoreListener responsiveRestoreListener; + private final ControllerSignals controllerSignals; private final SessionClients sessionClients; /** @@ -189,6 +192,7 @@ protected ResponsiveKafkaStreams(final Params params) { propsWithOverrides( params.streamsConfig.getInt(NUM_STREAM_THREADS_CONFIG), params.responsiveConfig, + params.controllerSignals, params.sessionClients, params.storeRegistry, params.topology.describe(), @@ -197,6 +201,9 @@ protected ResponsiveKafkaStreams(final Params params) { params.time ); + this.log = new LogContext(String.format("stream-client [%s] ", clientId)) + .logger(ResponsiveKafkaStreams.class); + if (params.compatibilityMode == CompatibilityMode.FULL) { try { ResponsiveStreamsConfig.validateStreamsConfig(applicationConfigs); @@ -205,13 +212,14 @@ protected ResponsiveKafkaStreams(final Params params) { } } + this.controllerSignals = params.controllerSignals;; this.responsiveMetrics = params.metrics; this.sessionClients = params.sessionClients; final ClientVersionMetadata versionMetadata = ClientVersionMetadata.loadVersionMetadata(); // Only log the version metadata for Responsive since Kafka Streams will log its own - LOG.info("Responsive Client version: {}", versionMetadata.responsiveClientVersion); - LOG.info("Responsive Client commit ID: {}", versionMetadata.responsiveClientCommitId); + log.info("Responsive Client version: {}", versionMetadata.responsiveClientVersion); + log.info("Responsive Client commit ID: {}", versionMetadata.responsiveClientCommitId); responsiveMetrics.initializeTags( applicationConfigs.getString(APPLICATION_ID_CONFIG), @@ -264,6 +272,7 @@ private static ResponsiveMetrics createMetrics( private static Properties propsWithOverrides( final int numStreamThreads, final ResponsiveConfig configs, + final ControllerSignals controllerSignals, final SessionClients sessionClients, final ResponsiveStoreRegistry storeRegistry, final TopologyDescription topologyDescription, @@ -271,8 +280,8 @@ private static Properties propsWithOverrides( ) { final Properties propsWithOverrides = new Properties(); - final InternalSessionConfigs.Builder internalConfBuilder = new InternalSessionConfigs.Builder() + .withControllerSignals(controllerSignals) .withSessionClients(sessionClients) .withStoreRegistry(storeRegistry) .withTopologyDescription(topologyDescription) @@ -309,13 +318,25 @@ private static Properties propsWithOverrides( o, TASK_ASSIGNOR_CLASS_OVERRIDE ); - LOG.error(errorMsg); + + // create a temporary logger for use in static methods since the constructor hasn't even + // been called yet -- it's ok since we just shut down right after this anyways + LoggerFactory.getLogger(ResponsiveKafkaStreams.class).error(errorMsg); throw new ConfigException(errorMsg); } return propsWithOverrides; } + /** + * Request Kafka Streams to rebalance the consumer group. The rebalance will be triggered + * immediately on the next invocation of #poll by one of the StreamThreads. + */ + public void requestRebalance() { + log.info("Rebalance requested"); + controllerSignals.rebalanceRequested().set(true); + } + @Override public void setStateListener(final StateListener stateListener) { responsiveStateListener.registerUserStateListener(stateListener); @@ -401,6 +422,8 @@ protected static class Params { final ResponsiveStoreRegistry storeRegistry; final CompatibilityMode compatibilityMode; + final ControllerSignals controllerSignals = new ControllerSignals(); + // can be set during construction private Time time = Time.SYSTEM; private KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier(); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveConsumer.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveConsumer.java index 58841e6ad..119fcf04a 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveConsumer.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveConsumer.java @@ -22,9 +22,11 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.TopicPartition; @@ -34,6 +36,8 @@ public class ResponsiveConsumer extends DelegatingConsumer { private final Logger log; + private final AtomicBoolean rebalanceRequested; + private final List listeners; private final Runnable shutdownAsyncThreadPool; @@ -80,6 +84,7 @@ public void onPartitionsAssigned(final Collection partitions) { public ResponsiveConsumer( final String clientId, final Consumer delegate, + final AtomicBoolean rebalanceRequested, final List listeners, final Runnable shutdownAsyncThreadPool ) { @@ -87,10 +92,20 @@ public ResponsiveConsumer( this.log = new LogContext( String.format("responsive-consumer [%s]", Objects.requireNonNull(clientId)) ).logger(ResponsiveConsumer.class); + this.rebalanceRequested = rebalanceRequested; this.listeners = Objects.requireNonNull(listeners); this.shutdownAsyncThreadPool = shutdownAsyncThreadPool; } + @Override + public ConsumerRecords poll(final Duration timeout) { + if (rebalanceRequested.getAndSet(false)) { + enforceRebalance("Triggering manual rebalance to reassign tasks"); + } + + return super.poll(timeout); + } + @Override public void subscribe(final Collection topics) { throw new IllegalStateException("Unexpected call to subscribe(Collection) on main consumer" diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java index 1a3ae7cc6..fdf07e413 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java @@ -18,6 +18,7 @@ import static dev.responsive.kafka.internal.config.InternalSessionConfigs.isAsyncThreadPoolRegistryEnabled; import static dev.responsive.kafka.internal.config.InternalSessionConfigs.loadAsyncThreadPoolRegistry; +import static dev.responsive.kafka.internal.config.InternalSessionConfigs.loadControllerSignals; import static dev.responsive.kafka.internal.utils.Utils.extractThreadId; import static dev.responsive.kafka.internal.utils.Utils.extractThreadNameFromConsumerClientId; import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE; @@ -25,6 +26,7 @@ import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry; import dev.responsive.kafka.api.config.CompatibilityMode; import dev.responsive.kafka.api.config.ResponsiveConfig; +import dev.responsive.kafka.internal.config.ControllerSignals; import dev.responsive.kafka.internal.metrics.EndOffsetsPoller; import dev.responsive.kafka.internal.metrics.MetricPublishingCommitListener; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; @@ -37,6 +39,7 @@ import java.util.List; import java.util.Map; import java.util.OptionalLong; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.regex.Pattern; import org.apache.kafka.clients.CommonClientConfigs; @@ -169,6 +172,9 @@ public Consumer getConsumer(final Map config) { asyncThreadPoolRegistry = null; } + final ControllerSignals controllerSignals = loadControllerSignals(config); + final AtomicBoolean rebalanceRequested = controllerSignals.rebalanceRequested(); + final ListenersForThread tc = sharedListeners.getAndMaybeInitListenersForThread( eos, threadId, @@ -183,6 +189,7 @@ public Consumer getConsumer(final Map config) { return factories.createResponsiveConsumer( clientId, wrapped.getConsumer(config), + rebalanceRequested, List.of( tc.committedOffsetMetricListener, tc.offsetRecorder.getConsumerListener(), @@ -404,10 +411,13 @@ default ResponsiveProducer createResponsiveProducer( default ResponsiveConsumer createResponsiveConsumer( final String clientId, final Consumer wrapped, + final AtomicBoolean rebalanceRequested, final List listeners, final Runnable shutdownAsyncThreadPool ) { - return new ResponsiveConsumer<>(clientId, wrapped, listeners, shutdownAsyncThreadPool); + return new ResponsiveConsumer<>( + clientId, wrapped, rebalanceRequested, listeners, shutdownAsyncThreadPool + ); } default ResponsiveGlobalConsumer createGlobalConsumer( diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/config/ControllerSignals.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/config/ControllerSignals.java new file mode 100644 index 000000000..9c23298c3 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/config/ControllerSignals.java @@ -0,0 +1,33 @@ +/* + * Copyright 2024 Responsive Computing, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.responsive.kafka.internal.config; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Container class for signals and requests from the controller, such as manual rebalances + * + * TODO: connect to background thread that listens for requests and interfaces with controller + */ +public class ControllerSignals { + + private final AtomicBoolean rebalanceRequested = new AtomicBoolean(false); + + public AtomicBoolean rebalanceRequested() { + return rebalanceRequested; + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/config/InternalSessionConfigs.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/config/InternalSessionConfigs.java index d2cc75b1d..19a7a8b57 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/config/InternalSessionConfigs.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/config/InternalSessionConfigs.java @@ -8,6 +8,7 @@ import dev.responsive.kafka.internal.utils.SessionClients; import java.util.HashMap; import java.util.Map; +import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.TopologyDescription; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.slf4j.Logger; @@ -18,10 +19,11 @@ public final class InternalSessionConfigs { private static final Logger LOG = LoggerFactory.getLogger(InternalSessionConfigs.class); private static final String INTERNAL_ASYNC_THREAD_POOL_REGISTRY_CONFIG = "__internal.responsive.async.thread.pool.registry__"; + private static final String INTERNAL_CONTROLLER_SIGNALS_CONFIG = "__internal.responsive.controller.signals__"; + private static final String INTERNAL_METRICS_CONFIG = "__internal.responsive.metrics__"; private static final String INTERNAL_SESSION_CLIENTS_CONFIG = "__internal.responsive.cassandra.client__"; private static final String INTERNAL_STORE_REGISTRY_CONFIG = "__internal.responsive.store.registry__"; private static final String INTERNAL_TOPOLOGY_DESCRIPTION_CONFIG = "__internal.responsive.topology.description__"; - private static final String INTERNAL_METRICS_CONFIG = "__internal.responsive.metrics__"; private static T loadFromConfig( final Map configs, @@ -78,6 +80,18 @@ public static AsyncThreadPoolRegistry loadAsyncThreadPoolRegistry(final Map configs) { + return loadFromConfig( + configs, + InternalSessionConfigs.INTERNAL_CONTROLLER_SIGNALS_CONFIG, + ControllerSignals.class, + "Controller signals" + ); + } + public static ResponsiveMetrics loadMetrics(final Map configs) { return loadFromConfig( configs, @@ -108,26 +122,34 @@ public static ResponsiveStoreRegistry loadStoreRegistry(final Map configs = new HashMap<>(); + + // ----- Consumer client elements ----- // + /** * Note: we must add the main consumer prefix when first building the config - * map with this registry, as it is needed in the #getMainConsumer method of the - * KafkaClientSupplier, and only main-consumer configs are included in the copy - * of the config map it receives. - * Importantly, we should NOT include this prefix when attempting to retrieve this - * registry on the other end, unless you are extracting it from the original, app-wide - * config map. This prefix will be stripped away when the config is copied into a - * prefix-based submap, such as the one passed in to the KafkaClientSupplier for - * the main consumer or the one returned from the - * {@link ProcessorContext#appConfigsWithPrefix(String)} API. - * The {@link #loadAsyncThreadPoolRegistry(Map)} method assumes the prefix has been - * stripped and therefore only works on filtered submaps like in the two - * examples above + * map with any of the consumer client elements, as it is needed in the #getMainConsumer + * method of the KafkaClientSupplier, which receives only main-consumer configs in the + * passed-in config map. + * Importantly, we do NOT include this prefix when attempting to retrieve these + * elements on the other end via the #loadXYZ methods, since the consumer prefix + * is stripped away when the configs are copied into a prefix-scoped submap, such + * as the one passed into {@link KafkaClientSupplier#getConsumer(Map)} or the one + * returned by {@link ProcessorContext#appConfigsWithPrefix(String)}. + * The #loadXYZ method assumes the prefix has been stripped and therefore only works on + * filtered submaps like in the two examples above. */ public Builder withAsyncThreadPoolRegistry(final AsyncThreadPoolRegistry registry) { configs.put(mainConsumerPrefix(INTERNAL_ASYNC_THREAD_POOL_REGISTRY_CONFIG), registry); return this; } + public Builder withControllerSignals(final ControllerSignals controllerSignals) { + configs.put(mainConsumerPrefix(INTERNAL_CONTROLLER_SIGNALS_CONFIG), controllerSignals); + return this; + } + + // ----- Responsive elements ----- // + public Builder withMetrics(final ResponsiveMetrics metrics) { configs.put(INTERNAL_METRICS_CONFIG, metrics); return this; diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveConsumerTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveConsumerTest.java index 25515502d..2159fb04f 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveConsumerTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveConsumerTest.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; @@ -57,7 +58,7 @@ class ResponsiveConsumerTest { @BeforeEach public void setup() { consumer = new ResponsiveConsumer<>( - "clientid", wrapped, List.of(listener1, listener2), () -> {}); + "clientid", wrapped, new AtomicBoolean(), List.of(listener1, listener2), () -> {}); } @Test diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplierTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplierTest.java index 7303ce42c..d4dec99f7 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplierTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplierTest.java @@ -30,6 +30,7 @@ import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry; import dev.responsive.kafka.api.config.CompatibilityMode; import dev.responsive.kafka.internal.clients.ResponsiveKafkaClientSupplier.Factories; +import dev.responsive.kafka.internal.config.ControllerSignals; import dev.responsive.kafka.internal.metrics.EndOffsetsPoller; import dev.responsive.kafka.internal.metrics.MetricPublishingCommitListener; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; @@ -126,7 +127,7 @@ public void setup() { factories.createResponsiveProducer(any(), (ResponsiveProducer) any(), any()) ).thenReturn(responsiveProducer); lenient().when(factories.createResponsiveConsumer( - any(), (ResponsiveConsumer) any(), any(), any()) + any(), (ResponsiveConsumer) any(), any(), any(), any()) ).thenReturn(responsiveConsumer); lenient().when(factories.createMetricsPublishingCommitListener(any(), any(), any())) .thenReturn(commitMetricListener); @@ -196,7 +197,7 @@ public void shouldAddMetricPublishingCommitListenerToConsumer() { // then: verify(factories).createResponsiveConsumer( - any(), any(), consumerListenerCaptor.capture(), any()); + any(), any(), any(), consumerListenerCaptor.capture(), any()); assertThat(consumerListenerCaptor.getValue(), Matchers.hasItem(commitMetricListener)); verify(factories).createMetricsPublishingCommitListener( metrics, "StreamThread-0", offsetRecorder); @@ -229,7 +230,7 @@ public void shouldAddEndOffsetsPollerListeners() { // then: verify(factories).createResponsiveConsumer( - any(), any(), consumerListenerCaptor.capture(), any()); + any(), any(), any(), consumerListenerCaptor.capture(), any()); assertThat(consumerListenerCaptor.getValue(), Matchers.hasItem(consumerEndOffsetsPollListener)); } @@ -241,7 +242,7 @@ public void shouldCloseMetricPublishingCommitListenerWhenNoRefs() { // then: verify(factories).createResponsiveConsumer( - any(), any(), consumerListenerCaptor.capture(), any()); + any(), any(), any(), consumerListenerCaptor.capture(), any()); consumerListenerCaptor.getValue().forEach(ResponsiveConsumer.Listener::onClose); verify(commitMetricListener, times(0)).close(); verify(factories).createResponsiveProducer(any(), any(), producerListenerCaptor.capture()); @@ -309,6 +310,7 @@ private static Map configsWithOverrides( final var intermediate = new HashMap(); intermediate.putAll(configs); intermediate.putAll(overrides); + intermediate.put("__internal.responsive.controller.signals__", new ControllerSignals()); intermediate.put( "__internal.responsive.async.thread.pool.registry__", new AsyncThreadPoolRegistry( 2,