ResponsiveKafkaStreams.java
@@ -22,14 +22,14 @@
import static dev.responsive.kafka.api.config.ResponsiveConfig.RS3_TLS_ENABLED_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.TASK_ASSIGNOR_CLASS_OVERRIDE;
import static dev.responsive.kafka.internal.config.ResponsiveStreamsConfig.validateNoStorageStreamsConfig;
+import static dev.responsive.kafka.internal.license.LicenseUtils.loadLicense;
import static dev.responsive.kafka.internal.metrics.ResponsiveMetrics.RESPONSIVE_METRICS_NAMESPACE;
import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.METRICS_NUM_SAMPLES_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG;

-import com.fasterxml.jackson.databind.ObjectMapper;
import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry;
import dev.responsive.kafka.api.async.internals.AsyncUtils;
import dev.responsive.kafka.api.config.ResponsiveConfig;
@@ -46,11 +46,6 @@
import dev.responsive.kafka.internal.db.mongo.ResponsiveMongoClient;
import dev.responsive.kafka.internal.db.rs3.RS3TableFactory;
import dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRS3Client;
-import dev.responsive.kafka.internal.license.LicenseAuthenticator;
-import dev.responsive.kafka.internal.license.LicenseChecker;
-import dev.responsive.kafka.internal.license.model.LicenseDocument;
-import dev.responsive.kafka.internal.license.model.LicenseInfo;
-import dev.responsive.kafka.internal.license.model.SigningKeys;
import dev.responsive.kafka.internal.metrics.ClientVersionMetadata;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.metrics.ResponsiveRestoreListener;
@@ -61,11 +56,7 @@
import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry;
import dev.responsive.kafka.internal.utils.SessionClients;
import dev.responsive.kafka.internal.utils.SessionUtil;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
import java.time.Duration;
-import java.util.Base64;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -97,8 +88,6 @@

public class ResponsiveKafkaStreams extends KafkaStreams {

-  private static final String SIGNING_KEYS_PATH = "/responsive-license-keys/license-keys.json";
Review comment (Contributor Author): All of this License stuff was moved to LicenseUtils
private static final Logger LOG = LoggerFactory.getLogger(ResponsiveKafkaStreams.class);

private final ResponsiveMetrics responsiveMetrics;
@@ -211,7 +200,7 @@ protected ResponsiveKafkaStreams(final Params params) {
params.time
);

-    validateLicense(params.responsiveConfig);
+    loadLicense(params.responsiveConfig);

this.responsiveMetrics = params.metrics;
this.sessionClients = params.sessionClients;
@@ -265,57 +254,6 @@ private static ResponsiveMetrics createMetrics(
), exportService);
}

-  private static void validateLicense(final ResponsiveConfig configs) {
-    if (!configs.getString(ResponsiveConfig.PLATFORM_API_KEY_CONFIG).isEmpty()) {
-      return;
-    }
-    final LicenseDocument licenseDocument = loadLicense(configs);
-    final SigningKeys signingKeys = loadSigningKeys();
-    final LicenseAuthenticator licenseAuthenticator = new LicenseAuthenticator(signingKeys);
-    final LicenseInfo licenseInfo = licenseAuthenticator.authenticate(licenseDocument);
-    final LicenseChecker checker = new LicenseChecker();
-    checker.checkLicense(licenseInfo);
-  }
-
-  private static SigningKeys loadSigningKeys() {
-    try {
-      return new ObjectMapper().readValue(
-          ResponsiveKafkaStreams.class.getResource(SIGNING_KEYS_PATH),
-          SigningKeys.class
-      );
-    } catch (final IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private static LicenseDocument loadLicense(final ResponsiveConfig configs) {
-    final String license = configs.getString(ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG);
-    final String licenseFile = configs.getString(ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG);
-    if (license.isEmpty() == licenseFile.isEmpty()) {
-      throw new ConfigException(String.format(
-          "Must set exactly one of %s or %s",
-          ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG,
-          ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG
-      ));
-    }
-    final String licenseB64;
-    if (!license.isEmpty()) {
-      licenseB64 = license;
-    } else {
-      try {
-        licenseB64 = Files.readString(new File(licenseFile).toPath());
-      } catch (final IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-    final ObjectMapper mapper = new ObjectMapper();
-    try {
-      return mapper.readValue(Base64.getDecoder().decode(licenseB64), LicenseDocument.class);
-    } catch (final IOException e) {
-      throw new RuntimeException(e);
-    }
-  }

/**
* Fill in the props with any overrides and all internal objects shared via the configs
* before these get finalized as a {@link StreamsConfig} object
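Since LicenseUtils itself is not part of this diff, here is a minimal sketch of what the consolidated helper presumably looks like, assuming it simply re-homes the three methods removed above. Only the statically imported loadLicense(ResponsiveConfig) name is confirmed by this PR; the package, return type, and private helper names are assumptions. Reading the license via getPassword reflects the Type.STRING to Type.PASSWORD change made in the ResponsiveConfig diff below.

package dev.responsive.kafka.internal.license;

import com.fasterxml.jackson.databind.ObjectMapper;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.license.model.LicenseDocument;
import dev.responsive.kafka.internal.license.model.LicenseInfo;
import dev.responsive.kafka.internal.license.model.SigningKeys;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Base64;
import org.apache.kafka.common.config.ConfigException;

// Hypothetical sketch -- not the actual LicenseUtils from the codebase.
public final class LicenseUtils {

  private static final String SIGNING_KEYS_PATH = "/responsive-license-keys/license-keys.json";

  private LicenseUtils() {
  }

  // name and parameter are confirmed by the static import above; the return
  // type and validate-on-load behavior are assumptions based on the removed
  // validateLicense flow
  public static LicenseInfo loadLicense(final ResponsiveConfig configs) {
    final LicenseDocument document = readLicenseDocument(configs);
    final SigningKeys signingKeys = readSigningKeys();
    final LicenseInfo info = new LicenseAuthenticator(signingKeys).authenticate(document);
    new LicenseChecker().checkLicense(info);
    return info;
  }

  private static SigningKeys readSigningKeys() {
    try {
      return new ObjectMapper().readValue(
          LicenseUtils.class.getResource(SIGNING_KEYS_PATH), SigningKeys.class);
    } catch (final IOException e) {
      throw new RuntimeException(e);
    }
  }

  private static LicenseDocument readLicenseDocument(final ResponsiveConfig configs) {
    // the license config is now Type.PASSWORD (see below), so it is read
    // via getPassword rather than getString
    final String license = configs.getPassword(ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG).value();
    final String licenseFile = configs.getString(ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG);
    if (license.isEmpty() == licenseFile.isEmpty()) {
      throw new ConfigException(String.format(
          "Must set exactly one of %s or %s",
          ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG,
          ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG));
    }
    try {
      final String licenseB64 = license.isEmpty()
          ? Files.readString(new File(licenseFile).toPath())
          : license;
      return new ObjectMapper().readValue(
          Base64.getDecoder().decode(licenseB64), LicenseDocument.class);
    } catch (final IOException e) {
      throw new RuntimeException(e);
    }
  }
}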
ResponsiveConfig.java
@@ -54,6 +54,11 @@ public class ResponsiveConfig extends AbstractConfig {
private static final String RESPONSIVE_LICENSE_FILE_DOC
= "A path to a file containing your license.";

+  public static final String RESPONSIVE_LICENSE_SERVER_CONFIG = "responsive.license.server";
+  private static final String RESPONSIVE_LICENSE_SERVER_DEFAULT = "https://api.responsive.dev";
+  private static final String RESPONSIVE_LICENSE_SERVER_DOC =
+      "A URI for your Responsive license server.";

/**
* @deprecated use the responsive.storage.backend.type config with {@link StorageBackend#NONE}
*/
@@ -307,15 +312,20 @@ public class ResponsiveConfig extends AbstractConfig {
+ "lookups but requires more heap memory";


-  // ------------------ Misc functional overrides ----------------------
+  // ------------------ Misc configs ----------------------
public static final String RESTORE_OFFSET_REPAIR_ENABLED_CONFIG = "responsive.restore.offset.repair.enabled";
-  public static final boolean RESTORE_OFFSET_REPAIR_ENABLED_DEFAULT = false;
-  public static final String RESTORE_OFFSET_REPAIR_ENABLED_DOC = "When set to 'true', " + RESTORE_OFFSET_REPAIR_ENABLED_CONFIG
+  private static final boolean RESTORE_OFFSET_REPAIR_ENABLED_DEFAULT = false;
+  private static final String RESTORE_OFFSET_REPAIR_ENABLED_DOC = "When set to 'true', " + RESTORE_OFFSET_REPAIR_ENABLED_CONFIG
+ " will ignore OffsetOutOfRangeException and instead seek to the earliest available offset. This exception "
+ "should only happen in situations where there is truncation/retention on the changelog topic and restoring from the latest "
+ "committed offset in the remote store is no longer possible. Note that in some situations this may cause data "
+ "loss, use this configuration with caution";

+  public static final String ORIGIN_EVENT_REPORT_INTERVAL_MS_CONFIG = "responsive.origin.event.report.interval.ms";
+  private static final long ORIGIN_EVENT_REPORT_INTERVAL_MS_DEFAULT = 10_000L;
+  private static final String ORIGIN_EVENT_REPORT_INTERVAL_MS_DOC =
+      "How often to report origin event usage information. This should generally not be changed in production environments";

// ------------------ StreamsConfig overrides ----------------------

// These configuration values are required by Responsive, and a ConfigException will
@@ -347,7 +357,7 @@ public class ResponsiveConfig extends AbstractConfig {
RESPONSIVE_ENV_DOC
).define(
RESPONSIVE_LICENSE_CONFIG,
-        Type.STRING,
+        Type.PASSWORD,
"",
Importance.HIGH,
RESPONSIVE_LICENSE_DOC
@@ -357,6 +367,12 @@
"",
Importance.HIGH,
RESPONSIVE_LICENSE_FILE_DOC
+    ).define(
+        RESPONSIVE_LICENSE_SERVER_CONFIG,
+        Type.STRING,
+        RESPONSIVE_LICENSE_SERVER_DEFAULT,
+        Importance.LOW,
+        RESPONSIVE_LICENSE_SERVER_DOC
).define(
STORAGE_BACKEND_TYPE_CONFIG,
Type.STRING,
@@ -635,6 +651,12 @@
atLeast(0),
Importance.MEDIUM,
RS3_RETRY_TIMEOUT_DOC
+    ).define(
+        ORIGIN_EVENT_REPORT_INTERVAL_MS_CONFIG,
+        Type.LONG,
+        ORIGIN_EVENT_REPORT_INTERVAL_MS_DEFAULT,
+        Importance.LOW,
+        ORIGIN_EVENT_REPORT_INTERVAL_MS_DOC
);

/**
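To ground the new settings, a hypothetical configuration snippet follows. The constant names are the ones defined in this file, while the values, the file path, and the surrounding boilerplate are illustrative only.

import dev.responsive.kafka.api.config.ResponsiveConfig;
import java.util.Properties;

public class ResponsiveConfigExample {

  public static Properties exampleProps() {
    final Properties props = new Properties();

    // exactly one of the license or license-file configs may be set
    // (enforced when the license is loaded)
    props.put(ResponsiveConfig.RESPONSIVE_LICENSE_FILE_CONFIG, "/etc/responsive/license.b64");

    // new in this PR; defaults to https://api.responsive.dev
    props.put(ResponsiveConfig.RESPONSIVE_LICENSE_SERVER_CONFIG, "https://api.responsive.dev");

    // new in this PR; defaults to 10_000 ms, and per the config doc it
    // should generally be left alone in production
    props.put(ResponsiveConfig.ORIGIN_EVENT_REPORT_INTERVAL_MS_CONFIG, 10_000L);

    return props;
  }
}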
NoopOriginEventRecorder.java
@@ -0,0 +1,55 @@
/*
 * Copyright 2024 Responsive Computing, Inc.
 *
 * This source code is licensed under the Responsive Business Source License Agreement v1.0
 * available at:
 *
 * https://www.responsive.dev/legal/responsive-bsl-10
 *
 * This software requires a valid Commercial License Key for production use. Trial and commercial
 * licenses can be obtained at https://www.responsive.dev
 */

package dev.responsive.kafka.internal.clients;

import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class NoopOriginEventRecorder implements OriginEventRecorder {

  @Override
  public <K, V> ConsumerRecords<K, V> onPoll(final ConsumerRecords<K, V> records) {
    return records;
  }

  @Override
  public <K, V> ProducerRecord<K, V> onSend(final ProducerRecord<K, V> record) {
    return record;
  }

  @Override
  public void onConsumerCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
  }

  @Override
  public void onProducerCommit() {
  }

  @Override
  public void onSendOffsetsToTransaction(
      final Map<TopicPartition, OffsetAndMetadata> offsets,
      final String consumerGroupId
  ) {
  }

  @Override
  public void onPartitionsLost(final Collection<TopicPartition> partitions) {
  }

  @Override
  public void onUnsubscribe() {
  }
}
@@ -121,7 +121,7 @@ void onCommit(

private class ConsumerListener implements ResponsiveConsumer.Listener {
@Override
-    public void onCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
+    public void onConsumerCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
if (eos) {
throw new IllegalStateException("consumer commit is not expected with EOS");
}
@@ -151,7 +151,7 @@ public void onSendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> of
}

@Override
-    public void onCommit() {
+    public void onProducerCommit() {
if (!eos) {
throw new IllegalStateException("producer commit is not expected with alos");
}
OffsetTracker.java
@@ -0,0 +1,83 @@
/*
 * Copyright 2025 Responsive Computing, Inc.
 *
 * This source code is licensed under the Responsive Business Source License Agreement v1.0
 * available at:
 *
 * https://www.responsive.dev/legal/responsive-bsl-10
 *
 * This software requires a valid Commercial License Key for production use. Trial and commercial
 * licenses can be obtained at https://www.responsive.dev
 */

package dev.responsive.kafka.internal.clients;

import java.util.BitSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class allows us to efficiently count the number of events
 * between two offsets that match a certain condition. This is somewhat
 * memory efficient in that we can track 100K offsets with ~1.5K longs
 * (64 bits per long), or roughly 12KB.
 * <p>
 * This is necessary for origin event tracking: since we mark records
 * on poll but report aligned to commit, it is possible that there are
 * records that are polled but not included in the committed offsets.
 * This class remedies that by allowing us to count the marked events
 * up to the committed offset.
 * <p>
 * Additionally, there are some situations that would cause us to poll
 * a record multiple times (e.g. a task is reassigned to the same thread
 * after an EOS error). This class is idempotent to marking the same
 * offsets multiple times.
 */
public class OffsetTracker {

  private static final Logger LOG = LoggerFactory.getLogger(OffsetTracker.class);

  private BitSet offsets = new BitSet();
  private long baseOffset;

  public OffsetTracker(final long baseOffset) {
    this.baseOffset = baseOffset;
  }

  public void mark(final long offset) {
    if (offset < baseOffset) {
      LOG.error("Invalid offset {} lower than baseOffset {} marked", offset, baseOffset);
      throw new IllegalArgumentException(
          "Offset " + offset + " cannot be less than baseOffset " + baseOffset);
    }

    // assume that we won't be committing more than MAX_INT offsets
    // in a single commit...
    final int idx = Math.toIntExact(offset - baseOffset);
    offsets.set(idx);
  }

  public int countAndShift(final long commitOffset) {
    if (commitOffset < baseOffset) {
      LOG.error("Invalid offset {} lower than baseOffset {} committed", commitOffset, baseOffset);
      throw new IllegalArgumentException(
          "Commit offset " + commitOffset + " cannot be less than baseOffset " + baseOffset);
    }

    final int shift = Math.toIntExact(commitOffset - baseOffset);
    final int count = offsets.get(0, shift).cardinality();

    // shift the bitset so that commitOffset is the new base offset
    // for which we track marked entries
    if (shift < offsets.length()) {
      offsets = offsets.get(shift, offsets.length());
    } else {
      // if the shift is beyond the last entry in the bitset, we can
      // just create a new empty bitset
      offsets = new BitSet();
    }

    baseOffset = commitOffset;
    return count;
  }
}

Review comment on lines +19 to +23 (Contributor): I'm a little confused by the point of this class. This is just counting the number of origin events since the last commit, right? It doesn't seem like we ever expose what the actual origin event offsets are, so why do we need to save them in this BitSet scheme -- ie why not just use an int that increments in #mark and resets/returns in #countAndShift? Lmk if you'd rather sync online because I'm sure I'm missing something here 🙂

Reply (Contributor Author): This class should definitely have javadoc explaining it; originally I had implemented what you're saying, but Rohan pointed out these two concerns: #424 (comment) and #424 (comment), both fixed by this bitset tracker.

Reply (Contributor): ah, I missed those discussions, but yeah he's right. thanks for filling in the javadocs

Review comment on the IllegalArgumentException in mark (ableegoldman, Feb 22, 2025): nit: can we log an error before throwing that also includes the value of offsets (here and below)
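A short usage sketch (illustrative, not from the PR) of the semantics above: the commit offset passed to countAndShift is exclusive, matching Kafka's convention that a committed offset points at the next record to consume, and re-marking an offset is a no-op.

final OffsetTracker tracker = new OffsetTracker(100L);

tracker.mark(100L);
tracker.mark(102L);
tracker.mark(102L);  // re-polled record: marking is idempotent, still one mark

// counts marks at offsets 100..102 (commit offset 103 is exclusive),
// returns 2, and re-bases the tracker at offset 103
final int firstCount = tracker.countAndShift(103L);

tracker.mark(103L);  // polled, but not covered by the last commit
final int secondCount = tracker.countAndShift(104L);  // returns 1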
OriginEventRecorder.java
@@ -0,0 +1,48 @@
/*
 * Copyright 2024 Responsive Computing, Inc.
 *
 * This source code is licensed under the Responsive Business Source License Agreement v1.0
 * available at:
 *
 * https://www.responsive.dev/legal/responsive-bsl-10
 *
 * This software requires a valid Commercial License Key for production use. Trial and commercial
 * licenses can be obtained at https://www.responsive.dev
 */

package dev.responsive.kafka.internal.clients;

import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public interface OriginEventRecorder
    extends ResponsiveConsumer.Listener, ResponsiveProducer.Listener {

  @Override
  <K, V> ConsumerRecords<K, V> onPoll(ConsumerRecords<K, V> records);

  @Override
  <K, V> ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

  @Override
  void onConsumerCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

  @Override
  void onProducerCommit();

  @Override
  void onSendOffsetsToTransaction(
      final Map<TopicPartition, OffsetAndMetadata> offsets,
      final String consumerGroupId
  );

  @Override
  void onPartitionsLost(Collection<TopicPartition> partitions);

  @Override
  void onUnsubscribe();
}