ResponsiveKafkaStreams.java
@@ -36,6 +36,7 @@
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.clients.ResponsiveKafkaClientSupplier;
import dev.responsive.kafka.internal.config.ConfigUtils;
import dev.responsive.kafka.internal.config.ControllerSignals;
import dev.responsive.kafka.internal.config.InternalSessionConfigs;
import dev.responsive.kafka.internal.config.ResponsiveStreamsConfig;
import dev.responsive.kafka.internal.db.CassandraClientFactory;
@@ -69,6 +70,7 @@
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
@@ -85,11 +87,12 @@

public class ResponsiveKafkaStreams extends KafkaStreams {

private static final Logger LOG = LoggerFactory.getLogger(ResponsiveKafkaStreams.class);
private final Logger log;

private final ResponsiveMetrics responsiveMetrics;
private final ResponsiveStateListener responsiveStateListener;
private final ResponsiveRestoreListener responsiveRestoreListener;
private final ControllerSignals controllerSignals;
private final SessionClients sessionClients;

/**
@@ -189,6 +192,7 @@ protected ResponsiveKafkaStreams(final Params params) {
propsWithOverrides(
params.streamsConfig.getInt(NUM_STREAM_THREADS_CONFIG),
params.responsiveConfig,
params.controllerSignals,
params.sessionClients,
params.storeRegistry,
params.topology.describe(),
@@ -197,6 +201,9 @@ protected ResponsiveKafkaStreams(final Params params) {
params.time
);

this.log = new LogContext(String.format("stream-client [%s] ", clientId))
.logger(ResponsiveKafkaStreams.class);

if (params.compatibilityMode == CompatibilityMode.FULL) {
try {
ResponsiveStreamsConfig.validateStreamsConfig(applicationConfigs);
@@ -205,13 +212,14 @@
}
}

this.controllerSignals = params.controllerSignals;
this.responsiveMetrics = params.metrics;
this.sessionClients = params.sessionClients;

final ClientVersionMetadata versionMetadata = ClientVersionMetadata.loadVersionMetadata();
// Only log the version metadata for Responsive since Kafka Streams will log its own
LOG.info("Responsive Client version: {}", versionMetadata.responsiveClientVersion);
LOG.info("Responsive Client commit ID: {}", versionMetadata.responsiveClientCommitId);
log.info("Responsive Client version: {}", versionMetadata.responsiveClientVersion);
log.info("Responsive Client commit ID: {}", versionMetadata.responsiveClientCommitId);

responsiveMetrics.initializeTags(
applicationConfigs.getString(APPLICATION_ID_CONFIG),
@@ -264,15 +272,16 @@ private static ResponsiveMetrics createMetrics(
private static Properties propsWithOverrides(
final int numStreamThreads,
final ResponsiveConfig configs,
final ControllerSignals controllerSignals,
final SessionClients sessionClients,
final ResponsiveStoreRegistry storeRegistry,
final TopologyDescription topologyDescription,
final ResponsiveMetrics responsiveMetrics
) {
final Properties propsWithOverrides = new Properties();


final InternalSessionConfigs.Builder internalConfBuilder = new InternalSessionConfigs.Builder()
.withControllerSignals(controllerSignals)
.withSessionClients(sessionClients)
.withStoreRegistry(storeRegistry)
.withTopologyDescription(topologyDescription)
@@ -309,13 +318,25 @@ private static Properties propsWithOverrides(
o,
TASK_ASSIGNOR_CLASS_OVERRIDE
);
LOG.error(errorMsg);

// create a temporary logger for use in static methods since the constructor hasn't even
// been called yet -- it's ok since we just shut down right after this anyways
LoggerFactory.getLogger(ResponsiveKafkaStreams.class).error(errorMsg);
throw new ConfigException(errorMsg);
}

return propsWithOverrides;
}

/**
* Request Kafka Streams to rebalance the consumer group. The rebalance will be triggered
* immediately on the next invocation of #poll by one of the StreamThreads.
*/
public void requestRebalance() {
log.info("Rebalance requested");
controllerSignals.rebalanceRequested().set(true);
}
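
For a caller, triggering the manual rebalance is a single method call on the running instance. A minimal usage sketch (the caller method onPartitionSkewDetected and how `streams` was constructed are illustrative assumptions, not shown here):

// Sketch: given a running ResponsiveKafkaStreams instance, flag a rebalance on demand.
// One of the StreamThreads' main consumers observes the flag on its next poll() and
// calls enforceRebalance(), so tasks are reassigned without restarting the application.
void onPartitionSkewDetected(final ResponsiveKafkaStreams streams) {
  streams.requestRebalance();
}
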

@Override
public void setStateListener(final StateListener stateListener) {
responsiveStateListener.registerUserStateListener(stateListener);
@@ -401,6 +422,8 @@ protected static class Params {
final ResponsiveStoreRegistry storeRegistry;
final CompatibilityMode compatibilityMode;

final ControllerSignals controllerSignals = new ControllerSignals();

// can be set during construction
private Time time = Time.SYSTEM;
private KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier();
ResponsiveConsumer.java
@@ -22,9 +22,11 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
@@ -34,6 +36,8 @@
public class ResponsiveConsumer<K, V> extends DelegatingConsumer<K, V> {
private final Logger log;

private final AtomicBoolean rebalanceRequested;

private final List<Listener> listeners;
private final Runnable shutdownAsyncThreadPool;

@@ -80,17 +84,28 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
public ResponsiveConsumer(
final String clientId,
final Consumer<K, V> delegate,
final AtomicBoolean rebalanceRequested,
final List<Listener> listeners,
final Runnable shutdownAsyncThreadPool
) {
super(delegate);
this.log = new LogContext(
String.format("responsive-consumer [%s]", Objects.requireNonNull(clientId))
).logger(ResponsiveConsumer.class);
this.rebalanceRequested = rebalanceRequested;
this.listeners = Objects.requireNonNull(listeners);
this.shutdownAsyncThreadPool = shutdownAsyncThreadPool;
}

@Override
public ConsumerRecords<K, V> poll(final Duration timeout) {
if (rebalanceRequested.getAndSet(false)) {
enforceRebalance("Triggering manual rebalance to reassign tasks");
}

return super.poll(timeout);
}
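
The getAndSet(false) above makes the trigger one-shot: the first poll after a request enforces a rebalance and clears the flag. A hedged test sketch of that behavior, assuming Mockito/JUnit and a mocked delegate (`wrapped`) as in the existing ResponsiveConsumerTest, and assuming DelegatingConsumer forwards enforceRebalance to the delegate:

// Test sketch: a raised flag enforces exactly one rebalance across subsequent polls.
final AtomicBoolean rebalanceRequested = new AtomicBoolean(true);
final ResponsiveConsumer<byte[], byte[]> consumer = new ResponsiveConsumer<>(
    "clientid", wrapped, rebalanceRequested, List.of(), () -> { });

consumer.poll(Duration.ofMillis(100));
consumer.poll(Duration.ofMillis(100));

// Only the first poll saw the flag; getAndSet(false) cleared it before the second.
verify(wrapped, times(1)).enforceRebalance(anyString());
assertFalse(rebalanceRequested.get());
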

@Override
public void subscribe(final Collection<String> topics) {
throw new IllegalStateException("Unexpected call to subscribe(Collection) on main consumer"
ResponsiveKafkaClientSupplier.java
@@ -18,13 +18,15 @@

import static dev.responsive.kafka.internal.config.InternalSessionConfigs.isAsyncThreadPoolRegistryEnabled;
import static dev.responsive.kafka.internal.config.InternalSessionConfigs.loadAsyncThreadPoolRegistry;
import static dev.responsive.kafka.internal.config.InternalSessionConfigs.loadControllerSignals;
import static dev.responsive.kafka.internal.utils.Utils.extractThreadId;
import static dev.responsive.kafka.internal.utils.Utils.extractThreadNameFromConsumerClientId;
import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;

import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry;
import dev.responsive.kafka.api.config.CompatibilityMode;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.config.ControllerSignals;
import dev.responsive.kafka.internal.metrics.EndOffsetsPoller;
import dev.responsive.kafka.internal.metrics.MetricPublishingCommitListener;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
@@ -37,6 +39,7 @@
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.kafka.clients.CommonClientConfigs;
@@ -169,6 +172,9 @@ public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
asyncThreadPoolRegistry = null;
}

final ControllerSignals controllerSignals = loadControllerSignals(config);
final AtomicBoolean rebalanceRequested = controllerSignals.rebalanceRequested();

final ListenersForThread tc = sharedListeners.getAndMaybeInitListenersForThread(
eos,
threadId,
@@ -183,6 +189,7 @@ public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
return factories.createResponsiveConsumer(
clientId,
wrapped.getConsumer(config),
rebalanceRequested,
List.of(
tc.committedOffsetMetricListener,
tc.offsetRecorder.getConsumerListener(),
@@ -404,10 +411,13 @@ default <K, V> ResponsiveProducer<K, V> createResponsiveProducer(
default <K, V> ResponsiveConsumer<K, V> createResponsiveConsumer(
final String clientId,
final Consumer<K, V> wrapped,
final AtomicBoolean rebalanceRequested,
final List<ResponsiveConsumer.Listener> listeners,
final Runnable shutdownAsyncThreadPool
) {
return new ResponsiveConsumer<>(clientId, wrapped, listeners, shutdownAsyncThreadPool);
return new ResponsiveConsumer<>(
clientId, wrapped, rebalanceRequested, listeners, shutdownAsyncThreadPool
);
}

default <K, V> ResponsiveGlobalConsumer createGlobalConsumer(
ControllerSignals.java (new file)
@@ -0,0 +1,33 @@
/*
* Copyright 2024 Responsive Computing, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.responsive.kafka.internal.config;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* Container class for signals and requests from the controller, such as manual rebalances
*
* TODO: connect to background thread that listens for requests and interfaces with controller
*/
public class ControllerSignals {

private final AtomicBoolean rebalanceRequested = new AtomicBoolean(false);

public AtomicBoolean rebalanceRequested() {
return rebalanceRequested;
}
}
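
The flag is a one-shot signal: one side sets it, the other reads and clears it atomically. A minimal standalone sketch of both halves (in this change the two sides are ResponsiveKafkaStreams#requestRebalance and ResponsiveConsumer#poll):

final ControllerSignals signals = new ControllerSignals();

// Signalling side (cf. ResponsiveKafkaStreams#requestRebalance): raise the flag.
signals.rebalanceRequested().set(true);

// Consuming side (cf. ResponsiveConsumer#poll): read and clear in one atomic step,
// so concurrent pollers can never enforce more than one rebalance per request.
if (signals.rebalanceRequested().getAndSet(false)) {
  // enforceRebalance("Triggering manual rebalance to reassign tasks");
}
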
InternalSessionConfigs.java
@@ -8,6 +8,7 @@
import dev.responsive.kafka.internal.utils.SessionClients;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.slf4j.Logger;
@@ -18,10 +19,11 @@ public final class InternalSessionConfigs {
private static final Logger LOG = LoggerFactory.getLogger(InternalSessionConfigs.class);

private static final String INTERNAL_ASYNC_THREAD_POOL_REGISTRY_CONFIG = "__internal.responsive.async.thread.pool.registry__";
private static final String INTERNAL_CONTROLLER_SIGNALS_CONFIG = "__internal.responsive.controller.signals__";
private static final String INTERNAL_METRICS_CONFIG = "__internal.responsive.metrics__";
private static final String INTERNAL_SESSION_CLIENTS_CONFIG = "__internal.responsive.cassandra.client__";
private static final String INTERNAL_STORE_REGISTRY_CONFIG = "__internal.responsive.store.registry__";
private static final String INTERNAL_TOPOLOGY_DESCRIPTION_CONFIG = "__internal.responsive.topology.description__";
private static final String INTERNAL_METRICS_CONFIG = "__internal.responsive.metrics__";

private static <T> T loadFromConfig(
final Map<String, Object> configs,
@@ -78,6 +80,18 @@ public static AsyncThreadPoolRegistry loadAsyncThreadPoolRegistry(final Map<Stri
);
}

// CAUTION: this method assumes the provided config map has stripped away the
// main.consumer prefix that was added to this config in the original Streams
// properties.
public static ControllerSignals loadControllerSignals(final Map<String, Object> configs) {
return loadFromConfig(
configs,
InternalSessionConfigs.INTERNAL_CONTROLLER_SIGNALS_CONFIG,
ControllerSignals.class,
"Controller signals"
);
}

public static ResponsiveMetrics loadMetrics(final Map<String, Object> configs) {
return loadFromConfig(
configs,
@@ -108,26 +122,34 @@ public static ResponsiveStoreRegistry loadStoreRegistry(final Map<String, Object
public static class Builder {
private final Map<String, Object> configs = new HashMap<>();


// ----- Consumer client elements ----- //

/**
* Note: we must add the main consumer prefix when first building the config
* map with this registry, as it is needed in the #getMainConsumer method of the
* KafkaClientSupplier, and only main-consumer configs are included in the copy
* of the config map it receives.
* Importantly, we should NOT include this prefix when attempting to retrieve this
* registry on the other end, unless you are extracting it from the original, app-wide
* config map. This prefix will be stripped away when the config is copied into a
* prefix-based submap, such as the one passed in to the KafkaClientSupplier for
* the main consumer or the one returned from the
* {@link ProcessorContext#appConfigsWithPrefix(String)} API.
* The {@link #loadAsyncThreadPoolRegistry(Map)} method assumes the prefix has been
* stripped and therefore only works on filtered submaps like in the two
* examples above
* map with any of the consumer client elements, as it is needed in the #getMainConsumer
* method of the KafkaClientSupplier, which receives only main-consumer configs in the
* passed-in config map.
* Importantly, we do NOT include this prefix when attempting to retrieve these
* elements on the other end via the #loadXYZ methods, since the consumer prefix
* is stripped away when the configs are copied into a prefix-scoped submap, such
* as the one passed into {@link KafkaClientSupplier#getConsumer(Map)} or the one
* returned by {@link ProcessorContext#appConfigsWithPrefix(String)}.
* The #loadXYZ method assumes the prefix has been stripped and therefore only works on
* filtered submaps like in the two examples above.
*/
public Builder withAsyncThreadPoolRegistry(final AsyncThreadPoolRegistry registry) {
configs.put(mainConsumerPrefix(INTERNAL_ASYNC_THREAD_POOL_REGISTRY_CONFIG), registry);
return this;
}

public Builder withControllerSignals(final ControllerSignals controllerSignals) {
configs.put(mainConsumerPrefix(INTERNAL_CONTROLLER_SIGNALS_CONFIG), controllerSignals);
return this;
}
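
A concrete illustration of the prefix round-trip described above (a sketch: it assumes mainConsumerPrefix delegates to StreamsConfig#mainConsumerPrefix, i.e. prepends "main.consumer.", and that the builder exposes a build() method returning its config map):

final ControllerSignals signals = new ControllerSignals();
final Map<String, Object> appConfigs = new InternalSessionConfigs.Builder()
    .withControllerSignals(signals)
    .build();
// Stored under the prefixed key:
//   "main.consumer.__internal.responsive.controller.signals__" -> signals

// By the time KafkaClientSupplier#getConsumer(Map) is called, Streams has copied the
// main-consumer configs into a submap with the "main.consumer." prefix stripped, so
// the loader looks up the bare internal key:
final Map<String, Object> mainConsumerConfigs =
    Map.of("__internal.responsive.controller.signals__", signals);
final ControllerSignals loaded =
    InternalSessionConfigs.loadControllerSignals(mainConsumerConfigs);
// loaded is the same instance as signals
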

// ----- Responsive elements ----- //

public Builder withMetrics(final ResponsiveMetrics metrics) {
configs.put(INTERNAL_METRICS_CONFIG, metrics);
return this;
ResponsiveConsumerTest.java
@@ -25,6 +25,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -57,7 +58,7 @@ class ResponsiveConsumerTest {
@BeforeEach
public void setup() {
consumer = new ResponsiveConsumer<>(
"clientid", wrapped, List.of(listener1, listener2), () -> {});
"clientid", wrapped, new AtomicBoolean(), List.of(listener1, listener2), () -> {});
}

@Test