diff --git a/controller-api/src/main/external-protos/opentelemetry-proto b/controller-api/src/main/external-protos/opentelemetry-proto index d9fd878d4..b41217bad 160000 --- a/controller-api/src/main/external-protos/opentelemetry-proto +++ b/controller-api/src/main/external-protos/opentelemetry-proto @@ -1 +1 @@ -Subproject commit d9fd878d46b1970cf72b3d11ea012fff60716aa8 +Subproject commit b41217bad1fb49b65a0442650c496abfadfc2216 diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java index 7377911fc..405b706bf 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java @@ -22,6 +22,7 @@ import static dev.responsive.kafka.api.config.ResponsiveConfig.RS3_TLS_ENABLED_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.TASK_ASSIGNOR_CLASS_OVERRIDE; import static dev.responsive.kafka.internal.config.ResponsiveStreamsConfig.validateNoStorageStreamsConfig; +import static dev.responsive.kafka.internal.license.LicenseUtils.loadLicense; import static dev.responsive.kafka.internal.metrics.ResponsiveMetrics.RESPONSIVE_METRICS_NAMESPACE; import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG; import static org.apache.kafka.streams.StreamsConfig.METRICS_NUM_SAMPLES_CONFIG; @@ -29,7 +30,6 @@ import static org.apache.kafka.streams.StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG; -import com.fasterxml.jackson.databind.ObjectMapper; import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry; import dev.responsive.kafka.api.async.internals.AsyncUtils; import dev.responsive.kafka.api.config.ResponsiveConfig; @@ -46,11 +46,6 @@ import dev.responsive.kafka.internal.db.mongo.ResponsiveMongoClient; import dev.responsive.kafka.internal.db.rs3.RS3TableFactory; import dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRS3Client; -import dev.responsive.kafka.internal.license.LicenseAuthenticator; -import dev.responsive.kafka.internal.license.LicenseChecker; -import dev.responsive.kafka.internal.license.model.LicenseDocument; -import dev.responsive.kafka.internal.license.model.LicenseInfo; -import dev.responsive.kafka.internal.license.model.SigningKeys; import dev.responsive.kafka.internal.metrics.ClientVersionMetadata; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; import dev.responsive.kafka.internal.metrics.ResponsiveRestoreListener; @@ -61,11 +56,7 @@ import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry; import dev.responsive.kafka.internal.utils.SessionClients; import dev.responsive.kafka.internal.utils.SessionUtil; -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; import java.time.Duration; -import java.util.Base64; import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; @@ -97,8 +88,6 @@ public class ResponsiveKafkaStreams extends KafkaStreams { - private static final String SIGNING_KEYS_PATH = "/responsive-license-keys/license-keys.json"; - private static final Logger LOG = LoggerFactory.getLogger(ResponsiveKafkaStreams.class); private final ResponsiveMetrics responsiveMetrics; @@ -211,7 +200,7 @@ protected ResponsiveKafkaStreams(final Params params) { params.time ); - validateLicense(params.responsiveConfig); + loadLicense(params.responsiveConfig); 
this.responsiveMetrics = params.metrics; this.sessionClients = params.sessionClients; @@ -265,57 +254,6 @@ private static ResponsiveMetrics createMetrics( ), exportService); } - private static void validateLicense(final ResponsiveConfig configs) { - if (!configs.getString(ResponsiveConfig.PLATFORM_API_KEY_CONFIG).isEmpty()) { - return; - } - final LicenseDocument licenseDocument = loadLicense(configs); - final SigningKeys signingKeys = loadSigningKeys(); - final LicenseAuthenticator licenseAuthenticator = new LicenseAuthenticator(signingKeys); - final LicenseInfo licenseInfo = licenseAuthenticator.authenticate(licenseDocument); - final LicenseChecker checker = new LicenseChecker(); - checker.checkLicense(licenseInfo); - } - - private static SigningKeys loadSigningKeys() { - try { - return new ObjectMapper().readValue( - ResponsiveKafkaStreams.class.getResource(SIGNING_KEYS_PATH), - SigningKeys.class - ); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - - private static LicenseDocument loadLicense(final ResponsiveConfig configs) { - final String license = configs.getString(ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG); - final String licenseFile = configs.getString(ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG); - if (license.isEmpty() == licenseFile.isEmpty()) { - throw new ConfigException(String.format( - "Must set exactly one of %s or %s", - ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG, - ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG - )); - } - final String licenseB64; - if (!license.isEmpty()) { - licenseB64 = license; - } else { - try { - licenseB64 = Files.readString(new File(licenseFile).toPath()); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - final ObjectMapper mapper = new ObjectMapper(); - try { - return mapper.readValue(Base64.getDecoder().decode(licenseB64), LicenseDocument.class); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - /** * Fill in the props with any overrides and all internal objects shared via the configs * before these get finalized as a {@link StreamsConfig} object diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java index 861a73412..4258fe241 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java @@ -54,6 +54,11 @@ public class ResponsiveConfig extends AbstractConfig { private static final String RESPONSIVE_LICENSE_FILE_DOC = "A path to a file containing your license."; + public static final String RESPONSIVE_LICENSE_SERVER_CONFIG = "responsive.license.server"; + private static final String RESPONSIVE_LICENSE_SERVER_DEFAULT = "https://api.responsive.dev"; + private static final String RESPONSIVE_LICENSE_SERVER_DOC = + "A URI for your Responsive license server."; + /** * @deprecated use the responsive.storage.backend.type config with {@link StorageBackend#NONE} */ @@ -307,15 +312,20 @@ public class ResponsiveConfig extends AbstractConfig { + "lookups but requires more heap memory"; - // ------------------ Misc functional overrides ---------------------- + // ------------------ Misc configs ---------------------- public static final String RESTORE_OFFSET_REPAIR_ENABLED_CONFIG = "responsive.restore.offset.repair.enabled"; - public static final boolean RESTORE_OFFSET_REPAIR_ENABLED_DEFAULT = false; - public static final String 
RESTORE_OFFSET_REPAIR_ENABLED_DOC = "When set to 'true', " + RESTORE_OFFSET_REPAIR_ENABLED_CONFIG
+  private static final boolean RESTORE_OFFSET_REPAIR_ENABLED_DEFAULT = false;
+  private static final String RESTORE_OFFSET_REPAIR_ENABLED_DOC = "When set to 'true', " + RESTORE_OFFSET_REPAIR_ENABLED_CONFIG
       + " will ignore OffsetOutOfRangeException and instead seek to the earliest available offset. This exception "
       + "should only happen in situations where there is truncation/retention on the changelog topic and restoring from the latest "
       + "committed offset in the remote store is no longer possible. Note that in some situations this may cause data "
       + "loss, so use this configuration with caution";
+
+  public static final String ORIGIN_EVENT_REPORT_INTERVAL_MS_CONFIG = "responsive.origin.event.report.interval.ms";
+  private static final long ORIGIN_EVENT_REPORT_INTERVAL_MS_DEFAULT = 10_000L;
+  private static final String ORIGIN_EVENT_REPORT_INTERVAL_MS_DOC =
+      "How often to report origin event usage information. This should generally not be changed in production environments.";
+
   // ------------------ StreamsConfig overrides ----------------------
 
   // These configuration values are required by Responsive, and a ConfigException will
@@ -347,7 +357,7 @@ public class ResponsiveConfig extends AbstractConfig {
       RESPONSIVE_ENV_DOC
   ).define(
       RESPONSIVE_LICENSE_CONFIG,
-      Type.STRING,
+      Type.PASSWORD,
       "",
       Importance.HIGH,
       RESPONSIVE_LICENSE_DOC
   ).define(
       RESPONSIVE_LICENSE_FILE_CONFIG,
@@ -357,6 +367,12 @@ public class ResponsiveConfig extends AbstractConfig {
       "",
       Importance.HIGH,
       RESPONSIVE_LICENSE_FILE_DOC
+  ).define(
+      RESPONSIVE_LICENSE_SERVER_CONFIG,
+      Type.STRING,
+      RESPONSIVE_LICENSE_SERVER_DEFAULT,
+      Importance.LOW,
+      RESPONSIVE_LICENSE_SERVER_DOC
   ).define(
       STORAGE_BACKEND_TYPE_CONFIG,
       Type.STRING,
@@ -635,6 +651,12 @@ public class ResponsiveConfig extends AbstractConfig {
       atLeast(0),
       Importance.MEDIUM,
       RS3_RETRY_TIMEOUT_DOC
+  ).define(
+      ORIGIN_EVENT_REPORT_INTERVAL_MS_CONFIG,
+      Type.LONG,
+      ORIGIN_EVENT_REPORT_INTERVAL_MS_DEFAULT,
+      Importance.LOW,
+      ORIGIN_EVENT_REPORT_INTERVAL_MS_DOC
   );
 
   /**
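For reviewers, here is how the knobs added above would look in an application's properties. This is only a sketch: the values shown are the defaults defined in this diff, and the license value is a placeholder.

// sketch: configuring the new options (values are the defaults from this diff)
final Properties props = new Properties();
// responsive.license is now Type.PASSWORD, so its value is masked when configs are logged
props.put(ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG, "<base64-encoded license>");
props.put(ResponsiveConfig.RESPONSIVE_LICENSE_SERVER_CONFIG, "https://api.responsive.dev");
props.put(ResponsiveConfig.ORIGIN_EVENT_REPORT_INTERVAL_MS_CONFIG, 10_000L);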
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/NoopOriginEventRecorder.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/NoopOriginEventRecorder.java
new file mode 100644
index 000000000..8337de5a5
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/NoopOriginEventRecorder.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2024 Responsive Computing, Inc.
+ *
+ * This source code is licensed under the Responsive Business Source License Agreement v1.0
+ * available at:
+ *
+ * https://www.responsive.dev/legal/responsive-bsl-10
+ *
+ * This software requires a valid Commercial License Key for production use. Trial and commercial
+ * licenses can be obtained at https://www.responsive.dev
+ */
+
+package dev.responsive.kafka.internal.clients;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+public class NoopOriginEventRecorder implements OriginEventRecorder {
+  @Override
+  public <K, V> ConsumerRecords<K, V> onPoll(final ConsumerRecords<K, V> records) {
+    return records;
+  }
+
+  @Override
+  public <K, V> ProducerRecord<K, V> onSend(final ProducerRecord<K, V> record) {
+    return record;
+  }
+
+  @Override
+  public void onConsumerCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
+  }
+
+  @Override
+  public void onProducerCommit() {
+  }
+
+  @Override
+  public void onSendOffsetsToTransaction(
+      final Map<TopicPartition, OffsetAndMetadata> offsets,
+      final String consumerGroupId
+  ) {
+  }
+
+  @Override
+  public void onPartitionsLost(final Collection<TopicPartition> partitions) {
+  }
+
+  @Override
+  public void onUnsubscribe() {
+  }
+}
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OffsetRecorder.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OffsetRecorder.java
index ec1666e93..b3283fea2 100644
--- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OffsetRecorder.java
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OffsetRecorder.java
@@ -121,7 +121,7 @@ void onCommit(
   private class ConsumerListener implements ResponsiveConsumer.Listener {
     @Override
-    public void onCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
+    public void onConsumerCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
       if (eos) {
         throw new IllegalStateException("consumer commit is not expected with EOS");
       }
@@ -151,7 +151,7 @@ public void onSendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> of
     }
 
     @Override
-    public void onCommit() {
+    public void onProducerCommit() {
       if (!eos) {
         throw new IllegalStateException("producer commit is not expected with alos");
       }
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OffsetTracker.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OffsetTracker.java
new file mode 100644
index 000000000..63b42be18
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OffsetTracker.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2025 Responsive Computing, Inc.
+ *
+ * This source code is licensed under the Responsive Business Source License Agreement v1.0
+ * available at:
+ *
+ * https://www.responsive.dev/legal/responsive-bsl-10
+ *
+ * This software requires a valid Commercial License Key for production use. Trial and commercial
+ * licenses can be obtained at https://www.responsive.dev
+ */
+
+package dev.responsive.kafka.internal.clients;
+
+import java.util.BitSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class allows us to efficiently count the number of events
+ * between two offsets that match a certain condition. This is somewhat
+ * memory efficient in that we can track 100K offsets with ~1.5K longs
+ * (64 bits per long), or roughly 12KB.
+ * <p>
+ * This is necessary for origin event tracking: since we mark records
+ * on poll but report aligned to commit, it is possible that there are
+ * records that are polled but not included in the committed offsets.
+ * This class remedies that by allowing us to count the marked events
+ * up to the committed offset.
+ * <p>
+ * Additionally, there are some situations that would cause us to poll
+ * a record multiple times (e.g. a task is reassigned to the same thread
+ * after an EOS error). This class is idempotent with respect to marking
+ * the same offsets multiple times.
+ */
+public class OffsetTracker {
+
+  private static final Logger LOG = LoggerFactory.getLogger(OffsetTracker.class);
+
+  private BitSet offsets = new BitSet();
+  private long baseOffset;
+
+  public OffsetTracker(long baseOffset) {
+    this.baseOffset = baseOffset;
+  }
+
+  public void mark(final long offset) {
+    if (offset < baseOffset) {
+      LOG.error("Invalid offset {} lower than baseOffset {} marked", offset, baseOffset);
+      throw new IllegalArgumentException(
+          "Offset " + offset + " cannot be less than baseOffset " + baseOffset);
+    }
+
+    // assume that we won't be committing more than MAX_INT offsets
+    // in a single commit...
+    final int idx = Math.toIntExact(offset - baseOffset);
+    offsets.set(idx);
+  }
+
+  public int countAndShift(final long commitOffset) {
+    if (commitOffset < baseOffset) {
+      LOG.error("Invalid offset {} lower than baseOffset {} committed", commitOffset, baseOffset);
+      throw new IllegalArgumentException(
+          "Commit offset " + commitOffset + " cannot be less than baseOffset " + baseOffset);
+    }
+
+    final int shift = Math.toIntExact(commitOffset - baseOffset);
+    final int count = offsets.get(0, shift).cardinality();
+
+    // shift the bitset so that commitOffset is the new base offset
+    // for which we track marked entries
+    if (shift < offsets.length()) {
+      offsets = offsets.get(shift, offsets.length());
+    } else {
+      // if the shift is beyond the last entry in the bitset, we can
+      // just create a new empty bitset
+      offsets = new BitSet();
+    }
+
+    baseOffset = commitOffset;
+    return count;
+  }
+}
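A quick sketch of the tracker's contract as described in the javadoc above, with made-up offsets: marks are idempotent, and countAndShift only counts marks strictly below the commit offset before rebasing there.

// sketch: OffsetTracker semantics (offsets are illustrative)
final OffsetTracker tracker = new OffsetTracker(100L);
tracker.mark(100L);
tracker.mark(101L);
tracker.mark(101L);                      // re-polling the same offset: no double count
tracker.mark(105L);
int count = tracker.countAndShift(105L); // 2: counts marks below offset 105
count = tracker.countAndShift(106L);     // 1: offset 105 survived the rebase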
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OriginEventRecorder.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OriginEventRecorder.java
new file mode 100644
index 000000000..b34a12d0a
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OriginEventRecorder.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2024 Responsive Computing, Inc.
+ *
+ * This source code is licensed under the Responsive Business Source License Agreement v1.0
+ * available at:
+ *
+ * https://www.responsive.dev/legal/responsive-bsl-10
+ *
+ * This software requires a valid Commercial License Key for production use. Trial and commercial
+ * licenses can be obtained at https://www.responsive.dev
+ */
+
+package dev.responsive.kafka.internal.clients;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+public interface OriginEventRecorder
+    extends ResponsiveConsumer.Listener, ResponsiveProducer.Listener {
+
+  @Override
+  <K, V> ConsumerRecords<K, V> onPoll(ConsumerRecords<K, V> records);
+
+  @Override
+  <K, V> ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
+
+  @Override
+  void onConsumerCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
+
+  @Override
+  void onProducerCommit();
+
+  @Override
+  void onSendOffsetsToTransaction(
+      final Map<TopicPartition, OffsetAndMetadata> offsets,
+      final String consumerGroupId
+  );
+
+  @Override
+  void onPartitionsLost(Collection<TopicPartition> partitions);
+
+  @Override
+  void onUnsubscribe();
+}
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OriginEventRecorderImpl.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OriginEventRecorderImpl.java
new file mode 100644
index 000000000..12419ebc1
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/OriginEventRecorderImpl.java
@@ -0,0 +1,278 @@
+/*
+ * Copyright 2024 Responsive Computing, Inc.
+ *
+ * This source code is licensed under the Responsive Business Source License Agreement v1.0
+ * available at:
+ *
+ * https://www.responsive.dev/legal/responsive-bsl-10
+ *
+ * This software requires a valid Commercial License Key for production use. Trial and commercial
+ * licenses can be obtained at https://www.responsive.dev
+ */
+
+package dev.responsive.kafka.internal.clients;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import dev.responsive.kafka.api.config.ResponsiveConfig;
+import dev.responsive.kafka.internal.license.model.LicenseInfo;
+import dev.responsive.kafka.internal.license.model.UsageBasedV1;
+import dev.responsive.kafka.internal.license.server.model.OriginEventsReportRequestV1;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OriginEventRecorderImpl implements OriginEventRecorder {
+
+  private static final Logger LOG = LoggerFactory.getLogger(OriginEventRecorderImpl.class);
+
+  static final String ORIGIN_EVENT_HEADER_KEY = "_roe";
+  static final byte[] ORIGIN_EVENT_MARK = new byte[] {0x1};
+
+  private final Duration reportInterval;
+  private final BiConsumer<Map<TopicPartition, Integer>, String> reportOriginEvents;
+  private final String threadId;
+  private final boolean eos;
+
+  @GuardedBy("this")
+  private final Map<TopicPartition, OffsetTracker> trackedOffsets = new HashMap<>();
+  @GuardedBy("this")
+  private final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
+  @GuardedBy("this")
+  private final Map<TopicPartition, Integer> ongoingReport = new HashMap<>();
+  @GuardedBy("this")
+  private long lastReportTs;
+
+  public OriginEventRecorderImpl(
+      final String threadId,
+      final ResponsiveConfig config,
+      final LicenseInfo license,
+      final boolean eos
+  ) {
+    this(
+        threadId,
+        new Reporter(config, license),
+        config.getLong(ResponsiveConfig.ORIGIN_EVENT_REPORT_INTERVAL_MS_CONFIG),
+        eos
+    );
+  }
+
+  @VisibleForTesting
+  OriginEventRecorderImpl(
+      final String threadId,
+      final BiConsumer<Map<TopicPartition, Integer>, String> reportOriginEvents,
+      final boolean eos
+  ) {
+    this(threadId, reportOriginEvents, 0L, eos);
+  }
+
+  private OriginEventRecorderImpl(
+      final String threadId,
+      final BiConsumer<Map<TopicPartition, Integer>, String> reportOriginEvents,
+      final long reportIntervalMs,
+      final boolean eos
+  ) {
+    this.threadId = threadId;
+    this.reportOriginEvents = reportOriginEvents;
+    this.reportInterval = Duration.ofMillis(reportIntervalMs);
+    this.lastReportTs = System.currentTimeMillis();
+    this.eos = eos;
+  }
+
+  @Override
+  public <K, V> ConsumerRecords<K, V> onPoll(final ConsumerRecords<K, V> records) {
+    for (final ConsumerRecord<K, V> record : records) {
+      final var header = record.headers().lastHeader(ORIGIN_EVENT_HEADER_KEY);
+      if (header == null) {
+        synchronized (this) {
+          final var tracker = trackedOffsets.computeIfAbsent(
+              new TopicPartition(record.topic(), record.partition()),
+              tp -> new OffsetTracker(record.offset())
+          );
+          tracker.mark(record.offset());
+        }
+      }
+    }
+    return records;
+  }
+
+  @Override
+  public <K, V> ProducerRecord<K, V> onSend(final ProducerRecord<K, V> record) {
+    final var header = record.headers().lastHeader(ORIGIN_EVENT_HEADER_KEY);
+    if (header == null) {
+      record.headers().add(ORIGIN_EVENT_HEADER_KEY, ORIGIN_EVENT_MARK);
+    }
+    return record;
+  }
+
+  @Override
+  public void onConsumerCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
+    if (eos) {
+      throw new IllegalStateException("Should not use consumer commit on EOS");
+    }
+
+    synchronized (this) {
+      nextCommitOffsets.putAll(offsets);
+    }
+    onCommit();
+  }
+
+  @Override
+  public void onProducerCommit() {
+    if (!eos) {
+      throw new IllegalStateException("Should not use producer commit on ALOS");
+    }
+
+    onCommit();
+  }
+
+  @Override
+  public synchronized void onSendOffsetsToTransaction(
+      final Map<TopicPartition, OffsetAndMetadata> offsets,
+      final String consumerGroupId
+  ) {
+    nextCommitOffsets.putAll(offsets);
+  }
+
+  private void onCommit() {
+    Map<TopicPartition, Integer> report = null;
+
+    synchronized (this) {
+      for (final var tpOffset : nextCommitOffsets.entrySet()) {
+        final var tracker = trackedOffsets.get(tpOffset.getKey());
+        if (tracker != null) {
+          final int count = tracker.countAndShift(tpOffset.getValue().offset());
+          ongoingReport.compute(
+              tpOffset.getKey(),
+              (k, oldCount) -> oldCount == null ? count : oldCount + count
+          );
+        }
+      }
+
+      nextCommitOffsets.clear();
+      final long now = System.currentTimeMillis();
+      if (Duration.ofMillis(now - lastReportTs).compareTo(reportInterval) >= 0) {
+        report = Map.copyOf(ongoingReport);
+        lastReportTs = now;
+        ongoingReport.clear();
+      }
+    }
+
+    // do this outside of the synchronized block since the report may
+    // involve a network call
+    if (report != null) {
+      reportOriginEvents.accept(report, threadId);
+    }
+  }
+
+  @Override
+  public synchronized void onPartitionsLost(final Collection<TopicPartition> partitions) {
+    // since we won't get commits for these anymore, we should clear the offsets
+    // we have seen so far but not committed in case we ever get the topic partition
+    // reassigned back to ourselves
+    partitions.forEach(trackedOffsets::remove);
+  }
+
+  @Override
+  public synchronized void onUnsubscribe() {
+    // clear all seen tracked offsets that have not yet been committed
+    trackedOffsets.clear();
+  }
+
+  private static class Reporter implements BiConsumer<Map<TopicPartition, Integer>, String> {
+
+    private final ObjectMapper mapper = new ObjectMapper();
+    private final HttpClient client;
+    private final String licenseServer;
+    private final String apiKey;
+    private final String appId;
+    private final String env;
+
+    private Reporter(ResponsiveConfig config, final LicenseInfo license) {
+      if (!license.type().equals(UsageBasedV1.TYPE_NAME)) {
+        throw new IllegalArgumentException(
+            "Invalid license type for usage reporting: " + license.type());
+      }
+
+      this.licenseServer = config.getString(ResponsiveConfig.RESPONSIVE_LICENSE_SERVER_CONFIG);
+      this.apiKey = ((UsageBasedV1) license).key();
+      this.appId = (String) config.originals().get(StreamsConfig.APPLICATION_ID_CONFIG);
+      this.env = config.getString(ResponsiveConfig.RESPONSIVE_ENV_CONFIG);
+
+      client = HttpClient.newHttpClient();
+    }
+
+    @Override
+    public void accept(final Map<TopicPartition, Integer> counts, final String threadId) {
+      LOG.debug("Reporting origin events for {} on {}", counts, threadId);
+
+      final var body = OriginEventsReportRequestV1.builder()
+          .setTimestamp(System.currentTimeMillis())
+          .setTransactionId(UUID.randomUUID().toString())
+          .setEventCount(counts.values().stream().reduce(Integer::sum).orElse(0))
+          .setEnv(env)
+          .setApplicationId(appId)
+          .setThreadId(threadId)
+          .build();
+
+      try {
+        final HttpRequest request = HttpRequest.newBuilder()
+            .uri(new URI(licenseServer + "/usage"))
+            .header("Content-Type", "application/json")
+            .header("Authorization", "Bearer " + apiKey)
+            .POST(HttpRequest.BodyPublishers.ofString(mapper.writeValueAsString(body)))
+            .build();
+
+        client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
+            .thenAccept(response -> {
+              final int status = response.statusCode();
+              final String responseBody = response.body();
+              if (status >= 400 && status < 500) {
+                LOG.error(
+                    "Failed to report usage due to {} (error code {}). "
+                        + "Please make sure your license is correctly configured to "
+                        + "avoid violation of the license agreement terms.",
+                    responseBody,
+                    status
+                );
+              } else if (status >= 500 && status < 600) {
+                LOG.warn("Failed to report usage due to {} (code {}).", responseBody, status);
+              } else {
+                LOG.debug("Successfully reported usage (status: {}): {}", status, responseBody);
+              }
+            })
+            .exceptionally(e -> {
+              LOG.error("Failed to report usage! Please make sure "
+                  + "you have correctly configured your Responsive License", e);
+              return null;
+            });
+      } catch (final JsonProcessingException e) {
+        // this should never happen
+        LOG.warn("Internal error while reporting metrics to the License Server", e);
+      } catch (final URISyntaxException e) {
+        throw new IllegalArgumentException(
+            "Invalid configuration for " + ResponsiveConfig.RESPONSIVE_LICENSE_SERVER_CONFIG + ": "
+                + licenseServer, e
+        );
+      }
+    }
+  }
+}
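To make the marking convention concrete: a record with no "_roe" header is treated as an origin event on poll, and onSend stamps the header so downstream sub-topologies do not count it again. A minimal sketch using only the public kafka-clients API (topic and values are illustrative):

// sketch: the "_roe" header convention
final ProducerRecord<byte[], byte[]> record =
    new ProducerRecord<>("output-topic", "k".getBytes(), "v".getBytes());
// no header yet: onPoll would mark this offset as an origin event
assert record.headers().lastHeader("_roe") == null;
// onSend stamps the mark, so a re-polled copy is skipped
record.headers().add("_roe", new byte[] {0x1});
assert record.headers().lastHeader("_roe") != null;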
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveConsumer.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveConsumer.java
index a51fc251c..ce9dc14de 100644
--- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveConsumer.java
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveConsumer.java
@@ -21,6 +21,7 @@
 import java.util.regex.Pattern;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.TopicPartition;
@@ -84,6 +85,21 @@ public ResponsiveConsumer(
     this.listeners = Objects.requireNonNull(listeners);
   }
 
+  @Override
+  @Deprecated
+  public ConsumerRecords<K, V> poll(final long timeout) {
+    return poll(Duration.ofMillis(timeout));
+  }
+
+  @Override
+  public ConsumerRecords<K, V> poll(final Duration timeout) {
+    var records = super.poll(timeout);
+    for (final Listener listener : listeners) {
+      records = listener.onPoll(records);
+    }
+    return records;
+  }
+
   @Override
   public void subscribe(final Collection<String> topics) {
     throw new IllegalStateException("Unexpected call to subscribe(Collection) on main consumer"
@@ -115,13 +131,13 @@ public void unsubscribe() {
   @Override
   public void close() {
     super.close();
-    listeners.forEach(l -> ignoreException(l::onClose));
+    listeners.forEach(l -> ignoreException(l::onConsumerClose));
   }
 
   @Override
   public void close(final Duration timeout) {
     super.close(timeout);
-    listeners.forEach(l -> ignoreException(l::onClose));
+    listeners.forEach(l -> ignoreException(l::onConsumerClose));
   }
 
   @Override
@@ -137,14 +153,14 @@ public void commitSync(Duration timeout) {
   @Override
   public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
     super.commitSync(offsets);
-    listeners.forEach(l -> l.onCommit(offsets));
+    listeners.forEach(l -> l.onConsumerCommit(offsets));
   }
 
   @Override
   public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
     super.commitSync(offsets, timeout);
-    listeners.forEach(l -> l.onCommit(offsets));
+    listeners.forEach(l -> l.onConsumerCommit(offsets));
   }
 
   @Override
@@ -176,6 +192,11 @@ private static void ignoreException(final Logger logger, final Runnable r) {
   }
 
   public interface Listener {
+
+    default <K, V> ConsumerRecords<K, V> onPoll(ConsumerRecords<K, V> records) {
+      return records;
+    }
+
     default void onPartitionsRevoked(Collection<TopicPartition> partitions) {
     }
 
@@ -185,13 +206,13 @@ default void onPartitionsAssigned(Collection<TopicPartition> partitions) {
     }
 
     default void onPartitionsLost(Collection<TopicPartition> partitions) {
     }
 
-    default void onCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
+    default void onConsumerCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
     }
 
     default void onUnsubscribe() {
     }
 
-    default void onClose() {
+    default void onConsumerClose() {
     }
   }
 }
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java
index 2a7d2fc9c..e7b4b24dc 100644
--- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java
@@ -19,6 +19,10 @@
 import dev.responsive.kafka.api.config.ResponsiveConfig;
 import dev.responsive.kafka.api.config.StorageBackend;
+import dev.responsive.kafka.internal.license.LicenseUtils;
+import dev.responsive.kafka.internal.license.model.CloudLicenseV1;
+import dev.responsive.kafka.internal.license.model.TimedTrialV1;
+import dev.responsive.kafka.internal.license.model.UsageBasedV1;
 import dev.responsive.kafka.internal.metrics.EndOffsetsPoller;
 import dev.responsive.kafka.internal.metrics.MetricPublishingCommitListener;
 import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
@@ -77,7 +81,7 @@ public ResponsiveKafkaClientSupplier(
       final StorageBackend storageBackend
   ) {
     this(
-        new Factories() {},
+        configuredFactories(responsiveConfig, configs),
         clientSupplier,
         configs,
         storeRegistry,
@@ -139,6 +143,7 @@ public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
         Collections.unmodifiableList(
             Arrays.asList(
                 tc.offsetRecorder.getProducerListener(),
+                tc.originEventRecorder,
                 new CloseListener(threadId)
             )
         )
@@ -170,6 +175,7 @@ public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
             tc.committedOffsetMetricListener,
             tc.offsetRecorder.getConsumerListener(),
             tc.endOffsetsPollerListener,
+            tc.originEventRecorder,
             new CloseListener(threadId)
         )
     );
@@ -220,7 +226,12 @@ private CloseListener(final String threadId) {
     }
 
     @Override
-    public void onClose() {
+    public void onProducerClose() {
+      sharedListeners.derefListenersForThread(threadId);
+    }
+
+    @Override
+    public void onConsumerClose() {
       sharedListeners.derefListenersForThread(threadId);
     }
   }
@@ -245,6 +256,7 @@ private synchronized ListenersForThread getAndMaybeInitListenersForThread(
       return tl.getVal();
     }
     final var offsetRecorder = factories.createOffsetRecorder(eos, threadId);
+    final var originEventRecorder = factories.createOriginEventRecorder(threadId);
    final var tl = new ReferenceCounted<>(
        String.format("ListenersForThread(%s)", threadId),
        new ListenersForThread(
@@ -256,7 +268,8 @@ private synchronized ListenersForThread getAndMaybeInitListenersForThread(
              offsetRecorder
          ),
          new StoreCommitListener(storeRegistry, offsetRecorder),
-          endOffsetsPoller.addForThread(threadId)
+          endOffsetsPoller.addForThread(threadId),
+          originEventRecorder
        )
    );
    threadListeners.put(threadId, tl);
@@ -287,19 +300,22 @@ private static class ListenersForThread implements Closeable {
     final MetricPublishingCommitListener committedOffsetMetricListener;
     final StoreCommitListener storeCommitListener;
     final EndOffsetsPoller.Listener endOffsetsPollerListener;
+    final OriginEventRecorder originEventRecorder;
 
     public ListenersForThread(
         final String threadId,
         final OffsetRecorder offsetRecorder,
         final MetricPublishingCommitListener committedOffsetMetricListener,
         final StoreCommitListener storeCommitListener,
-        final EndOffsetsPoller.Listener endOffsetsPollerListener
+        final EndOffsetsPoller.Listener endOffsetsPollerListener,
+        final OriginEventRecorder originEventRecorder
     ) {
       this.threadId = threadId;
       this.offsetRecorder = offsetRecorder;
       this.committedOffsetMetricListener = committedOffsetMetricListener;
       this.storeCommitListener = storeCommitListener;
       this.endOffsetsPollerListener = endOffsetsPollerListener;
+      this.originEventRecorder = originEventRecorder;
     }
 
     @Override
@@ -345,6 +361,33 @@ private T getVal() {
     }
   }
 
+  private static Factories configuredFactories(
+      final ResponsiveConfig responsiveConfig,
+      final StreamsConfig streamsConfig
+  ) {
+    return threadId -> {
+      // we should consider caching the license in ResponsiveConfig so that we
+      // don't have to load and authenticate it more than once
+      final var license = LicenseUtils.loadLicense(responsiveConfig);
+      switch (license.type()) {
+        case CloudLicenseV1.TYPE_NAME:
+        case TimedTrialV1.TYPE_NAME:
+          // we don't report origin events for timed trial
+          // or cloud licenses since we don't want to modify
+          // the headers of records if not necessary
+          return new NoopOriginEventRecorder();
+        case UsageBasedV1.TYPE_NAME:
+        default:
+          return new OriginEventRecorderImpl(
+              threadId,
+              responsiveConfig,
+              license,
+              eosEnabled(streamsConfig)
+          );
+      }
+    };
+  }
+
   interface Factories {
     default EndOffsetsPoller createEndOffsetPoller(
         final Map<String, Object> config,
@@ -411,5 +454,7 @@ default ResponsiveRestoreConsumer<byte[], byte[]> createRestoreConsumer(
           repairRestoreOffsetOutOfRange
       );
     }
+
+    OriginEventRecorder createOriginEventRecorder(final String threadId);
   }
 }
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveProducer.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveProducer.java
index d7f46793c..88fb80b35 100644
--- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveProducer.java
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveProducer.java
@@ -41,7 +41,7 @@ public class ResponsiveProducer<K, V> implements Producer<K, V> {
   private final Logger logger;
 
   public interface Listener {
-    default void onCommit() {
+    default void onProducerCommit() {
     }
 
     default void onAbort() {
@@ -56,7 +56,11 @@ default void onSendOffsetsToTransaction(
     ) {
     }
 
-    default void onClose() {
+    default void onProducerClose() {
+    }
+
+    default <K, V> ProducerRecord<K, V> onSend(final ProducerRecord<K, V> record) {
+      return record;
     }
   }
 
@@ -107,7 +111,7 @@ public void sendOffsetsToTransaction(
   @Override
   public void commitTransaction() throws ProducerFencedException {
     wrapped.commitTransaction();
-    listeners.forEach(Listener::onCommit);
+    listeners.forEach(Listener::onProducerCommit);
   }
 
   @Override
@@ -117,12 +121,18 @@ public void abortTransaction() throws ProducerFencedException {
   }
 
   @Override
-  public Future<RecordMetadata> send(final ProducerRecord<K, V> record) {
+  public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
+    for (final Listener listener : listeners) {
+      record = listener.onSend(record);
+    }
     return new RecordingFuture(wrapped.send(record), listeners);
   }
 
   @Override
-  public Future<RecordMetadata> send(final ProducerRecord<K, V> record, final Callback callback) {
+  public Future<RecordMetadata> send(ProducerRecord<K, V> record, final Callback callback) {
+    for (final Listener listener : listeners) {
+      record = listener.onSend(record);
+    }
     return new RecordingFuture(
         wrapped.send(record, new RecordingCallback(callback, listeners)), listeners
     );
@@ -164,7 +174,7 @@ private void closeListeners() {
     // TODO(rohan): use consistent error behaviour on all callbacks - just throw up
     for (final var l : listeners) {
       try {
-        l.onClose();
+        l.onProducerClose();
       } catch (final Throwable t) {
         logger.error("error during producer listener close", t);
       }
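One note on the new onSend hook above: listeners compose like interceptors, each one may mutate the record or return a replacement, and the producer sends whatever comes out of the chain. A sketch with a hypothetical listener (not part of this PR):

// sketch: a hypothetical audit listener composed via the new onSend hook
final ResponsiveProducer.Listener audit = new ResponsiveProducer.Listener() {
  @Override
  public <K, V> ProducerRecord<K, V> onSend(final ProducerRecord<K, V> record) {
    // a listener may mutate headers in place or return a replacement record
    record.headers().add("audit", new byte[] {0x1});
    return record;
  }
};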
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/LicenseChecker.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/LicenseChecker.java
index e6cd3738f..aef19e6f5 100644
--- a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/LicenseChecker.java
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/LicenseChecker.java
@@ -15,12 +15,21 @@
 import dev.responsive.kafka.internal.license.exception.LicenseUseViolationException;
 import dev.responsive.kafka.internal.license.model.LicenseInfo;
 import dev.responsive.kafka.internal.license.model.TimedTrialV1;
+import dev.responsive.kafka.internal.license.model.UsageBasedV1;
 import java.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class LicenseChecker {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LicenseChecker.class);
+
   public void checkLicense(final LicenseInfo licenseInfo) {
     if (licenseInfo instanceof TimedTrialV1) {
       verifyTimedTrialV1((TimedTrialV1) licenseInfo);
+      LOG.info("Checked and confirmed valid Timed Trial license");
+    } else if (licenseInfo instanceof UsageBasedV1) {
+      LOG.info("Checked and confirmed valid Usage Based license");
     } else {
       throw new IllegalArgumentException(
           "unsupported license type: " + licenseInfo.getClass().getName());
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/LicenseUtils.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/LicenseUtils.java
new file mode 100644
index 000000000..044ae9ac2
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/LicenseUtils.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2024 Responsive Computing, Inc.
+ *
+ * This source code is licensed under the Responsive Business Source License Agreement v1.0
+ * available at:
+ *
+ * https://www.responsive.dev/legal/responsive-bsl-10
+ *
+ * This software requires a valid Commercial License Key for production use. Trial and commercial
+ * licenses can be obtained at https://www.responsive.dev
+ */
+
+package dev.responsive.kafka.internal.license;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import dev.responsive.kafka.api.ResponsiveKafkaStreams;
+import dev.responsive.kafka.api.config.ResponsiveConfig;
+import dev.responsive.kafka.internal.license.model.CloudLicenseV1;
+import dev.responsive.kafka.internal.license.model.LicenseDocument;
+import dev.responsive.kafka.internal.license.model.LicenseInfo;
+import dev.responsive.kafka.internal.license.model.SigningKeys;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Base64;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.types.Password;
+
+public final class LicenseUtils {
+
+  private static final String SIGNING_KEYS_PATH = "/responsive-license-keys/license-keys.json";
+
+  private LicenseUtils() {
+  }
+
+  public static LicenseInfo loadLicense(final ResponsiveConfig config) {
+    if (!config.getString(ResponsiveConfig.PLATFORM_API_KEY_CONFIG).isEmpty()) {
+      // for now, we don't do any additional validation for users that use
+      // Responsive Cloud via the platform key pair since that will be validated
+      // when they make any request to the controller
+      return new CloudLicenseV1(
+          CloudLicenseV1.TYPE_NAME,
+          config.getString(ResponsiveConfig.PLATFORM_API_KEY_CONFIG)
+      );
+    }
+
+    final var licenseDoc = loadLicenseDocument(config);
+    final var licenseInfo = authenticateLicense(licenseDoc);
+    final LicenseChecker checker = new LicenseChecker();
+    checker.checkLicense(licenseInfo);
+    return licenseInfo;
+  }
+
+  private static LicenseDocument loadLicenseDocument(final ResponsiveConfig configs) {
+    final Password licensePass = configs.getPassword(ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG);
+    final String license = licensePass == null ? "" : licensePass.value();
+    final String licenseFile = configs.getString(ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG);
+    if (license.isEmpty() == licenseFile.isEmpty()) {
+      throw new ConfigException(String.format(
+          "Must set exactly one of %s or %s",
+          ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG,
+          ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG
+      ));
+    }
+    final String licenseB64;
+    if (!license.isEmpty()) {
+      licenseB64 = license;
+    } else {
+      try {
+        licenseB64 = Files.readString(new File(licenseFile).toPath());
+      } catch (final IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    final ObjectMapper mapper = new ObjectMapper();
+    try {
+      return mapper.readValue(Base64.getDecoder().decode(licenseB64), LicenseDocument.class);
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static LicenseInfo authenticateLicense(final LicenseDocument document) {
+    final SigningKeys signingKeys = loadSigningKeys();
+    final LicenseAuthenticator licenseAuthenticator = new LicenseAuthenticator(signingKeys);
+    return licenseAuthenticator.authenticate(document);
+  }
+
+  private static SigningKeys loadSigningKeys() {
+    try {
+      return new ObjectMapper().readValue(
+          ResponsiveKafkaStreams.class.getResource(SIGNING_KEYS_PATH),
+          SigningKeys.class
+      );
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}
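For reviewers, the three paths through loadLicense in sketch form. Building the ResponsiveConfig itself is elided here, and the file path is hypothetical:

// 1. platform API key set        -> CloudLicenseV1, no further validation here
// 2. responsive.license set      -> base64 payload decoded into a LicenseDocument
// 3. responsive.license.file set -> file contents decoded the same way
final Properties props = new Properties();
props.put(ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG, "/etc/responsive/license.b64");
// setting both (or neither) of the license configs fails fast with a ConfigException
final LicenseInfo license = LicenseUtils.loadLicense(config); // authenticated and checked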
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/CloudLicenseV1.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/CloudLicenseV1.java
new file mode 100644
index 000000000..e58b39d5e
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/CloudLicenseV1.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2024 Responsive Computing, Inc.
+ *
+ * This source code is licensed under the Responsive Business Source License Agreement v1.0
+ * available at:
+ *
+ * https://www.responsive.dev/legal/responsive-bsl-10
+ *
+ * This software requires a valid Commercial License Key for production use. Trial and commercial
+ * licenses can be obtained at https://www.responsive.dev
+ */
+
+package dev.responsive.kafka.internal.license.model;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class CloudLicenseV1 extends LicenseInfo {
+
+  public static final String TYPE_NAME = "cloud_license_v1";
+
+  private final String key;
+
+  @JsonCreator
+  public CloudLicenseV1(
+      @JsonProperty("type") final String type,
+      @JsonProperty("key") final String key
+  ) {
+    super(type);
+    this.key = key;
+  }
+
+  @JsonProperty("key")
+  public String key() {
+    return key;
+  }
+}
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseInfo.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseInfo.java
index 03ce6d194..98ac8fc2b 100644
--- a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseInfo.java
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/LicenseInfo.java
@@ -23,7 +23,9 @@
     visible = true
 )
 @JsonSubTypes({
-    @JsonSubTypes.Type(value = TimedTrialV1.class, name = "timed_trial_v1")
+    @JsonSubTypes.Type(value = TimedTrialV1.class, name = TimedTrialV1.TYPE_NAME),
+    @JsonSubTypes.Type(value = CloudLicenseV1.class, name = CloudLicenseV1.TYPE_NAME),
+    @JsonSubTypes.Type(value = UsageBasedV1.class, name = UsageBasedV1.TYPE_NAME)
 })
 public abstract class LicenseInfo {
   private final String type;
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/TimedTrialV1.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/TimedTrialV1.java
index f320be0d1..f5dd0b7cb 100644
--- a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/TimedTrialV1.java
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/TimedTrialV1.java
@@ -17,6 +17,9 @@
 import java.util.Objects;
 
 public class TimedTrialV1 extends LicenseInfo {
+
+  public static final String TYPE_NAME = "timed_trial_v1";
+
   private final String email;
   private final long issuedAt;
   private final long expiresAt;
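The subtype registration above is what lets Jackson turn a raw license payload into the right LicenseInfo subclass. A minimal sketch, with an illustrative payload whose fields mirror the @JsonCreator parameters in this diff:

// sketch: polymorphic license parsing driven by the "type" property
// (readValue throws JsonProcessingException)
final ObjectMapper mapper = new ObjectMapper();
final LicenseInfo info = mapper.readValue(
    "{\"type\":\"cloud_license_v1\",\"key\":\"abc123\"}",
    LicenseInfo.class
);
assert info instanceof CloudLicenseV1;
assert "abc123".equals(((CloudLicenseV1) info).key());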
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/UsageBasedV1.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/UsageBasedV1.java
new file mode 100644
index 000000000..2ba7a1d3e
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/model/UsageBasedV1.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2024 Responsive Computing, Inc.
+ *
+ * This source code is licensed under the Responsive Business Source License Agreement v1.0
+ * available at:
+ *
+ * https://www.responsive.dev/legal/responsive-bsl-10
+ *
+ * This software requires a valid Commercial License Key for production use. Trial and commercial
+ * licenses can be obtained at https://www.responsive.dev
+ */
+
+package dev.responsive.kafka.internal.license.model;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class UsageBasedV1 extends LicenseInfo {
+
+  public static final String TYPE_NAME = "usage_based_v1";
+
+  private final String email;
+  private final String key;
+
+  @JsonCreator
+  public UsageBasedV1(
+      @JsonProperty("type") final String type,
+      @JsonProperty("email") final String email,
+      @JsonProperty("key") final String key
+  ) {
+    super(type);
+    this.email = email;
+    this.key = key;
+  }
+
+  public String email() {
+    return email;
+  }
+
+  public String key() {
+    return key;
+  }
+}
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/server/model/LicenseServerRequest.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/server/model/LicenseServerRequest.java
new file mode 100644
index 000000000..631fa2cce
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/server/model/LicenseServerRequest.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2024 Responsive Computing, Inc.
+ *
+ * This source code is licensed under the Responsive Business Source License Agreement v1.0
+ * available at:
+ *
+ * https://www.responsive.dev/legal/responsive-bsl-10
+ *
+ * This software requires a valid Commercial License Key for production use. Trial and commercial
+ * licenses can be obtained at https://www.responsive.dev
+ */
+
+package dev.responsive.kafka.internal.license.server.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+@JsonTypeInfo(
+    use = JsonTypeInfo.Id.NAME,
+    include = JsonTypeInfo.As.EXISTING_PROPERTY,
+    property = "type",
+    visible = true
+)
+@JsonSubTypes({
+    @JsonSubTypes.Type(
+        value = OriginEventsReportRequestV1.class,
+        name = OriginEventsReportRequestV1.TYPE_NAME)
+})
+public class LicenseServerRequest {
+
+  private final String type;
+  private final long timestamp;
+
+  public LicenseServerRequest(
+      @JsonProperty("type") final String type,
+      @JsonProperty("timestamp") final long timestamp
+  ) {
+    this.type = type;
+    this.timestamp = timestamp;
+  }
+
+  @JsonProperty("type")
+  public String type() {
+    return type;
+  }
+
+  @JsonProperty("timestamp")
+  public long timestamp() {
+    return timestamp;
+  }
+}
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/license/server/model/OriginEventsReportRequestV1.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/server/model/OriginEventsReportRequestV1.java
new file mode 100644
index 000000000..8914f8f27
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/license/server/model/OriginEventsReportRequestV1.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2024 Responsive Computing, Inc.
+ *
+ * This source code is licensed under the Responsive Business Source License Agreement v1.0
+ * available at:
+ *
+ * https://www.responsive.dev/legal/responsive-bsl-10
+ *
+ * This software requires a valid Commercial License Key for production use. Trial and commercial
+ * licenses can be obtained at https://www.responsive.dev
+ */
+
+package dev.responsive.kafka.internal.license.server.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Request that reports the number of origin events processed
+ * by a given (license, thread_id) since the last reported request
+ */
+public class OriginEventsReportRequestV1 extends LicenseServerRequest {
+
+  public static final String TYPE_NAME = "origin_events_report_v1";
+
+  // transaction id is used to ignore duplicate events if we
+  // retry the billing request (UUID)
+  private final String transactionId;
+
+  // the number of origin events since the last reported usage event
+  private final long eventCount;
+
+  // the customer's environment
+  private final String env;
+
+  // the application that reported the usage
+  private final String applicationId;
+
+  // the thread that reported the usage
+  private final String threadId;
+
+  public OriginEventsReportRequestV1(
+      @JsonProperty("type") final String type,
+      @JsonProperty("timestamp") final long timestamp,
+      @JsonProperty("transactionId") final String transactionId,
+      @JsonProperty("eventCount") final long eventCount,
+      @JsonProperty("env") final String env,
+      @JsonProperty("applicationId") final String applicationId,
+      @JsonProperty("threadId") final String threadId
+  ) {
+    super(type, timestamp);
+    this.transactionId = transactionId;
+    this.eventCount = eventCount;
+    this.env = env;
+    this.applicationId = applicationId;
+    this.threadId = threadId;
+  }
+
+  @JsonProperty("transactionId")
+  public String transactionId() {
+    return transactionId;
+  }
+
+  @JsonProperty("eventCount")
+  public long eventCount() {
+    return eventCount;
+  }
+
+  @JsonProperty("env")
+  public String env() {
+    return env;
+  }
+
+  @JsonProperty("applicationId")
+  public String applicationId() {
+    return applicationId;
+  }
+
+  @JsonProperty("threadId")
+  public String threadId() {
+    return threadId;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private long timestamp;
+    private String transactionId;
+    private long eventCount;
+    private String env;
+    private String applicationId;
+    private String threadId;
+
+    public Builder setTimestamp(final long timestamp) {
+      this.timestamp = timestamp;
+      return this;
+    }
+
+    public Builder setTransactionId(final String transactionId) {
+      this.transactionId = transactionId;
+      return this;
+    }
+
+    public Builder setEventCount(final long eventCount) {
+      this.eventCount = eventCount;
+      return this;
+    }
+
+    public Builder setEnv(final String env) {
+      this.env = env;
+      return this;
+    }
+
+    public Builder setApplicationId(final String applicationId) {
+      this.applicationId = applicationId;
+      return this;
+    }
+
+    public Builder setThreadId(final String threadId) {
+      this.threadId = threadId;
+      return this;
+    }
+
+    public OriginEventsReportRequestV1 build() {
+      return new OriginEventsReportRequestV1(
+          TYPE_NAME,
+          timestamp,
+          transactionId,
+          eventCount,
+          env,
+          applicationId,
+          threadId
+      );
+    }
+  }
+}
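For reference, the wire shape this builder produces for the license server (a sketch; values are illustrative and field names come from the @JsonProperty annotations above):

// sketch: one usage report (writeValueAsString throws JsonProcessingException)
final OriginEventsReportRequestV1 request = OriginEventsReportRequestV1.builder()
    .setTimestamp(1730000000000L)
    .setTransactionId(UUID.randomUUID().toString()) // dedupes retried reports
    .setEventCount(42)
    .setEnv("prod")
    .setApplicationId("my-streams-app")
    .setThreadId("StreamThread-1")
    .build();
final String json = new ObjectMapper().writeValueAsString(request);
// {"type":"origin_events_report_v1","timestamp":1730000000000,"transactionId":"...",
//  "eventCount":42,"env":"prod","applicationId":"my-streams-app","threadId":"StreamThread-1"}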
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/api/ResponsiveKafkaStreamsTest.java b/kafka-client/src/test/java/dev/responsive/kafka/api/ResponsiveKafkaStreamsTest.java
index cd2fc770e..812ab45c6 100644
--- a/kafka-client/src/test/java/dev/responsive/kafka/api/ResponsiveKafkaStreamsTest.java
+++ b/kafka-client/src/test/java/dev/responsive/kafka/api/ResponsiveKafkaStreamsTest.java
@@ -142,7 +142,7 @@ public void setUp() {
     properties.put(
         RESPONSIVE_LICENSE_CONFIG,
-        LicenseUtils.getLicense()
+        LicenseUtils.getTrialLicense()
     );
   }
 
@@ -238,7 +238,7 @@ public void shouldThrowWhenUsingResponsiveDslStoreSuppliersInNonResponsiveStorag
   @Test
   public void shouldAcceptLicenseInLicenseFile() {
     // given:
-    final File licenseFile = writeLicenseFile(LicenseUtils.getLicense());
+    final File licenseFile = writeLicenseFile(LicenseUtils.getTrialLicense());
     properties.put(RESPONSIVE_LICENSE_CONFIG, "");
     properties.put(ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG, licenseFile.getAbsolutePath());
     properties.put(STORAGE_BACKEND_TYPE_CONFIG, StorageBackend.NONE.name());
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/OriginEventIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/OriginEventIntegrationTest.java
new file mode 100644
index 000000000..0575247bd
--- /dev/null
+++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/OriginEventIntegrationTest.java
@@ -0,0 +1,362 @@
+/*
+ * Copyright 2024 Responsive Computing, Inc.
+ *
+ * This source code is licensed under the Responsive Business Source License Agreement v1.0
+ * available at:
+ *
+ * https://www.responsive.dev/legal/responsive-bsl-10
+ *
+ * This software requires a valid Commercial License Key for production use. Trial and commercial
+ * licenses can be obtained at https://www.responsive.dev
+ */
+
+package dev.responsive.kafka.integration;
+
+import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG;
+import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_LICENSE_SERVER_CONFIG;
+import static dev.responsive.kafka.testutils.IntegrationTestUtils.getDefaultMutablePropertiesWithStringSerdes;
+import static dev.responsive.kafka.testutils.IntegrationTestUtils.pipeTimestampedRecords;
+import static dev.responsive.kafka.testutils.IntegrationTestUtils.startAppAndAwaitRunning;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import dev.responsive.kafka.api.ResponsiveKafkaStreams;
+import dev.responsive.kafka.api.config.ResponsiveConfig;
+import dev.responsive.kafka.api.config.StorageBackend;
+import dev.responsive.kafka.testutils.IntegrationTestUtils;
+import dev.responsive.kafka.testutils.KeyValueTimestamp;
+import dev.responsive.kafka.testutils.LicenseUtils;
+import dev.responsive.kafka.testutils.ResponsiveConfigParam;
+import dev.responsive.kafka.testutils.ResponsiveExtension;
+import dev.responsive.kafka.testutils.TestLicenseServer;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Repartitioned;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
+import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class OriginEventIntegrationTest { + + @RegisterExtension + static ResponsiveExtension EXTENSION = new ResponsiveExtension(StorageBackend.MONGO_DB); + + private static final String INPUT_TOPIC = "input"; + private static final String OUTPUT_TOPIC = "output"; + + private final Map<String, Object> responsiveProps = new HashMap<>(); + + private String name; + private TestLicenseServer licenseServer; + + @BeforeEach + public void before( + final TestInfo info, + final Admin admin, + @ResponsiveConfigParam final Map<String, Object> responsiveProps, + final TestLicenseServer licenseServer + ) throws ExecutionException, InterruptedException { + name = IntegrationTestUtils.getCassandraValidName(info); + this.licenseServer = licenseServer; + this.responsiveProps.putAll(responsiveProps); + + // report every commit, so we don't potentially miss origin events + this.responsiveProps.put(ResponsiveConfig.ORIGIN_EVENT_REPORT_INTERVAL_MS_CONFIG, 0); + + final var result = admin.createTopics( + List.of( + new NewTopic(inputTopic(), Optional.of(2), Optional.empty()), + new NewTopic(inputTopicTable(), Optional.of(2), Optional.empty()), + new NewTopic(outputTopic(), Optional.of(1), Optional.empty()) + ) + ); + result.all().get(); + } + + private String inputTopic() { + return name + "." + INPUT_TOPIC; + } + + private String inputTopicTable() { + return name + ".table." + INPUT_TOPIC; + } + + private String outputTopic() { + return name + "." + OUTPUT_TOPIC; + }
+ + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void shouldCountOriginEventsSimpleTopology(final boolean eos) throws Exception { + // Given: + responsiveProps.put( + StreamsConfig.PROCESSING_GUARANTEE_CONFIG, + eos ? StreamsConfig.EXACTLY_ONCE_V2 : StreamsConfig.AT_LEAST_ONCE + ); + + final int numEvents = 100; + final CountDownLatch latch = new CountDownLatch(numEvents); + final StreamsBuilder builder = new StreamsBuilder(); + builder + .stream(inputTopic()) + .peek((k, v) -> latch.countDown()) + .to(outputTopic()); + + final List<KeyValueTimestamp<String, String>> inputs = IntStream.range(0, numEvents) + .mapToObj(i -> new KeyValueTimestamp<>("key" + i, String.valueOf(i), i)) + .collect(Collectors.toList()); + + // When: + final var props = getDefaultMutablePropertiesWithStringSerdes(responsiveProps, name); + final KafkaProducer<String, String> producer = new KafkaProducer<>(props); + try (final var kafkaStreams = new ResponsiveKafkaStreams(builder.build(), props)) { + startAppAndAwaitRunning(Duration.ofSeconds(15), kafkaStreams); + pipeTimestampedRecords(producer, inputTopic(), inputs); + assertThat(latch.await(30, TimeUnit.SECONDS), is(true)); + } + + // Then: + final String appId = (String) props.get(StreamsConfig.APPLICATION_ID_CONFIG); + assertThat(licenseServer.eventCounts().get(appId), is((long) numEvents)); + }
+ + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void shouldNotDoubleCountRepartitionedEvents(final boolean eos) throws Exception { + // Given: + responsiveProps.put( + StreamsConfig.PROCESSING_GUARANTEE_CONFIG, + eos ? StreamsConfig.EXACTLY_ONCE_V2 : StreamsConfig.AT_LEAST_ONCE + ); + + final int numEvents = 100; + final CountDownLatch latch = new CountDownLatch(numEvents); + final StreamsBuilder builder = new StreamsBuilder(); + builder + .stream(inputTopic()) + .repartition(Repartitioned.numberOfPartitions(4)) + .peek((k, v) -> latch.countDown()) + .to(outputTopic()); + + final List<KeyValueTimestamp<String, String>> inputs = IntStream.range(0, numEvents) + .mapToObj(i -> new KeyValueTimestamp<>("key" + i, String.valueOf(i), i)) + .collect(Collectors.toList()); + + // When: + final var props = getDefaultMutablePropertiesWithStringSerdes(responsiveProps, name); + final KafkaProducer<String, String> producer = new KafkaProducer<>(props); + try (final var kafkaStreams = new ResponsiveKafkaStreams(builder.build(), props)) { + startAppAndAwaitRunning(Duration.ofSeconds(15), kafkaStreams); + pipeTimestampedRecords(producer, inputTopic(), inputs); + assertThat(latch.await(30, TimeUnit.SECONDS), is(true)); + } + + // Then: + final String appId = (String) props.get(StreamsConfig.APPLICATION_ID_CONFIG); + assertThat(licenseServer.eventCounts().get(appId), is((long) numEvents)); + }
+ + @Test + public void shouldNotDoubleCountAggregatedEvents() throws Exception { + // Given: + final int numEvents = 100; + final CountDownLatch latch = new CountDownLatch(20); // twenty keys with 5 count each + final StreamsBuilder builder = new StreamsBuilder(); + builder + .stream(inputTopic()) + .groupBy((k, v) -> v) + .count() + .toStream() + // we want a sub-topology downstream of agg + .repartition(Repartitioned.numberOfPartitions(4)) + .peek((k, v) -> { + if (v == 5) { + latch.countDown(); + } + }) + .to(outputTopic()); + + final List<KeyValueTimestamp<String, String>> inputs = IntStream.range(0, numEvents) + .mapToObj(i -> new KeyValueTimestamp<>("key" + i, String.valueOf(i % 20), i)) + .collect(Collectors.toList()); + + // When: + final var props = getDefaultMutablePropertiesWithStringSerdes(responsiveProps, name); + final KafkaProducer<String, String> producer = new KafkaProducer<>(props); + try (final var kafkaStreams = new ResponsiveKafkaStreams(builder.build(), props)) { + startAppAndAwaitRunning(Duration.ofSeconds(15), kafkaStreams); + pipeTimestampedRecords(producer, inputTopic(), inputs); + assertThat(latch.await(30, TimeUnit.SECONDS), is(true)); + } + + // Then: + final String appId = (String) props.get(StreamsConfig.APPLICATION_ID_CONFIG); + assertThat(licenseServer.eventCounts().get(appId), is((long) numEvents)); + }
+ + @Test + public void shouldNotDoubleCountWindowedAggregatedEvents() throws Exception { + // Given: + final int numEvents = 100; + final CountDownLatch latch = new CountDownLatch(20); // 2 keys * 10 buckets = 20 + final StreamsBuilder builder = new StreamsBuilder(); + final KStream<String, String> stream = builder.stream(inputTopic()); + final KStream<Windowed<String>, String> windowedStream = stream.groupByKey() + .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10))) + .count() + .mapValues(String::valueOf) + .toStream(); + + // we want a sub-topology downstream of agg + windowedStream + .repartition(Repartitioned.with( + WindowedSerdes.timeWindowedSerdeFrom(String.class, 10), + Serdes.String() + )) + .peek((k, v) -> { + if ("5".equalsIgnoreCase(v)) { + latch.countDown(); + } + }) + .to(outputTopic()); + + final List<KeyValueTimestamp<String, String>> inputs = IntStream.range(0, numEvents) + .mapToObj(i -> new KeyValueTimestamp<>("key" + i % 2, String.valueOf(i), i)) + .collect(Collectors.toList()); + + // When: + final var props = getDefaultMutablePropertiesWithStringSerdes(responsiveProps, name); + final KafkaProducer<String, String> producer = new KafkaProducer<>(props); + try (final var kafkaStreams = new ResponsiveKafkaStreams(builder.build(), props)) { + startAppAndAwaitRunning(Duration.ofSeconds(15), kafkaStreams); + pipeTimestampedRecords(producer, inputTopic(), inputs); + assertThat(latch.await(30, TimeUnit.SECONDS), is(true)); + } + + // Then: + final String appId = (String) props.get(StreamsConfig.APPLICATION_ID_CONFIG); + assertThat(licenseServer.eventCounts().get(appId), is((long) numEvents)); + }
+ + @Test + public void shouldNotDoubleCountJoinedEvents() throws Exception { + // Given: + final int eventsPerTopic = 100; + final CountDownLatch latch = new CountDownLatch(eventsPerTopic); + final StreamsBuilder builder = new StreamsBuilder(); + final KTable<String, String> table = + builder.table(inputTopicTable(), Materialized.as("table1")); + final KStream<String, String> stream = builder.stream(inputTopic()); + stream + // use left join to avoid races populating the table + .leftJoin(table, (v1, v2) -> v1 + (v2 == null ? "-none" : "-joined")) + // we want a sub-topology downstream of join + .repartition(Repartitioned.numberOfPartitions(4)) + .peek((k, v) -> latch.countDown()) + .to(outputTopic()); + + final List<KeyValueTimestamp<String, String>> inputsLeft = IntStream.range(0, eventsPerTopic) + .mapToObj(i -> new KeyValueTimestamp<>("key" + i, String.valueOf(i), eventsPerTopic + i)) + .collect(Collectors.toList()); + + final List<KeyValueTimestamp<String, String>> inputsRight = IntStream.range(0, eventsPerTopic) + .mapToObj(i -> new KeyValueTimestamp<>("key" + i, "val", i)) + .collect(Collectors.toList()); + + // When: + final var props = getDefaultMutablePropertiesWithStringSerdes(responsiveProps, name); + final KafkaProducer<String, String> producer = new KafkaProducer<>(props); + try (final var kafkaStreams = new ResponsiveKafkaStreams(builder.build(), props)) { + startAppAndAwaitRunning(Duration.ofSeconds(15), kafkaStreams); + pipeTimestampedRecords(producer, inputTopicTable(), inputsRight); + pipeTimestampedRecords(producer, inputTopic(), inputsLeft); + assertThat(latch.await(30, TimeUnit.SECONDS), is(true)); + } + + // Then: + final String appId = (String) props.get(StreamsConfig.APPLICATION_ID_CONFIG); + assertThat(licenseServer.eventCounts().get(appId), is(2L * eventsPerTopic)); + }
+ + @Test + public void shouldNotReportIfUsingTrialLicenseType() throws Exception { + // Given: + final int numEvents = 100; + final CountDownLatch latch = new CountDownLatch(numEvents); + final StreamsBuilder builder = new StreamsBuilder(); + builder + .stream(inputTopic()) + .peek((k, v) -> latch.countDown()) + .to(outputTopic()); + + final List<KeyValueTimestamp<String, String>> inputs = IntStream.range(0, numEvents) + .mapToObj(i -> new KeyValueTimestamp<>("key" + i, String.valueOf(i), i)) + .collect(Collectors.toList()); + + // Then (No Error): + responsiveProps.put(RESPONSIVE_LICENSE_CONFIG, LicenseUtils.getTrialLicense()); + responsiveProps.put(RESPONSIVE_LICENSE_SERVER_CONFIG, "CRASH_IF_LOAD_SERVER"); + + final var props = getDefaultMutablePropertiesWithStringSerdes(responsiveProps, name); + final KafkaProducer<String, String> producer = new KafkaProducer<>(props); + try (final var kafkaStreams = new ResponsiveKafkaStreams(builder.build(), props)) { + startAppAndAwaitRunning(Duration.ofSeconds(15), kafkaStreams); + pipeTimestampedRecords(producer, inputTopic(), inputs); + assertThat(latch.await(30, TimeUnit.SECONDS), is(true)); + } + }
+ + @Test + public void shouldNotReportIfUsingCloudLicense() throws Exception { + // Given: + final int numEvents = 100; + final CountDownLatch latch = new CountDownLatch(numEvents); + final StreamsBuilder builder = new StreamsBuilder(); + builder + .stream(inputTopic()) + .peek((k, v) -> latch.countDown()) + .to(outputTopic()); + + final List<KeyValueTimestamp<String, String>> inputs = IntStream.range(0, numEvents) + .mapToObj(i -> new KeyValueTimestamp<>("key" + i, String.valueOf(i), i)) + .collect(Collectors.toList()); + + // Then (No Error): + responsiveProps.remove(RESPONSIVE_LICENSE_CONFIG); + responsiveProps.put(ResponsiveConfig.PLATFORM_API_KEY_CONFIG, "somevalue"); + responsiveProps.put(ResponsiveConfig.PLATFORM_API_SECRET_CONFIG, "somevalue"); + responsiveProps.put(RESPONSIVE_LICENSE_SERVER_CONFIG, "CRASH_IF_LOAD_SERVER"); + + final var props = getDefaultMutablePropertiesWithStringSerdes(responsiveProps, name); + final KafkaProducer<String, String> producer = new KafkaProducer<>(props); + try (final var kafkaStreams = new ResponsiveKafkaStreams(builder.build(), props)) { + startAppAndAwaitRunning(Duration.ofSeconds(15), kafkaStreams); + pipeTimestampedRecords(producer, inputTopic(), inputs); + assertThat(latch.await(30, TimeUnit.SECONDS), is(true)); + } + } + +}
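Note on the wiring these tests rely on: the license itself is ordinary Streams configuration. A minimal sketch of how an application would supply the new usage-based license and (optionally) override the license server (application id, bootstrap servers, port, and the license value are placeholders; only the two responsive.license* config names are taken from this diff):

    import dev.responsive.kafka.api.ResponsiveKafkaStreams;
    import dev.responsive.kafka.api.config.ResponsiveConfig;
    import java.util.Properties;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class UsageBasedLicenseExample {
      public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // base64-encoded license document, e.g. the encoded contents of test-usage-license.json
        props.put(ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG, System.getenv("RESPONSIVE_LICENSE"));
        // optional: where origin-event usage reports are sent; the tests below point
        // this at an in-process TestLicenseServer instead of the real endpoint
        props.put(ResponsiveConfig.RESPONSIVE_LICENSE_SERVER_CONFIG, "http://localhost:8080");
        final var streams = new ResponsiveKafkaStreams(new StreamsBuilder().build(), props);
        streams.start();
      }
    }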
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/OffsetRecorderTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/OffsetRecorderTest.java index 92f0fba4c..7c459f405 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/OffsetRecorderTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/OffsetRecorderTest.java @@ -61,7 +61,7 @@ TOPIC_PARTITION2, new OffsetAndMetadata(456L) ); // when: - eosRecorder.getProducerListener().onCommit(); + eosRecorder.getProducerListener().onProducerCommit(); // then: assertThat(getCommittedOffsetsSentToCallback(), is(Map.of( @@ -82,7 +82,7 @@ public void shouldRecordProducedOffsets() { ); // when: - eosRecorder.getProducerListener().onCommit(); + eosRecorder.getProducerListener().onProducerCommit(); // then: assertThat(getWrittenOffsetsSentToCallback(), is(Map.of( @@ -111,7 +111,7 @@ TOPIC_PARTITION2, new OffsetAndMetadata(456L) // when: eosRecorder.getProducerListener().onAbort(); - eosRecorder.getProducerListener().onCommit(); + eosRecorder.getProducerListener().onProducerCommit(); // then: assertThat(getWrittenOffsetsSentToCallback().entrySet(), is(empty())); @@ -121,7 +121,7 @@ TOPIC_PARTITION2, new OffsetAndMetadata(456L) @Test public void shouldRecordOffsetsCommittedByConsumer() { // when: - alosRecorder.getConsumerListener().onCommit( + alosRecorder.getConsumerListener().onConsumerCommit( Map.of( TOPIC_PARTITION1, new OffsetAndMetadata(123L), TOPIC_PARTITION2, new OffsetAndMetadata(456L) @@ -140,7 +140,7 @@ TOPIC_PARTITION2, new OffsetAndMetadata(456L) public void shouldThrowIfCommitFromConsumerOnEos() { assertThrows( IllegalStateException.class, - () -> eosRecorder.getConsumerListener().onCommit(Map.of()) + () -> eosRecorder.getConsumerListener().onConsumerCommit(Map.of()) ); }
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/OffsetTrackerTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/OffsetTrackerTest.java new file mode 100644 index 000000000..a04dbbbdb --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/OffsetTrackerTest.java @@ -0,0 +1,104 @@ +/* + * Copyright 2024 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.kafka.internal.clients; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import org.junit.jupiter.api.Test; + +class OffsetTrackerTest { + + @Test + public void shouldMarkAndCountSingleOffset() { + // Given + final OffsetTracker tracker = new OffsetTracker(1000L); + tracker.mark(1005L); + + // When + int count = tracker.countAndShift(1010L); + + // Then + assertThat("Expected one marked offset between 1000 and 1010", count, is(equalTo(1))); + } + + @Test + public void shouldMarkAndCountMultipleOffsetsAndIgnoreHigherOffsets() { + // Given + final OffsetTracker tracker = new OffsetTracker(1000L); + tracker.mark(1001L); + tracker.mark(1003L); + tracker.mark(1005L); + tracker.mark(1009L); + + // When + int count = tracker.countAndShift(1006L); + + // Then + assertThat("Expected three marked offsets before 1006", count, is(equalTo(3))); + } + + @Test + public void shouldCountNoMarkedOffsets() { + // Given: no marks have been set + final OffsetTracker tracker = new OffsetTracker(1000L); + + // When + int count = tracker.countAndShift(1010L); + + // Then + assertThat("Expected no marks when none have been set", count, is(equalTo(0))); + } + + @Test + public void shouldThrowExceptionOnMarkingOldOffset() { + // Given: + final OffsetTracker tracker = new OffsetTracker(1000L); + + // When/Then: attempting to mark offset 999 should throw an IllegalArgumentException + Exception ex = assertThrows(IllegalArgumentException.class, () -> tracker.mark(999L)); + assertThat(ex.getMessage(), containsString("cannot be less than baseOffset")); + } + + @Test + public void shouldThrowWhenCommittingOldOffset() { + // Given: + final OffsetTracker tracker = new OffsetTracker(1000L); + + // When/Then: committing at an offset below the base should throw an exception + Exception ex = assertThrows(IllegalArgumentException.class, () -> tracker.countAndShift(999L)); + assertThat(ex.getMessage(), containsString("cannot be less than baseOffset")); + } + + @Test + public void shouldShiftAndMarkAppropriately() { + // Given: + final OffsetTracker tracker = new OffsetTracker(1000L); + tracker.mark(1001L); + tracker.mark(1004L); + + // When: + tracker.countAndShift(1005L); + tracker.mark(1006L); // becomes index 1 relative to new base 1005 + tracker.mark(1007L); // becomes index 2 relative to base 1005 + tracker.mark(1011L); // out of range for next commit + int count = tracker.countAndShift(1010L); + + // Then: + assertThat("Expected two marks before 1010", count, is(equalTo(2))); + } + +} \ No newline at end of file
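The OffsetTracker implementation itself is not part of this diff, but the tests above pin down its contract: offsets at or above a moving base can be marked, countAndShift(endOffset) returns how many marked offsets fall below endOffset and makes endOffset the new base, and anything below the base is rejected. A minimal BitSet-backed sketch that satisfies these tests (illustrative only, not the production class):

    import java.util.BitSet;

    class OffsetTracker {
      private long baseOffset;
      private BitSet marks = new BitSet();

      OffsetTracker(final long baseOffset) {
        this.baseOffset = baseOffset;
      }

      void mark(final long offset) {
        if (offset < baseOffset) {
          throw new IllegalArgumentException(
              "offset " + offset + " cannot be less than baseOffset " + baseOffset);
        }
        // store the offset relative to the current base
        marks.set((int) (offset - baseOffset));
      }

      int countAndShift(final long endOffset) {
        if (endOffset < baseOffset) {
          throw new IllegalArgumentException(
              "offset " + endOffset + " cannot be less than baseOffset " + baseOffset);
        }
        final int boundary = (int) (endOffset - baseOffset);
        // count marks strictly below endOffset ...
        final int count = marks.get(0, boundary).cardinality();
        // ... then drop them and rebase the remainder onto the new baseOffset
        marks = marks.get(boundary, Math.max(boundary, marks.length()));
        baseOffset = endOffset;
        return count;
      }
    }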
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/OriginEventRecorderTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/OriginEventRecorderTest.java new file mode 100644 index 000000000..aee4cbc73 --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/OriginEventRecorderTest.java @@ -0,0 +1,226 @@ +/* + * Copyright 2024 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.kafka.internal.clients; + +import static dev.responsive.kafka.internal.clients.OriginEventRecorderImpl.ORIGIN_EVENT_HEADER_KEY; +import static dev.responsive.kafka.internal.clients.OriginEventRecorderImpl.ORIGIN_EVENT_MARK; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.hasSize; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class OriginEventRecorderTest { + + private static final TopicPartition TP = new TopicPartition("topic", 0); + + private OriginEventRecorderImpl recorder; + private TestReporter reporter; + + @BeforeEach + public void setup() { + reporter = new TestReporter(); + recorder = new OriginEventRecorderImpl("test-thread", reporter, false); + }
+ + @Test + public void shouldTrackOffsetAndReportOnConsumerCommit() { + // Given: + final long offset = 1000L; + final ConsumerRecord<String, String> record = record(offset, false); + final ConsumerRecords<String, String> records = records(record); + + // When: + recorder.onPoll(records); + recorder.onConsumerCommit(Map.of(TP, new OffsetAndMetadata(offset + 1))); + + // Then: a report is generated with a count of 1 for the partition. + List<ReportCall> calls = reporter.getCalls(); + assertThat("A report should have been triggered", calls, hasSize(1)); + + ReportCall call = calls.get(0); + assertThat("Report thread id should be 'test-thread'", call.threadId, is("test-thread")); + assertThat("Report should contain the TopicPartition", call.report.keySet(), hasItem(TP)); + assertThat("Count for the partition should be 1", call.report.get(TP), is(1)); + }
+ + @Test + public void shouldNotTrackOffsetWhenOriginHeaderPresent() { + // Given: + long offset = 1000L; + ConsumerRecords<String, String> records = records( + record(offset, true), + record(offset + 1, false) + ); + + // When: + recorder.onPoll(records); + Map<TopicPartition, OffsetAndMetadata> commitOffsets = new HashMap<>(); + commitOffsets.put(TP, new OffsetAndMetadata(offset + 2)); + recorder.onConsumerCommit(commitOffsets); + + // Then: + List<ReportCall> calls = reporter.getCalls(); + assertThat("A report should have been triggered", calls, hasSize(1)); + + ReportCall call = calls.get(0); + assertThat("Report thread id should be 'test-thread'", call.threadId, is("test-thread")); + assertThat("Report should contain the TopicPartition", call.report.keySet(), hasItem(TP)); + assertThat("Count for the partition should only count 1", call.report.get(TP), is(1)); + }
+ + @Test + public void shouldAddOriginHeaderOnSend() { + // Given: + ProducerRecord<String, String> record = new ProducerRecord<>( + TP.topic(), TP.partition(), null, "key", "value" + ); + + // When: + ProducerRecord<String, String> returnedRecord = recorder.onSend(record); + + // Then: + assertThat("Record should have origin header after onSend", + returnedRecord.headers().lastHeader(ORIGIN_EVENT_HEADER_KEY).value(), + is(ORIGIN_EVENT_MARK) + ); + }
+ + @Test + public void shouldReportOnProducerCommitWithEos() { + // Given: + recorder = new OriginEventRecorderImpl("test-thread", reporter, true); + + long offset = 1000L; + final ConsumerRecords<String, String> records = records(record(offset, false)); + recorder.onPoll(records); + + // When: + recorder.onSendOffsetsToTransaction(Map.of(TP, new OffsetAndMetadata(offset + 1)), ""); + recorder.onProducerCommit(); + + // Then: a report is triggered with the correct count. + List<ReportCall> calls = reporter.getCalls(); + assertThat("A report should be triggered on producer commit", calls, hasSize(1)); + ReportCall call = calls.get(0); + assertThat("Reported count for the partition should be 1", call.report.get(TP), is(1)); + }
+ + @Test + public void shouldOnlyIncludeOffsetsUpToCommitOffset() { + // Given: + long baseOffset = 1000L; + recorder.onPoll(records( + record(baseOffset, false), + record(baseOffset + 1, false) + )); + + // When: + recorder.onConsumerCommit(Map.of(TP, new OffsetAndMetadata(baseOffset + 1))); + + // Then: only the mark for offset 1000 (i.e. below commit offset 1001) is counted. + final List<ReportCall> calls = reporter.getCalls(); + assertThat("A report should be triggered", calls, hasSize(1)); + assertThat( + "Reported count for the partition should be 1 (only offset 1000 counted)", + calls.get(0).report.get(TP), + is(1) + ); + }
+ + @Test + public void shouldOnlyReportCommittedTopicPartitions() { + // Given: + final TopicPartition tp1 = new TopicPartition("t1", 0); + final TopicPartition tp2 = new TopicPartition("t2", 0); + long offset = 1000L; + + final var record1 = new ConsumerRecord<>(tp1.topic(), tp1.partition(), offset, "key", "value"); + final var record2 = new ConsumerRecord<>(tp2.topic(), tp2.partition(), offset, "key", "value"); + + // When: + recorder.onPoll(records(record1, record2)); + recorder.onConsumerCommit(Map.of(tp1, new OffsetAndMetadata(offset + 1))); + + // Then: + List<ReportCall> calls = reporter.getCalls(); + assertThat("A report should have been triggered", calls, hasSize(1)); + ReportCall call = calls.get(0); + assertThat("Report should only contain the committed TopicPartition", + call.report.keySet(), contains(tp1)); + assertThat("Reported count for tp1 should be 1", call.report.get(tp1), is(1)); + }
+ + + private ConsumerRecord<String, String> record( + final long offset, + final boolean withOriginHeader + ) { + final ConsumerRecord<String, String> record = new ConsumerRecord<>( + TP.topic(), + TP.partition(), + offset, + "key", + "value" + ); + + if (withOriginHeader) { + record.headers().add(ORIGIN_EVENT_HEADER_KEY, ORIGIN_EVENT_MARK); + } + + return record; + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private ConsumerRecords<String, String> records(final ConsumerRecord... records) { + return new ConsumerRecords<>(Map.of(TP, Arrays.asList(records))); + } + + private static class ReportCall { + final Map<TopicPartition, Integer> report; + final String threadId; + + ReportCall(Map<TopicPartition, Integer> report, String threadId) { + this.report = report; + this.threadId = threadId; + } + } + + private static class TestReporter + implements java.util.function.BiConsumer<Map<TopicPartition, Integer>, String> { + + private final List<ReportCall> calls = new ArrayList<>(); + + @Override + public void accept(Map<TopicPartition, Integer> report, String threadId) { + calls.add(new ReportCall(report, threadId)); + } + + public List<ReportCall> getCalls() { + return calls; + } + } +}
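Read together, these tests outline the recorder's dedup scheme: onSend stamps every outgoing record with the origin header, onPoll marks only records that arrive without the header, and a commit (onConsumerCommit under ALOS, onSendOffsetsToTransaction followed by onProducerCommit under EOS) reports per-partition counts of marks below the committed offset. The actual OriginEventRecorderImpl internals are not shown in this diff; a condensed sketch of the poll/commit side under those assumptions (trackers, reporter, and threadId as fields, with OffsetTracker as above):

    // illustrative sketch only -- not the OriginEventRecorderImpl source
    private final Map<TopicPartition, OffsetTracker> trackers = new HashMap<>();

    public void onPoll(final ConsumerRecords<?, ?> records) {
      for (final ConsumerRecord<?, ?> record : records) {
        // a record that already carries the origin header was stamped by an upstream
        // sub-topology (e.g. via a repartition topic) and must not be counted twice
        if (record.headers().lastHeader(ORIGIN_EVENT_HEADER_KEY) == null) {
          final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
          trackers.computeIfAbsent(tp, k -> new OffsetTracker(record.offset()))
              .mark(record.offset());
        }
      }
    }

    public void onConsumerCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
      final Map<TopicPartition, Integer> report = new HashMap<>();
      // only partitions named in the commit are reported
      offsets.forEach((tp, committed) -> {
        final OffsetTracker tracker = trackers.get(tp);
        if (tracker != null) {
          // count marks strictly below the committed offset, then shift the base
          report.put(tp, tracker.countAndShift(committed.offset()));
        }
      });
      reporter.accept(report, threadId);
    }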
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveConsumerTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveConsumerTest.java index 63361cb40..48c1b65aa 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveConsumerTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveConsumerTest.java @@ -171,8 +171,8 @@ public void shouldNotifyOnClose() { consumer.close(); // then: - verify(listener1).onClose(); - verify(listener2).onClose(); + verify(listener1).onConsumerClose(); + verify(listener2).onConsumerClose(); } @Test @@ -184,8 +184,8 @@ public void shouldNotifyOnCommitSync() { consumer.commitSync(commits); // then: - verify(listener1).onCommit(commits); - verify(listener2).onCommit(commits); + verify(listener1).onConsumerCommit(commits); + verify(listener2).onConsumerCommit(commits); } @Test @@ -197,8 +197,8 @@ public void shouldNotifyOnCommitSyncWithTimeout() { consumer.commitSync(commits, Duration.ofSeconds(30)); // then: - verify(listener1).onCommit(commits); - verify(listener2).onCommit(commits); + verify(listener1).onConsumerCommit(commits); + verify(listener2).onConsumerCommit(commits); } @Test @@ -214,14 +214,14 @@ public void shouldThrowOnCommitsWithoutOffsetsAndAsyncCommits() { @Test public void shouldIgnoreErrorsOnCloseCallback() { // given: - doThrow(new RuntimeException("oops")).when(listener1).onClose(); + doThrow(new RuntimeException("oops")).when(listener1).onConsumerClose(); // when: consumer.close(); // then: - verify(listener1).onClose(); - verify(listener2).onClose(); + verify(listener1).onConsumerClose(); + verify(listener2).onConsumerClose(); } @SuppressWarnings("unchecked")
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplierTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplierTest.java index e354630f2..410b10a1a 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplierTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplierTest.java @@ -110,6 +110,8 @@ class ResponsiveKafkaClientSupplierTest { private ResponsiveKafkaClientSupplier supplier; private final ResponsiveStoreRegistry storeRegistry = new ResponsiveStoreRegistry(); + private final OriginEventRecorder originEventRecorder = + new OriginEventRecorderImpl("thread", (a, b) -> { }, false); @BeforeEach @SuppressWarnings("unchecked") @@ -127,6 +129,7 @@ public void setup() { lenient().when(factories.createMetricsPublishingCommitListener(any(), any(), any())) .thenReturn(commitMetricListener); lenient().when(factories.createOffsetRecorder(anyBoolean(), any())).thenReturn(offsetRecorder); + lenient().when(factories.createOriginEventRecorder(any())).thenReturn(originEventRecorder); supplier = supplier(CONFIGS, StorageBackend.MONGO_DB); } @@ -229,6 +232,17 @@ public void shouldAddEndOffsetsPollerListeners() { assertThat(consumerListenerCaptor.getValue(), Matchers.hasItem(consumerEndOffsetsPollListener)); } + @Test + public void shouldAddOriginEventListeners() { + // when: + supplier.getConsumer(CONSUMER_CONFIGS); + + // then: + verify(factories).createResponsiveConsumer( + any(), any(), consumerListenerCaptor.capture()); + assertThat(consumerListenerCaptor.getValue(), Matchers.hasItem(originEventRecorder)); + } + @Test public void shouldCloseMetricPublishingCommitListenerWhenNoRefs() { // given: @@ -238,10 +252,10 @@ public void shouldCloseMetricPublishingCommitListenerWhenNoRefs() { // then: verify(factories).createResponsiveConsumer( any(), any(), consumerListenerCaptor.capture()); - consumerListenerCaptor.getValue().forEach(ResponsiveConsumer.Listener::onClose); + consumerListenerCaptor.getValue().forEach(ResponsiveConsumer.Listener::onConsumerClose); verify(commitMetricListener, times(0)).close(); verify(factories).createResponsiveProducer(any(), any(), producerListenerCaptor.capture()); - producerListenerCaptor.getValue().forEach(ResponsiveProducer.Listener::onClose); + producerListenerCaptor.getValue().forEach(ResponsiveProducer.Listener::onProducerClose); verify(commitMetricListener).close(); }
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveProducerTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveProducerTest.java index 650b05c30..a3350ee37 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveProducerTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveProducerTest.java @@ -19,11 +19,11 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.doThrow; +import static 
org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import dev.responsive.kafka.internal.clients.ResponsiveProducer; import dev.responsive.kafka.internal.clients.ResponsiveProducer.Listener; import java.util.List; import java.util.Map; @@ -62,6 +62,8 @@ public class ResponsiveProducerTest { @BeforeEach public void setup() { producer = new ResponsiveProducer<>("clientid", wrapped, List.of(listener1, listener2)); + lenient().when(listener1.onSend(any())).thenAnswer(iom -> iom.getArguments()[0]); + lenient().when(listener2.onSend(any())).thenAnswer(iom -> iom.getArguments()[0]); } @Test @@ -70,8 +72,8 @@ public void shouldNotifyOnCommit() { producer.commitTransaction(); // then: - verify(listener1).onCommit(); - verify(listener2).onCommit(); + verify(listener1).onProducerCommit(); + verify(listener2).onProducerCommit(); } @Test @@ -135,6 +137,20 @@ public void shouldNotifyOnSendFutureAndReturnRecordMetadata() verify(listener2).onSendCompleted(recordMetadata); } + @Test + public void shouldAllowRecordModificationOnSend() { + // Given: + final var rec1 = new ProducerRecord<String, String>(PARTITION1.topic(), "val"); + final var rec2 = new ProducerRecord<String, String>(PARTITION1.topic(), "val"); + when(listener1.onSend(any())).thenAnswer(iom -> rec2); + + // When: + producer.send(rec1, (rm, e) -> { }); + + // Then: + verify(wrapped).send(same(rec2), any()); + } + @Test public void shouldNotifyOnSendOffsetsToTransaction() { // when: @@ -159,7 +175,7 @@ PARTITION2, new OffsetAndMetadata(11L) public void shouldThrowExceptionFromCommitCallback() { // given: producer.sendOffsetsToTransaction(Map.of(PARTITION1, new OffsetAndMetadata(10)), "foo"); - doThrow(new RuntimeException("oops")).when(listener1).onCommit(); + doThrow(new RuntimeException("oops")).when(listener1).onProducerCommit(); // when/then: assertThrows(RuntimeException.class, () -> producer.commitTransaction()); @@ -171,20 +187,20 @@ public void shouldNotifyOnClose() { producer.close(); // then: - verify(listener1).onClose(); - verify(listener2).onClose(); + verify(listener1).onProducerClose(); + verify(listener2).onProducerClose(); } @Test public void shouldIgnoreExceptionFromCloseCallback() { // given: - doThrow(new RuntimeException("oops")).when(listener1).onClose(); + doThrow(new RuntimeException("oops")).when(listener1).onProducerClose(); // when: producer.close(); // then: - verify(listener1).onClose(); - verify(listener2).onClose(); + verify(listener1).onProducerClose(); + verify(listener2).onProducerClose(); } } \ No newline at end of file
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/StoreCommitListenerTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/StoreCommitListenerTest.java index e9ea55611..6ec8789d8 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/StoreCommitListenerTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/StoreCommitListenerTest.java @@ -15,8 +15,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; -import dev.responsive.kafka.internal.clients.OffsetRecorder; -import dev.responsive.kafka.internal.clients.StoreCommitListener; import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistration; import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry; import java.util.Map; @@ -91,7 +89,7 @@ private void sendCommittedOffsets(final Map<TopicPartition, Long> offsets) { .collect(Collectors.toMap(Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))), "group" ); - offsetRecorder.getProducerListener().onCommit(); + offsetRecorder.getProducerListener().onProducerCommit(); } private void sendWrittenOffsets(final Map<TopicPartition, Long> offsets) { @@ -105,6 +103,6 @@ private void sendWrittenOffsets(final Map<TopicPartition, Long> offsets) { 0 )); } - offsetRecorder.getProducerListener().onCommit(); + offsetRecorder.getProducerListener().onProducerCommit(); } } \ No newline at end of file
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseAuthenticatorTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseAuthenticatorTest.java index a133e3528..8c2a1e905 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseAuthenticatorTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseAuthenticatorTest.java @@ -29,7 +29,7 @@ class LicenseAuthenticatorTest { @Test public void shouldVerifyLicense() { // given: - final LicenseDocument license = loadLicense("test-license.json"); + final LicenseDocument license = loadLicense("test-trial-license.json"); // when/then (no throw): verifier.authenticate(license);
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseCheckerTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseCheckerTest.java index 7595542a9..cd4606280 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseCheckerTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/license/LicenseCheckerTest.java @@ -17,6 +17,7 @@ import dev.responsive.kafka.internal.license.exception.LicenseUseViolationException; import dev.responsive.kafka.internal.license.model.LicenseInfo; import dev.responsive.kafka.internal.license.model.TimedTrialV1; +import dev.responsive.kafka.internal.license.model.UsageBasedV1; import java.time.Duration; import java.time.Instant; import org.junit.jupiter.api.Test; @@ -51,4 +52,17 @@ public void shouldAcceptValidTrialV1License() { // when/then (no throw): checker.checkLicense(info); } + + @Test + public void shouldAcceptValidUsageBasedLicense() { + // given: + final LicenseInfo info = new UsageBasedV1( + UsageBasedV1.TYPE_NAME, + "foo@bar.com", + "key" + ); + + // when/then (no throw): + checker.checkLicense(info); + } } \ No newline at end of file
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/metrics/MetricPublishingCommitListenerTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/metrics/MetricPublishingCommitListenerTest.java index ecc13a8d8..cd03e3b1f 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/metrics/MetricPublishingCommitListenerTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/metrics/MetricPublishingCommitListenerTest.java @@ -77,7 +77,7 @@ public void shouldReportCommittedOffsets() { ); // when: - recorderProducerListener.onCommit(); + recorderProducerListener.onProducerCommit(); // then: verify(metrics, times(2)).addMetric( @@ -117,7 +117,7 @@ public void shouldCleanupCommittedOffsetOnRevoke() { Map.of(PARTITION1, new OffsetAndMetadata(123L), PARTITION2, new OffsetAndMetadata(345L)), GROUP ); - recorderProducerListener.onCommit(); + recorderProducerListener.onProducerCommit(); // when: listener.onPartitionsRevoked(List.of(PARTITION1)); @@ -138,7 +138,7 @@ public void shouldAddCommittedOffsetMetricOnAssign() { ); // when: - recorderProducerListener.onCommit(); + 
recorderProducerListener.onProducerCommit(); // then: verify(metrics, times(2)) @@ -158,7 +158,7 @@ public void shouldCleanupMetricsOnClose() { Map.of(PARTITION1, new OffsetAndMetadata(123L), PARTITION2, new OffsetAndMetadata(345L)), GROUP ); - recorderProducerListener.onCommit(); + recorderProducerListener.onProducerCommit(); // when: listener.close(); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java index 24dc42935..55add4140 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java @@ -165,6 +165,11 @@ public static long minutesToMillis(final long minutes) { } public static String getCassandraValidName(final TestInfo info) { + // parameterized tests use `[x]` to indicate the parameterization number + if (!info.getDisplayName().contains("[")) { + return info.getTestMethod().orElseThrow().getName().toLowerCase(Locale.ROOT); + } + // add displayName to name to account for parameterized tests return info.getTestMethod().orElseThrow().getName().toLowerCase(Locale.ROOT) + info.getDisplayName().substring("[X] ".length()).toLowerCase(Locale.ROOT) diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/LicenseUtils.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/LicenseUtils.java index 5d4fe6299..0e2211098 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/testutils/LicenseUtils.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/LicenseUtils.java @@ -7,14 +7,20 @@ import java.util.Base64; public final class LicenseUtils { - private static final String DECODED_LICENSE_FILE - = "test-licenses/test-license.json"; + private static final String DECODED_TRIAL_LICENSE_FILE + = "test-licenses/test-trial-license.json"; + private static final String DECODED_USAGE_LICENSE_FILE + = "test-licenses/test-usage-license.json"; private LicenseUtils() { } - public static String getLicense() { - return getEncodedLicense(DECODED_LICENSE_FILE); + public static String getTrialLicense() { + return getEncodedLicense(DECODED_TRIAL_LICENSE_FILE); + } + + public static String getUsageLicense() { + return getEncodedLicense(DECODED_USAGE_LICENSE_FILE); } public static String getEncodedLicense(final String filename) { diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java index 16743f99a..0a5113dfc 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java @@ -20,6 +20,7 @@ import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_CONNECTION_STRING_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_ENV_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG; +import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_LICENSE_SERVER_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_ORG_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.RS3_HOSTNAME_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.RS3_PORT_CONFIG; @@ -65,6 +66,7 @@ public class ResponsiveExtension 
.withEnv("KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS", "60000") .withReuse(true); public static MongoDBContainer mongo = new MongoDBContainer(TestConstants.MONGODB); + public static TestLicenseServer licenseServer = new TestLicenseServer(); public static Admin admin; @@ -98,6 +100,7 @@ public static void startAll() { final var kafkaFuture = Startables.deepStart(kafka); final var storageFuture = Startables.deepStart(cassandra, mongo); try { + licenseServer.start(); kafkaFuture.get(); admin = Admin.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers())); storageFuture.get(); @@ -115,6 +118,7 @@ public static void stopAll() { mongo.stop(); kafka.stop(); admin.close(); + licenseServer.close(); } public ResponsiveExtension() { @@ -134,6 +138,7 @@ public boolean supportsParameter( || parameterContext.getParameter().getType().equals(MongoDBContainer.class) || parameterContext.getParameter().getType().equals(RS3Container.class) || parameterContext.getParameter().getType().equals(Admin.class) + || parameterContext.getParameter().getType().equals(TestLicenseServer.class) || isContainerConfig(parameterContext); } @@ -152,6 +157,8 @@ public Object resolveParameter( return rs3; } else if (parameterContext.getParameter().getType() == Admin.class) { return admin; + } else if (parameterContext.getParameter().getType() == TestLicenseServer.class) { + return licenseServer; } else if (isContainerConfig(parameterContext)) { final Map map = new HashMap<>(Map.of( RESPONSIVE_ORG_CONFIG, "responsive", @@ -159,7 +166,8 @@ public Object resolveParameter( BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), CASSANDRA_DESIRED_NUM_PARTITION_CONFIG, -1, CASSANDRA_CHECK_INTERVAL_MS, 100, - RESPONSIVE_LICENSE_CONFIG, LicenseUtils.getLicense() + RESPONSIVE_LICENSE_CONFIG, LicenseUtils.getUsageLicense(), + RESPONSIVE_LICENSE_SERVER_CONFIG, "http://localhost:" + licenseServer.port() )); if (backend == StorageBackend.CASSANDRA) { diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/TestLicenseServer.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/TestLicenseServer.java new file mode 100644 index 000000000..626ec37cf --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/TestLicenseServer.java @@ -0,0 +1,85 @@ +/* + * Copyright 2024 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. 
Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.kafka.testutils; + +import com.sun.net.httpserver.HttpServer; +import dev.responsive.kafka.internal.license.server.model.OriginEventsReportRequestV1; +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper; + +public class TestLicenseServer implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(TestLicenseServer.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private final Map eventCounts = new ConcurrentHashMap<>(); + private int port; + private HttpServer server; + + public TestLicenseServer() { + } + + public synchronized void start() throws IOException { + LOG.info("Starting test license server..."); + // Create a server on a random available port (port 0 means OS-assigned) + server = HttpServer.create(new InetSocketAddress(0), 0); + server.createContext("/usage", exchange -> { + final var body = exchange.getRequestBody(); + final var req = MAPPER.readValue(body, OriginEventsReportRequestV1.class); + + eventCounts.compute( + req.applicationId(), + (k, v) -> v == null ? req.eventCount() : v + req.eventCount() + ); + + String response = "OK"; + exchange.sendResponseHeaders(200, response.getBytes().length); + try (OutputStream os = exchange.getResponseBody()) { + os.write(response.getBytes()); + } + }); + server.setExecutor(Executors.newSingleThreadExecutor()); + server.start(); + + // Capture the actual port assigned + port = server.getAddress().getPort(); + LOG.info("Started test license server on port {}", port); + } + + @Override + public synchronized void close() { + LOG.info("Stopping test license server."); + if (server != null) { + server.stop(0); + } + } + + public int port() { + if (server == null) { + throw new IllegalStateException("Cannot call port() before starting server"); + } + return port; + } + + public Map eventCounts() { + return eventCounts; + } +} diff --git a/kafka-client/src/test/resources/log4j.properties b/kafka-client/src/test/resources/log4j.properties index 4554d069d..00e87d443 100644 --- a/kafka-client/src/test/resources/log4j.properties +++ b/kafka-client/src/test/resources/log4j.properties @@ -17,5 +17,6 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n log4j.logger.dev.responsive.kafka.api.async=DEBUG +#log4j.logger.dev.responsive.kafka.internal.clients.OriginEventRecorder=DEBUG log4j.logger.org.apache.kafka.streams.state.internals.AsyncKeyValueStoreBuilder=DEBUG -log4j.logger.org.apache.kafka.streams.state.internals.AsyncTimestampedKeyValueStoreBuilder=DEBUG \ No newline at end of file +log4j.logger.org.apache.kafka.streams.state.internals.AsyncTimestampedKeyValueStoreBuilder=DEBUG diff --git a/kafka-client/src/test/resources/test-licenses/test-license.json b/kafka-client/src/test/resources/test-licenses/test-trial-license.json similarity index 100% rename from kafka-client/src/test/resources/test-licenses/test-license.json rename to kafka-client/src/test/resources/test-licenses/test-trial-license.json diff --git a/kafka-client/src/test/resources/test-licenses/test-usage-license.json 
b/kafka-client/src/test/resources/test-licenses/test-usage-license.json new file mode 100644 index 000000000..929336016 --- /dev/null +++ b/kafka-client/src/test/resources/test-licenses/test-usage-license.json @@ -0,0 +1,8 @@ +{ + "info": "eyJ0eXBlIjoidXNhZ2VfYmFzZWRfdjEiLCAiZW1haWwiOiAidGVzdEByZXNwb25zaXZlLmRldiIsICJrZXkiOiAiTk9UVVNFRCJ9Cg==", + "signature": "luXWYcLxvRRTAym9JsLepK5eWdIyaCUlccb72IWNTMKLHG6HmQY4o0RteDC6k03KseuFav/w+2EAHGodnwXuFw78XRROA7OOEf6JEkQ3KNrpD3pwxy2wOQkxii7IuIqOWNNpQrljoDsO2Jjr061s62/UwXZ/eaU634Cs86yOUpVFJi9BewNf95kwjmkAT6nCfh39OVhd/5JVc1dwcbHgqDE8OKJWdv4gUOwsfkjtToWTezRm1Ckrh8310wmcZ6iDXi8OB2BTEz3BG6DofZKVIkqAbXF9T3inqDd4a1Twvc5N+8r1a/vIY1zZZkFt9qRRBtG0ld5aHbxBC/2MkxRZqkiw7qtfYDW+LMGW86pLFzGdjZCIDBp6R91Xo170UNEStNWZsZnzTdoYtK5yyDxlPiWX2OS8Q//mnee4qw0ID79rBjaTqCNoa7VKEbmL7jwnJZCUWMeM6REytTLtL7uPwHMXPoNg2GwYY62N8oztr7pltyof+byVrTrhx7UAZAuCDZpWSCH/cOsRzVZvYy8TX0QiW8gnpsnvYRWSlGZqJm6poGme8/4YgYrLJvHC8P9X3KKpK7JAyyfD7Iz/0S0h9FZfK1YA70Tybdmg1R+eWINqMKnf+mPnDtI/ZN5+EzeiXUIHRDYT1F1+h+CUcqn4pRQWRx2LxPT+SlpJcxVMvGY=", + "key": "test", + "algo": "RSASSA_PSS_SHA_256", + "messageType": "RAW", + "version": 1 +} \ No newline at end of file
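Two closing notes on these fixtures. First, the base64 "info" field above decodes to the plaintext payload {"type":"usage_based_v1", "email": "test@responsive.dev", "key": "NOTUSED"}, which lines up with the three-argument UsageBasedV1 constructor exercised in LicenseCheckerTest. Second, TestLicenseServer can also be exercised stand-alone when debugging the /usage handler; a hypothetical example (the JSON property names applicationId and eventCount mirror the OriginEventsReportRequestV1 accessors used by the handler, but are assumptions here):

    import dev.responsive.kafka.testutils.TestLicenseServer;
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class TestLicenseServerExample {
      public static void main(String[] args) throws Exception {
        final TestLicenseServer server = new TestLicenseServer();
        server.start();
        try {
          final HttpRequest request = HttpRequest.newBuilder()
              .uri(URI.create("http://localhost:" + server.port() + "/usage"))
              .POST(HttpRequest.BodyPublishers.ofString(
                  "{\"applicationId\": \"my-app\", \"eventCount\": 42}")) // field names assumed
              .build();
          final HttpResponse<String> response = HttpClient.newHttpClient()
              .send(request, HttpResponse.BodyHandlers.ofString());
          // the handler replies "OK" and accumulates counts per application id
          System.out.println(response.body() + " -> " + server.eventCounts().get("my-app"));
        } finally {
          server.close();
        }
      }
    }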