From b02f2e6723061ad61ea68b7eab5b89d81e631c49 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Thu, 6 Jun 2024 22:49:38 -0700 Subject: [PATCH] gate --- .../kafka/api/ResponsiveKafkaStreams.java | 32 ++++++++++--- .../async/AsyncFixedKeyProcessorSupplier.java | 18 +++++-- .../api/async/AsyncProcessorSupplier.java | 15 ++++-- .../async/internals/AsyncProcessingGate.java | 47 +++++++++++++++++++ 4 files changed, 98 insertions(+), 14 deletions(-) create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessingGate.java diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java index 59c08a990..e13d2d093 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java @@ -31,6 +31,7 @@ import static org.apache.kafka.streams.StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG; +import dev.responsive.kafka.api.async.internals.AsyncProcessingGate; import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry; import dev.responsive.kafka.api.config.CompatibilityMode; import dev.responsive.kafka.api.config.ResponsiveConfig; @@ -213,8 +214,13 @@ protected ResponsiveKafkaStreams(final Params params) { LOG.info("Responsive Client version: {}", versionMetadata.responsiveClientVersion); LOG.info("Responsive Client commit ID: {}", versionMetadata.responsiveClientCommitId); + final String applicationId = applicationConfigs.getString(APPLICATION_ID_CONFIG); + final int asyncThreadPoolSize = params.responsiveConfig.getInt(ASYNC_THREAD_POOL_SIZE_CONFIG); + + AsyncProcessingGate.maybeEnableAsyncProcessing(asyncThreadPoolSize); + responsiveMetrics.initializeTags( - applicationConfigs.getString(APPLICATION_ID_CONFIG), + applicationId, clientId, versionMetadata, applicationConfigs.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX) @@ -368,27 +374,39 @@ public StateRestoreListener stateRestoreListener() { } private void closeInternal() { + AsyncProcessingGate.closeAsyncProcessing(); responsiveStateListener.close(); sessionClients.closeAll(); } @Override public void close() { - super.close(); - closeInternal(); + try { + super.close(); + } finally { + closeInternal(); + } } @Override public boolean close(final Duration timeout) { - final boolean closed = super.close(timeout); - closeInternal(); + final boolean closed; + try { + closed = super.close(timeout); + } finally { + closeInternal(); + } return closed; } @Override public boolean close(final CloseOptions options) { - final boolean closed = super.close(options); - closeInternal(); + final boolean closed; + try { + closed = super.close(options); + } finally { + closeInternal(); + } return closed; } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncFixedKeyProcessorSupplier.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncFixedKeyProcessorSupplier.java index 63a212204..76243c21a 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncFixedKeyProcessorSupplier.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncFixedKeyProcessorSupplier.java @@ -19,7 +19,7 @@ import static dev.responsive.kafka.api.async.internals.AsyncProcessor.createAsyncFixedKeyProcessor; import static dev.responsive.kafka.api.async.internals.AsyncUtils.initializeAsyncBuilders; -import dev.responsive.kafka.api.async.internals.AsyncProcessor; +import dev.responsive.kafka.api.async.internals.AsyncProcessingGate; import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder; import java.util.HashSet; import java.util.Map; @@ -75,12 +75,22 @@ private AsyncFixedKeyProcessorSupplier( } @Override - public AsyncProcessor get() { - return createAsyncFixedKeyProcessor(userProcessorSupplier.get(), asyncStoreBuilders); + public FixedKeyProcessor get() { + if (AsyncProcessingGate.asyncEnabled()) { + return createAsyncFixedKeyProcessor(userProcessorSupplier.get(), asyncStoreBuilders); + } else { + return userProcessorSupplier.get(); + } } @Override public Set> stores() { - return new HashSet<>(asyncStoreBuilders.values()); + if (AsyncProcessingGate.asyncEnabled()) { + return new HashSet<>(asyncStoreBuilders.values()); + } else { + return userProcessorSupplier.stores(); + } } + + } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncProcessorSupplier.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncProcessorSupplier.java index ddcb30e4b..e4bc6c020 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncProcessorSupplier.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncProcessorSupplier.java @@ -19,6 +19,7 @@ import static dev.responsive.kafka.api.async.internals.AsyncProcessor.createAsyncProcessor; import static dev.responsive.kafka.api.async.internals.AsyncUtils.initializeAsyncBuilders; +import dev.responsive.kafka.api.async.internals.AsyncProcessingGate; import dev.responsive.kafka.api.async.internals.AsyncProcessor; import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder; import java.util.HashSet; @@ -155,13 +156,21 @@ private AsyncProcessorSupplier( } @Override - public AsyncProcessor get() { - return createAsyncProcessor(userProcessorSupplier.get(), asyncStoreBuilders); + public Processor get() { + if (AsyncProcessingGate.asyncEnabled()) { + return createAsyncProcessor(userProcessorSupplier.get(), asyncStoreBuilders); + } else { + return userProcessorSupplier.get(); + } } @Override public Set> stores() { - return new HashSet<>(asyncStoreBuilders.values()); + if (AsyncProcessingGate.asyncEnabled()) { + return new HashSet<>(asyncStoreBuilders.values()); + } else { + return userProcessorSupplier.stores(); + } } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessingGate.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessingGate.java new file mode 100644 index 000000000..24697a73b --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessingGate.java @@ -0,0 +1,47 @@ +/* + * Copyright 2024 Responsive Computing, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.responsive.kafka.api.async.internals; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Super hacky way to communicate whether async processing is enabled based on the configured + * async thread pool size. + * We have to use a static map to get this information from the ResponsiveConfig passed into + * ResponsiveKafkaStreams to the async processor supplier *before* any state stores are built. + * Unfortunately Kafka Streams does not pass in the app configs to any processor, supplier, or + * state store constructors -- configs only become available to these elements when #init is called. + */ +public final class AsyncProcessingGate { + + private static final AtomicBoolean ASYNC_ENABLED = new AtomicBoolean(false); + + public static void maybeEnableAsyncProcessing(final int numAsyncThreads) { + if (numAsyncThreads > 0) { + ASYNC_ENABLED.set(true); + } + } + + public static void closeAsyncProcessing() { + ASYNC_ENABLED.set(false); + } + + public static boolean asyncEnabled() { + return ASYNC_ENABLED.get(); + } + +}