diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java index 405b706bf..4d0b20336 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java @@ -36,6 +36,8 @@ import dev.responsive.kafka.api.config.StorageBackend; import dev.responsive.kafka.api.stores.ResponsiveDslStoreSuppliers; import dev.responsive.kafka.internal.clients.AsyncStreamsKafkaClientSupplier; +import dev.responsive.kafka.internal.clients.OriginEventReporter; +import dev.responsive.kafka.internal.clients.OriginEventReporterImpl; import dev.responsive.kafka.internal.clients.ResponsiveKafkaClientSupplier; import dev.responsive.kafka.internal.config.ConfigUtils; import dev.responsive.kafka.internal.config.InternalSessionConfigs; @@ -46,6 +48,10 @@ import dev.responsive.kafka.internal.db.mongo.ResponsiveMongoClient; import dev.responsive.kafka.internal.db.rs3.RS3TableFactory; import dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRS3Client; +import dev.responsive.kafka.internal.license.model.CloudLicenseV1; +import dev.responsive.kafka.internal.license.model.LicenseInfo; +import dev.responsive.kafka.internal.license.model.TimedTrialV1; +import dev.responsive.kafka.internal.license.model.UsageBasedV1; import dev.responsive.kafka.internal.metrics.ClientVersionMetadata; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; import dev.responsive.kafka.internal.metrics.ResponsiveRestoreListener; @@ -94,6 +100,7 @@ public class ResponsiveKafkaStreams extends KafkaStreams { private final ResponsiveStateListener responsiveStateListener; private final ResponsiveRestoreListener responsiveRestoreListener; private final SessionClients sessionClients; + private final OriginEventReporter originEventReporter; /** * Create a {@code ResponsiveKafkaStreams} instance. @@ -200,10 +207,9 @@ protected ResponsiveKafkaStreams(final Params params) { params.time ); - loadLicense(params.responsiveConfig); - this.responsiveMetrics = params.metrics; this.sessionClients = params.sessionClients; + this.originEventReporter = params.oeReporter; final ClientVersionMetadata versionMetadata = ClientVersionMetadata.loadVersionMetadata(); // Only log the version metadata for Responsive since Kafka Streams will log its own @@ -369,10 +375,17 @@ public StateRestoreListener stateRestoreListener() { } private void closeInternal() { + originEventReporter.close(); responsiveStateListener.close(); sessionClients.closeAll(); } + @Override + public synchronized void start() throws IllegalStateException, StreamsException { + originEventReporter.start(); + super.start(); + } + @Override public void close() { super.close(); @@ -411,6 +424,7 @@ protected static class Params { private SessionClients sessionClients; private Optional asyncThreadPoolRegistry; private ResponsiveKafkaClientSupplier responsiveKafkaClientSupplier; + private OriginEventReporter oeReporter; public Params(final Topology topology, final Map configs) { this.topology = topology; @@ -443,6 +457,7 @@ public Params withTime(final Time time) { // that it's impossible to use a Params instance that hasn't called build(), // but that felt a little extra public Params build() { + var license = loadLicense(responsiveConfig); this.asyncThreadPoolRegistry = AsyncUtils.configuredAsyncThreadPool( responsiveConfig, streamsConfig.getInt(NUM_STREAM_THREADS_CONFIG), @@ -454,12 +469,14 @@ public Params build() { asyncThreadPoolRegistry.get() ) : innerClientSupplier; + this.oeReporter = reporter(responsiveConfig, license); this.responsiveKafkaClientSupplier = new ResponsiveKafkaClientSupplier( delegateKafkaClientSupplier, responsiveConfig, streamsConfig, storeRegistry, metrics, + oeReporter, storageBackend ); @@ -544,5 +561,23 @@ public Params build() { return this; } + + private static OriginEventReporter reporter( + final ResponsiveConfig responsiveConfig, + final LicenseInfo license + ) { + switch (license.type()) { + case CloudLicenseV1.TYPE_NAME: + case TimedTrialV1.TYPE_NAME: + // don't report counts for cloud/timed trial licenses + return (tp, count) -> { }; + case UsageBasedV1.TYPE_NAME: + default: + return new OriginEventReporterImpl( + responsiveConfig, + license + ); + } + } } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/NoopOriginEventRecorder.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/NoopOriginEventRecorder.java deleted file mode 100644 index 8337de5a5..000000000 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/NoopOriginEventRecorder.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2024 Responsive Computing, Inc. - * - * This source code is licensed under the Responsive Business Source License Agreement v1.0 - * available at: - * - * https://www.responsive.dev/legal/responsive-bsl-10 - * - * This software requires a valid Commercial License Key for production use. Trial and commercial - * licenses can be obtained at https://www.responsive.dev - */ - -package dev.responsive.kafka.internal.clients; - -import java.util.Collection; -import java.util.Map; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; - -public class NoopOriginEventRecorder implements OriginEventRecorder { - @Override - public ConsumerRecords onPoll(final ConsumerRecords records) { - return records; - } - - @Override - public ProducerRecord onSend(final ProducerRecord record) { - return record; - } - - @Override - public void onConsumerCommit(final Map offsets) { - } - - @Override - public void onProducerCommit() { - } - - @Override - public void onSendOffsetsToTransaction( - final Map offsets, - final String consumerGroupId - ) { - } - - @Override - public void onPartitionsLost(final Collection partitions) { - } - - @Override - public void onUnsubscribe() { - } -} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OriginEventRecorder.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OriginEventRecorder.java deleted file mode 100644 index b34a12d0a..000000000 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OriginEventRecorder.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2024 Responsive Computing, Inc. - * - * This source code is licensed under the Responsive Business Source License Agreement v1.0 - * available at: - * - * https://www.responsive.dev/legal/responsive-bsl-10 - * - * This software requires a valid Commercial License Key for production use. Trial and commercial - * licenses can be obtained at https://www.responsive.dev - */ - -package dev.responsive.kafka.internal.clients; - -import java.util.Collection; -import java.util.Map; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; - -public interface OriginEventRecorder - extends ResponsiveConsumer.Listener, ResponsiveProducer.Listener { - - @Override - ConsumerRecords onPoll(ConsumerRecords records); - - @Override - ProducerRecord onSend(ProducerRecord record); - - @Override - void onConsumerCommit(Map offsets); - - @Override - void onProducerCommit(); - - @Override - void onSendOffsetsToTransaction( - final Map offsets, - final String consumerGroupId - ); - - @Override - void onPartitionsLost(Collection partitions); - - @Override - void onUnsubscribe(); -} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OriginEventRecorderImpl.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OriginEventRecorderImpl.java deleted file mode 100644 index 5346698d4..000000000 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OriginEventRecorderImpl.java +++ /dev/null @@ -1,279 +0,0 @@ -/* - * Copyright 2024 Responsive Computing, Inc. - * - * This source code is licensed under the Responsive Business Source License Agreement v1.0 - * available at: - * - * https://www.responsive.dev/legal/responsive-bsl-10 - * - * This software requires a valid Commercial License Key for production use. Trial and commercial - * licenses can be obtained at https://www.responsive.dev - */ - -package dev.responsive.kafka.internal.clients; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.annotations.VisibleForTesting; -import com.google.errorprone.annotations.concurrent.GuardedBy; -import dev.responsive.kafka.api.config.ResponsiveConfig; -import dev.responsive.kafka.internal.license.model.LicenseInfo; -import dev.responsive.kafka.internal.license.model.UsageBasedV1; -import dev.responsive.kafka.internal.license.server.model.OriginEventsReportRequestV1; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.time.Duration; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; -import java.util.function.BiConsumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.streams.StreamsConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class OriginEventRecorderImpl implements OriginEventRecorder { - - private static final Logger LOG = LoggerFactory.getLogger(OriginEventRecorderImpl.class); - - static final String ORIGIN_EVENT_HEADER_KEY = "_roe"; - static final byte[] ORIGIN_EVENT_MARK = new byte[] {0x1}; - - private final Duration reportInterval; - private final BiConsumer, String> reportOriginEvents; - private final String threadId; - private final boolean eos; - - @GuardedBy("this") - private final Map trackedOffsets = new HashMap<>(); - @GuardedBy("this") - private final Map nextCommitOffsets = new HashMap<>(); - @GuardedBy("this") - private final Map ongoingReport = new HashMap<>(); - @GuardedBy("this") - private long lastReportTs; - - public OriginEventRecorderImpl( - final String threadId, - final ResponsiveConfig config, - final LicenseInfo license, - final boolean eos - ) { - this( - threadId, - new Reporter(config, license), - config.getLong(ResponsiveConfig.ORIGIN_EVENT_REPORT_INTERVAL_MS_CONFIG), - eos - ); - } - - @VisibleForTesting - OriginEventRecorderImpl( - final String threadId, - final BiConsumer, String> reportOriginEvents, - final boolean eos - ) { - this(threadId, reportOriginEvents, 0L, eos); - } - - private OriginEventRecorderImpl( - final String threadId, - final BiConsumer, String> reportOriginEvents, - final long reportIntervalMs, - final boolean eos - ) { - this.threadId = threadId; - this.reportOriginEvents = reportOriginEvents; - this.reportInterval = Duration.ofMillis(reportIntervalMs); - this.lastReportTs = System.currentTimeMillis(); - this.eos = eos; - } - - @Override - public ConsumerRecords onPoll(final ConsumerRecords records) { - for (final ConsumerRecord record : records) { - final var header = record.headers().lastHeader(ORIGIN_EVENT_HEADER_KEY); - if (header == null) { - synchronized (this) { - final var tracker = trackedOffsets.computeIfAbsent( - new TopicPartition(record.topic(), record.partition()), - tp -> new OffsetTracker(record.offset()) - ); - tracker.mark(record.offset()); - } - } - } - return records; - } - - @Override - public ProducerRecord onSend(final ProducerRecord record) { - final var header = record.headers().lastHeader(ORIGIN_EVENT_HEADER_KEY); - if (header == null) { - record.headers().add(ORIGIN_EVENT_HEADER_KEY, ORIGIN_EVENT_MARK); - } - return record; - } - - @Override - public void onConsumerCommit(final Map offsets) { - if (eos) { - throw new IllegalStateException("Should not use consumer commit on EOS"); - } - - synchronized (this) { - nextCommitOffsets.putAll(offsets); - } - onCommit(); - } - - @Override - public void onProducerCommit() { - if (!eos) { - throw new IllegalStateException("Should not use producer commit on ALOS"); - } - - onCommit(); - } - - @Override - public synchronized void onSendOffsetsToTransaction( - final Map offsets, - final String consumerGroupId - ) { - nextCommitOffsets.putAll(offsets); - } - - private void onCommit() { - Map report = null; - - synchronized (this) { - for (final var tpOffset : nextCommitOffsets.entrySet()) { - final var tracker = trackedOffsets.get(tpOffset.getKey()); - if (tracker != null) { - final int count = tracker.countAndShift(tpOffset.getValue().offset()); - ongoingReport.compute( - tpOffset.getKey(), - (k, oldCount) -> oldCount == null ? count : oldCount + count - ); - } - } - - nextCommitOffsets.clear(); - final long now = System.currentTimeMillis(); - if (Duration.ofMillis(now - lastReportTs).compareTo(reportInterval) >= 0) { - report = Map.copyOf(ongoingReport); - lastReportTs = now; - ongoingReport.clear(); - } - } - - // do this outside of the synchronized block since the report may - // involve a network call - if (report != null) { - reportOriginEvents.accept(report, threadId); - } - } - - @Override - public synchronized void onPartitionsLost(final Collection partitions) { - // since we won't get commits for these anymore, we should clear the offsets - // we have seen so far but not committed in case we ever get the topic partition - // reassigned back to ourselves - partitions.forEach(trackedOffsets::remove); - } - - @Override - public synchronized void onUnsubscribe() { - // clear all seen tracked offsets that have not yet been committed - trackedOffsets.clear(); - } - - private static class Reporter implements BiConsumer, String> { - - private final ObjectMapper mapper = new ObjectMapper(); - private final HttpClient client; - private final String licenseServer; - private final String apiKey; - private final String appId; - private final String env; - - private Reporter(ResponsiveConfig config, final LicenseInfo license) { - if (!license.type().equals(UsageBasedV1.TYPE_NAME)) { - throw new IllegalArgumentException( - "Invalid license type for usage reporting: " + license.type()); - } - - this.licenseServer = config.getString(ResponsiveConfig.RESPONSIVE_LICENSE_SERVER_CONFIG); - this.apiKey = ((UsageBasedV1) license).key(); - this.appId = (String) config.originals().get(StreamsConfig.APPLICATION_ID_CONFIG); - this.env = config.getString(ResponsiveConfig.RESPONSIVE_ENV_CONFIG); - - client = HttpClient.newHttpClient(); - } - - @Override - public void accept(final Map counts, final String threadId) { - LOG.debug("Reporting origin events for {} on {}", counts, threadId); - - final var body = OriginEventsReportRequestV1.builder() - .setTimestamp(System.currentTimeMillis()) - .setTransactionId(UUID.randomUUID().toString()) - .setEventCount(counts.values().stream().reduce(Integer::sum).orElse(0)) - .setEnv(env) - .setApplicationId(appId) - .setThreadId(threadId) - .build(); - - try { - final HttpRequest request = HttpRequest.newBuilder() - .version(HttpClient.Version.HTTP_1_1) - .uri(new URI(licenseServer + "/v1/usage")) - .header("Content-Type", "application/json") - .header("Authorization", "Bearer " + apiKey) - .POST(HttpRequest.BodyPublishers.ofString(mapper.writeValueAsString(body))) - .build(); - - client.sendAsync(request, HttpResponse.BodyHandlers.ofString()) - .thenAccept(response -> { - final int status = response.statusCode(); - final String responseBody = response.body(); - if (status >= 400 && status < 500) { - LOG.error( - "Failed to report usage due to {} (error code {}). " - + "Please make sure your license is correctly configured to " - + "avoid violation of the license agreement terms.", - responseBody, - status - ); - } else if (status >= 500 && status < 600) { - LOG.warn("Failed to report usage due to {} (code {}).", status, responseBody); - } else { - LOG.debug("Successfully reported usage (status: {}): {}", status, responseBody); - } - }) - .exceptionally(e -> { - LOG.error("Failed to report usage! Please make sure " - + "you have correctly configured your Responsive License", e); - return null; - }); - } catch (final JsonProcessingException e) { - // this should never happen - LOG.warn("Internal error while reporting metrics to the License Server", e); - } catch (final URISyntaxException e) { - throw new IllegalArgumentException( - "Invalid configuration for " + ResponsiveConfig.RESPONSIVE_LICENSE_SERVER_CONFIG + ": " - + licenseServer, e - ); - } - } - } -} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OriginEventReporter.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OriginEventReporter.java new file mode 100644 index 000000000..2d901b74b --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OriginEventReporter.java @@ -0,0 +1,25 @@ +/* + * Copyright 2025 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.kafka.internal.clients; + +import org.apache.kafka.common.TopicPartition; + +public interface OriginEventReporter { + + void report(final TopicPartition tp, final Integer counts); + + default void start() {} + + default void close() {} + +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OriginEventReporterImpl.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OriginEventReporterImpl.java new file mode 100644 index 000000000..851a03dff --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OriginEventReporterImpl.java @@ -0,0 +1,174 @@ +/* + * Copyright 2025 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.kafka.internal.clients; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.AbstractScheduledService; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import dev.responsive.kafka.api.config.ResponsiveConfig; +import dev.responsive.kafka.internal.license.model.LicenseInfo; +import dev.responsive.kafka.internal.license.model.UsageBasedV1; +import dev.responsive.kafka.internal.license.server.model.OriginEventsReportRequestV1; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.StreamsConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OriginEventReporterImpl extends AbstractScheduledService + implements OriginEventReporter { + + private static final Logger LOG = LoggerFactory.getLogger(OriginEventReporterImpl.class); + + private final ObjectMapper mapper = new ObjectMapper(); + private final HttpClient client; + private final String licenseServer; + private final String apiKey; + private final String appId; + private final String env; + private final long reportInterval; + + @GuardedBy("this") + private final Map currentReport; + private CompletableFuture inFlight; + + public OriginEventReporterImpl(ResponsiveConfig config, final LicenseInfo license) { + if (!license.type().equals(UsageBasedV1.TYPE_NAME)) { + throw new IllegalArgumentException( + "Invalid license type for usage reporting: " + license.type()); + } + + this.licenseServer = config.getString(ResponsiveConfig.RESPONSIVE_LICENSE_SERVER_CONFIG); + this.apiKey = ((UsageBasedV1) license).key(); + this.appId = (String) config.originals().get(StreamsConfig.APPLICATION_ID_CONFIG); + this.env = config.getString(ResponsiveConfig.RESPONSIVE_ENV_CONFIG); + this.reportInterval = config.getLong(ResponsiveConfig.ORIGIN_EVENT_REPORT_INTERVAL_MS_CONFIG); + + client = HttpClient.newHttpClient(); + currentReport = new HashMap<>(); + } + + @Override + public synchronized void report(final TopicPartition tp, Integer count) { + currentReport.compute(tp, (k, old) -> old == null ? count : old + count); + } + + @Override + protected void runOneIteration() { + final Map report; + synchronized (this) { + // don't bother reporting an empty report + if (currentReport.isEmpty()) { + return; + } + + report = Map.copyOf(currentReport); + currentReport.clear(); + } + + LOG.info("Reporting origin events for {}", report); + + final var body = OriginEventsReportRequestV1.builder() + .setTimestamp(System.currentTimeMillis()) + .setTransactionId(UUID.randomUUID().toString()) + .setEventCount(report.values().stream().reduce(Integer::sum).orElse(0)) + .setEnv(env) + .setApplicationId(appId) + .build(); + + try { + final HttpRequest request = HttpRequest.newBuilder() + .version(HttpClient.Version.HTTP_1_1) + .uri(new URI(licenseServer + "/v1/usage")) + .header("Content-Type", "application/json") + .header("Authorization", "Bearer " + apiKey) + .POST(HttpRequest.BodyPublishers.ofString(mapper.writeValueAsString(body))) + .build(); + + this.inFlight = client.sendAsync(request, HttpResponse.BodyHandlers.ofString()) + .thenAccept(response -> { + final int status = response.statusCode(); + final String responseBody = response.body(); + if (status >= 400 && status < 500) { + LOG.error( + "Failed to report usage due to {} (error code {}). " + + "Please make sure your license is correctly configured to " + + "avoid violation of the license agreement terms.", + responseBody, + status + ); + } else if (status >= 500 && status < 600) { + LOG.warn("Failed to report usage due to {} (code {}).", status, responseBody); + } else { + LOG.debug("Successfully reported usage (status: {}): {}", status, responseBody); + } + }) + .exceptionally(e -> { + LOG.error("Failed to report usage! Please make sure " + + "you have correctly configured your Responsive License", e); + return null; + }); + } catch (final JsonProcessingException e) { + // this should never happen + LOG.warn("Internal error while reporting metrics to the License Server", e); + } catch (final URISyntaxException e) { + throw new IllegalArgumentException( + "Invalid configuration for " + ResponsiveConfig.RESPONSIVE_LICENSE_SERVER_CONFIG + ": " + + licenseServer, e + ); + } + } + + @Override + protected Scheduler scheduler() { + return Scheduler.newFixedRateSchedule( + Duration.ofMillis(reportInterval), + Duration.ofMillis(reportInterval) + ); + } + + @Override + public void start() { + startAsync().awaitRunning(); + } + + @Override + public void close() { + // if there's any data that has not been reported, report it now + // and await the report -- this also helps ensure end-to-end testing + // is accurate + runOneIteration(); + if (inFlight != null) { + try { + // if server doesn't respond in time just drop the usage + // rather the hold up shutdown; this is only really important + // for testing situations + inFlight.get(10, TimeUnit.SECONDS); + } catch (final Exception e) { + LOG.warn("Could not await final origin event report.", e); + } + } + stopAsync().awaitTerminated(); + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OriginEventTracker.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OriginEventTracker.java new file mode 100644 index 000000000..1a3d41352 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OriginEventTracker.java @@ -0,0 +1,130 @@ +/* + * Copyright 2024 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.kafka.internal.clients; + +import com.google.errorprone.annotations.concurrent.GuardedBy; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; + +public class OriginEventTracker + implements ResponsiveConsumer.Listener, ResponsiveProducer.Listener { + + static final String ORIGIN_EVENT_HEADER_KEY = "_roe"; + static final byte[] ORIGIN_EVENT_MARK = new byte[] {0x1}; + + private final OriginEventReporter reporter; + private final boolean eos; + + @GuardedBy("this") + private final Map trackedOffsets = new HashMap<>(); + @GuardedBy("this") + private final Map nextCommitOffsets = new HashMap<>(); + + public OriginEventTracker( + final OriginEventReporter reporter, + final boolean eos + ) { + this.reporter = reporter; + this.eos = eos; + } + + @Override + public ConsumerRecords onPoll(final ConsumerRecords records) { + for (final ConsumerRecord record : records) { + final var header = record.headers().lastHeader(ORIGIN_EVENT_HEADER_KEY); + if (header == null) { + synchronized (this) { + final var tracker = trackedOffsets.computeIfAbsent( + new TopicPartition(record.topic(), record.partition()), + tp -> new OffsetTracker(record.offset()) + ); + tracker.mark(record.offset()); + } + } + } + return records; + } + + @Override + public ProducerRecord onSend(final ProducerRecord record) { + final var header = record.headers().lastHeader(ORIGIN_EVENT_HEADER_KEY); + if (header == null) { + record.headers().add(ORIGIN_EVENT_HEADER_KEY, ORIGIN_EVENT_MARK); + } + return record; + } + + @Override + public void onConsumerCommit(final Map offsets) { + if (eos) { + throw new IllegalStateException("Should not use consumer commit on EOS"); + } + + synchronized (this) { + nextCommitOffsets.putAll(offsets); + } + onCommit(); + } + + @Override + public void onProducerCommit() { + if (!eos) { + throw new IllegalStateException("Should not use producer commit on ALOS"); + } + + onCommit(); + } + + @Override + public synchronized void onSendOffsetsToTransaction( + final Map offsets, + final String consumerGroupId + ) { + nextCommitOffsets.putAll(offsets); + } + + private void onCommit() { + synchronized (this) { + for (final var tpOffset : nextCommitOffsets.entrySet()) { + final var tracker = trackedOffsets.get(tpOffset.getKey()); + if (tracker != null) { + final int count = tracker.countAndShift(tpOffset.getValue().offset()); + reporter.report(tpOffset.getKey(), count); + } + } + + nextCommitOffsets.clear(); + } + } + + @Override + public synchronized void onPartitionsLost(final Collection partitions) { + // since we won't get commits for these anymore, we should clear the offsets + // we have seen so far but not committed in case we ever get the topic partition + // reassigned back to ourselves + partitions.forEach(trackedOffsets::remove); + } + + @Override + public synchronized void onUnsubscribe() { + // clear all seen tracked offsets that have not yet been committed + trackedOffsets.clear(); + } + +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java index e7b4b24dc..8c944ac8e 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java @@ -19,10 +19,6 @@ import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.api.config.StorageBackend; -import dev.responsive.kafka.internal.license.LicenseUtils; -import dev.responsive.kafka.internal.license.model.CloudLicenseV1; -import dev.responsive.kafka.internal.license.model.TimedTrialV1; -import dev.responsive.kafka.internal.license.model.UsageBasedV1; import dev.responsive.kafka.internal.metrics.EndOffsetsPoller; import dev.responsive.kafka.internal.metrics.MetricPublishingCommitListener; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; @@ -66,6 +62,7 @@ public final class ResponsiveKafkaClientSupplier implements KafkaClientSupplier private final ResponsiveStoreRegistry storeRegistry; private final Factories factories; private final ResponsiveMetrics metrics; + private final OriginEventReporter oeReporter; private final EndOffsetsPoller endOffsetsPoller; private final String applicationId; private final boolean eos; @@ -78,15 +75,18 @@ public ResponsiveKafkaClientSupplier( final StreamsConfig configs, final ResponsiveStoreRegistry storeRegistry, final ResponsiveMetrics metrics, + final OriginEventReporter oeReporter, final StorageBackend storageBackend ) { this( - configuredFactories(responsiveConfig, configs), + new Factories() { + }, clientSupplier, configs, storeRegistry, metrics, storageBackend, + oeReporter, responsiveConfig.getBoolean(ResponsiveConfig.RESTORE_OFFSET_REPAIR_ENABLED_CONFIG) ); } @@ -98,6 +98,7 @@ public ResponsiveKafkaClientSupplier( final ResponsiveStoreRegistry storeRegistry, final ResponsiveMetrics metrics, final StorageBackend storageBackend, + final OriginEventReporter oeReporter, final boolean repairRestoreOffsetOutOfRange ) { this.factories = factories; @@ -105,6 +106,7 @@ public ResponsiveKafkaClientSupplier( this.storeRegistry = storeRegistry; this.metrics = metrics; this.storageBackend = storageBackend; + this.oeReporter = oeReporter; this.repairRestoreOffsetOutOfRange = repairRestoreOffsetOutOfRange; eos = eosEnabled(configs); @@ -131,6 +133,7 @@ public Producer getProducer(final Map config) { eos, threadId, metrics, + oeReporter, applicationId, config, endOffsetsPoller, @@ -143,7 +146,7 @@ public Producer getProducer(final Map config) { Collections.unmodifiableList( Arrays.asList( tc.offsetRecorder.getProducerListener(), - tc.originEventRecorder, + tc.originEventTracker, new CloseListener(threadId) ) ) @@ -161,6 +164,7 @@ public Consumer getConsumer(final Map config) { eos, threadId, metrics, + oeReporter, applicationId, config, endOffsetsPoller, @@ -175,7 +179,7 @@ public Consumer getConsumer(final Map config) { tc.committedOffsetMetricListener, tc.offsetRecorder.getConsumerListener(), tc.endOffsetsPollerListener, - tc.originEventRecorder, + tc.originEventTracker, new CloseListener(threadId) ) ); @@ -244,6 +248,7 @@ private synchronized ListenersForThread getAndMaybeInitListenersForThread( final boolean eos, final String threadId, final ResponsiveMetrics metrics, + final OriginEventReporter oeReporter, final String consumerGroup, final Map configs, final EndOffsetsPoller endOffsetsPoller, @@ -256,7 +261,7 @@ private synchronized ListenersForThread getAndMaybeInitListenersForThread( return tl.getVal(); } final var offsetRecorder = factories.createOffsetRecorder(eos, threadId); - final var originEventRecorder = factories.createOriginEventRecorder(threadId); + final var originEventRecorder = factories.createOriginEventRecorder(oeReporter, eos); final var tl = new ReferenceCounted<>( String.format("ListenersForThread(%s)", threadId), new ListenersForThread( @@ -300,7 +305,7 @@ private static class ListenersForThread implements Closeable { final MetricPublishingCommitListener committedOffsetMetricListener; final StoreCommitListener storeCommitListener; final EndOffsetsPoller.Listener endOffsetsPollerListener; - final OriginEventRecorder originEventRecorder; + final OriginEventTracker originEventTracker; public ListenersForThread( final String threadId, @@ -308,14 +313,14 @@ public ListenersForThread( final MetricPublishingCommitListener committedOffsetMetricListener, final StoreCommitListener storeCommitListener, final EndOffsetsPoller.Listener endOffsetsPollerListener, - final OriginEventRecorder originEventRecorder + final OriginEventTracker originEventTracker ) { this.threadId = threadId; this.offsetRecorder = offsetRecorder; this.committedOffsetMetricListener = committedOffsetMetricListener; this.storeCommitListener = storeCommitListener; this.endOffsetsPollerListener = endOffsetsPollerListener; - this.originEventRecorder = originEventRecorder; + this.originEventTracker = originEventTracker; } @Override @@ -361,32 +366,6 @@ private T getVal() { } } - private static Factories configuredFactories( - final ResponsiveConfig responsiveConfig, - final StreamsConfig streamsConfig - ) { - return threadId -> { - // we should consider caching the license in ResponsiveConfig so that we - // don't have to load and authenticate it more than once - final var license = LicenseUtils.loadLicense(responsiveConfig); - switch (license.type()) { - case CloudLicenseV1.TYPE_NAME: - case TimedTrialV1.TYPE_NAME: - // we don't report origin events for timed trial - // or cloud licenses since we don't want to modify - // the headers of records if not necessary - return new NoopOriginEventRecorder(); - case UsageBasedV1.TYPE_NAME: - default: - return new OriginEventRecorderImpl( - threadId, - responsiveConfig, - license, - eosEnabled(streamsConfig) - ); - } - }; - } interface Factories { default EndOffsetsPoller createEndOffsetPoller( @@ -455,6 +434,11 @@ default ResponsiveRestoreConsumer createRestoreConsumer( ); } - OriginEventRecorder createOriginEventRecorder(final String threadId); + default OriginEventTracker createOriginEventRecorder( + final OriginEventReporter reporter, + final boolean eos + ) { + return new OriginEventTracker(reporter, eos); + } } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/server/model/OriginEventsReportRequestV1.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/server/model/OriginEventsReportRequestV1.java index 8914f8f27..3257a5276 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/server/model/OriginEventsReportRequestV1.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/server/model/OriginEventsReportRequestV1.java @@ -36,7 +36,6 @@ public class OriginEventsReportRequestV1 extends LicenseServerRequest { private final String applicationId; // the thread that reported the usage - private final String threadId; public OriginEventsReportRequestV1( @JsonProperty("type") final String type, @@ -44,15 +43,13 @@ public OriginEventsReportRequestV1( @JsonProperty("transactionId") final String transactionId, @JsonProperty("eventCount") final long eventCount, @JsonProperty("env") final String env, - @JsonProperty("applicationId") final String applicationId, - @JsonProperty("threadId") final String threadId + @JsonProperty("applicationId") final String applicationId ) { super(type, timestamp); this.transactionId = transactionId; this.eventCount = eventCount; this.env = env; this.applicationId = applicationId; - this.threadId = threadId; } @JsonProperty("transactionId") @@ -75,11 +72,6 @@ public String applicationId() { return applicationId; } - @JsonProperty("threadId") - public String threadId() { - return threadId; - } - public static Builder builder() { return new Builder(); } @@ -90,7 +82,6 @@ public static class Builder { private long eventCount; private String env; private String applicationId; - private String threadId; public Builder setTimestamp(final long timestamp) { this.timestamp = timestamp; @@ -117,11 +108,6 @@ public Builder setApplicationId(final String applicationId) { return this; } - public Builder setThreadId(final String threadId) { - this.threadId = threadId; - return this; - } - public OriginEventsReportRequestV1 build() { return new OriginEventsReportRequestV1( TYPE_NAME, @@ -129,8 +115,7 @@ public OriginEventsReportRequestV1 build() { transactionId, eventCount, env, - applicationId, - threadId + applicationId ); } } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/OriginEventIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/OriginEventIntegrationTest.java index 0575247bd..bffc2dd97 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/integration/OriginEventIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/OriginEventIntegrationTest.java @@ -83,9 +83,6 @@ public void before( this.licenseServer = licenseServer; this.responsiveProps.putAll(responsiveProps); - // report every commit, so we don't potentially miss origin events - this.responsiveProps.put(ResponsiveConfig.ORIGIN_EVENT_REPORT_INTERVAL_MS_CONFIG, 0); - final var result = admin.createTopics( List.of( new NewTopic(inputTopic(), Optional.of(2), Optional.empty()), diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/OriginEventRecorderTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/OriginEventTrackerTest.java similarity index 79% rename from kafka-client/src/test/java/dev/responsive/kafka/internal/clients/OriginEventRecorderTest.java rename to kafka-client/src/test/java/dev/responsive/kafka/internal/clients/OriginEventTrackerTest.java index aee4cbc73..5b90f3d7d 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/OriginEventRecorderTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/OriginEventTrackerTest.java @@ -12,12 +12,10 @@ package dev.responsive.kafka.internal.clients; -import static dev.responsive.kafka.internal.clients.OriginEventRecorderImpl.ORIGIN_EVENT_HEADER_KEY; -import static dev.responsive.kafka.internal.clients.OriginEventRecorderImpl.ORIGIN_EVENT_MARK; -import static org.hamcrest.CoreMatchers.hasItem; +import static dev.responsive.kafka.internal.clients.OriginEventTracker.ORIGIN_EVENT_HEADER_KEY; +import static dev.responsive.kafka.internal.clients.OriginEventTracker.ORIGIN_EVENT_MARK; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.hasSize; import java.util.ArrayList; @@ -33,17 +31,17 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -class OriginEventRecorderTest { +class OriginEventTrackerTest { private static final TopicPartition TP = new TopicPartition("topic", 0); - private OriginEventRecorderImpl recorder; + private OriginEventTracker recorder; private TestReporter reporter; @BeforeEach public void setup() { reporter = new TestReporter(); - recorder = new OriginEventRecorderImpl("test-thread", reporter, false); + recorder = new OriginEventTracker(reporter, false); } @Test @@ -62,9 +60,8 @@ public void shouldTrackOffsetAndReportOnConsumerCommit() { assertThat("A report should have been triggered", calls, hasSize(1)); ReportCall call = calls.get(0); - assertThat("Report thread id should be 'test-thread'", call.threadId, is("test-thread")); - assertThat("Report should contain the TopicPartition", call.report.keySet(), hasItem(TP)); - assertThat("Count for the partition should be 1", call.report.get(TP), is(1)); + assertThat("Report should contain the TopicPartition", call.tp, is(TP)); + assertThat("Count for the partition should be 1", call.count, is(1)); } @Test @@ -87,9 +84,8 @@ public void shouldNotTrackOffsetWhenOriginHeaderPresent() { assertThat("A report should have been triggered", calls, hasSize(1)); ReportCall call = calls.get(0); - assertThat("Report thread id should be 'test-thread'", call.threadId, is("test-thread")); - assertThat("Report should contain the TopicPartition", call.report.keySet(), hasItem(TP)); - assertThat("Count for the partition should only count 1", call.report.get(TP), is(1)); + assertThat("Report should contain the TopicPartition", call.tp, is(TP)); + assertThat("Count for the partition should only count 1", call.count, is(1)); } @Test @@ -112,7 +108,7 @@ public void shouldAddOriginHeaderOnSend() { @Test public void shouldReportOnProducerCommitWithEos() { // Given: - recorder = new OriginEventRecorderImpl("test-thread", reporter, true); + recorder = new OriginEventTracker(reporter, true); long offset = 1000L; final ConsumerRecords records = records(record(offset, false)); @@ -126,7 +122,7 @@ public void shouldReportOnProducerCommitWithEos() { List calls = reporter.getCalls(); assertThat("A report should be triggered on producer commit", calls, hasSize(1)); ReportCall call = calls.get(0); - assertThat("Reported count for the partition should be 1", call.report.get(TP), is(1)); + assertThat("Reported count for the partition should be 1", call.count, is(1)); } @Test @@ -146,7 +142,7 @@ public void shouldOnlyIncludeOffsetsUpToCommitOffset() { assertThat("A report should be triggered", calls, hasSize(1)); assertThat( "Reported count for the partition should be 1 (only offset 1000 counted)", - calls.get(0).report.get(TP), + calls.get(0).count, is(1) ); } @@ -169,9 +165,9 @@ public void shouldOnlyReportCommittedTopicPartitions() { List calls = reporter.getCalls(); assertThat("A report should have been triggered", calls, hasSize(1)); ReportCall call = calls.get(0); - assertThat("Report should only contain the committed TopicPartition", - call.report.keySet(), contains(tp1)); - assertThat("Reported count for tp1 should be 1", call.report.get(tp1), is(1)); + assertThat("Report should contain tp1", + call.tp, is(tp1)); + assertThat("Reported count for tp1 should be 1", call.count, is(1)); } @@ -200,23 +196,22 @@ private ConsumerRecords records(final ConsumerRecord... records) } private static class ReportCall { - final Map report; - final String threadId; + final TopicPartition tp; + final Integer count; - ReportCall(Map report, String threadId) { - this.report = report; - this.threadId = threadId; + ReportCall(final TopicPartition tp, final int count) { + this.tp = tp; + this.count = count; } } - private static class TestReporter - implements java.util.function.BiConsumer, String> { + private static class TestReporter implements OriginEventReporter { private final List calls = new ArrayList<>(); @Override - public void accept(Map report, String threadId) { - calls.add(new ReportCall(report, threadId)); + public void report(TopicPartition tp, Integer count) { + calls.add(new ReportCall(tp, count)); } public List getCalls() { diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplierTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplierTest.java index 410b10a1a..d9702ddd2 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplierTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplierTest.java @@ -102,6 +102,8 @@ class ResponsiveKafkaClientSupplierTest { private ResponsiveConsumer responsiveConsumer; @Mock private MetricPublishingCommitListener commitMetricListener; + @Mock + private OriginEventReporter oeReporter; @Captor private ArgumentCaptor> producerListenerCaptor; @Captor @@ -110,8 +112,8 @@ class ResponsiveKafkaClientSupplierTest { private ResponsiveKafkaClientSupplier supplier; private final ResponsiveStoreRegistry storeRegistry = new ResponsiveStoreRegistry(); - private final OriginEventRecorder originEventRecorder = - new OriginEventRecorderImpl("thread", (a, b) -> { }, false); + private final OriginEventTracker originEventTracker = + new OriginEventTracker((tp, count) -> { }, false); @BeforeEach @SuppressWarnings("unchecked") @@ -129,7 +131,8 @@ public void setup() { lenient().when(factories.createMetricsPublishingCommitListener(any(), any(), any())) .thenReturn(commitMetricListener); lenient().when(factories.createOffsetRecorder(anyBoolean(), any())).thenReturn(offsetRecorder); - lenient().when(factories.createOriginEventRecorder(any())).thenReturn(originEventRecorder); + lenient().when(factories.createOriginEventRecorder(any(), anyBoolean())) + .thenReturn(originEventTracker); supplier = supplier(CONFIGS, StorageBackend.MONGO_DB); } @@ -240,7 +243,7 @@ public void shouldAddOriginEventListeners() { // then: verify(factories).createResponsiveConsumer( any(), any(), consumerListenerCaptor.capture()); - assertThat(consumerListenerCaptor.getValue(), Matchers.hasItem(originEventRecorder)); + assertThat(consumerListenerCaptor.getValue(), Matchers.hasItem(originEventTracker)); } @Test @@ -304,6 +307,7 @@ private ResponsiveKafkaClientSupplier supplier( storeRegistry, metrics, storageBackend, + oeReporter, false ); } diff --git a/kafka-client/src/test/resources/log4j.properties b/kafka-client/src/test/resources/log4j.properties index 00e87d443..2316d90f9 100644 --- a/kafka-client/src/test/resources/log4j.properties +++ b/kafka-client/src/test/resources/log4j.properties @@ -17,6 +17,5 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n log4j.logger.dev.responsive.kafka.api.async=DEBUG -#log4j.logger.dev.responsive.kafka.internal.clients.OriginEventRecorder=DEBUG log4j.logger.org.apache.kafka.streams.state.internals.AsyncKeyValueStoreBuilder=DEBUG log4j.logger.org.apache.kafka.streams.state.internals.AsyncTimestampedKeyValueStoreBuilder=DEBUG