Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import dev.responsive.kafka.api.config.StorageBackend;
import dev.responsive.kafka.api.stores.ResponsiveDslStoreSuppliers;
import dev.responsive.kafka.internal.clients.AsyncStreamsKafkaClientSupplier;
import dev.responsive.kafka.internal.clients.OriginEventReporter;
import dev.responsive.kafka.internal.clients.OriginEventReporterImpl;
import dev.responsive.kafka.internal.clients.ResponsiveKafkaClientSupplier;
import dev.responsive.kafka.internal.config.ConfigUtils;
import dev.responsive.kafka.internal.config.InternalSessionConfigs;
Expand All @@ -46,6 +48,10 @@
import dev.responsive.kafka.internal.db.mongo.ResponsiveMongoClient;
import dev.responsive.kafka.internal.db.rs3.RS3TableFactory;
import dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRS3Client;
import dev.responsive.kafka.internal.license.model.CloudLicenseV1;
import dev.responsive.kafka.internal.license.model.LicenseInfo;
import dev.responsive.kafka.internal.license.model.TimedTrialV1;
import dev.responsive.kafka.internal.license.model.UsageBasedV1;
import dev.responsive.kafka.internal.metrics.ClientVersionMetadata;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.metrics.ResponsiveRestoreListener;
Expand Down Expand Up @@ -94,6 +100,7 @@ public class ResponsiveKafkaStreams extends KafkaStreams {
private final ResponsiveStateListener responsiveStateListener;
private final ResponsiveRestoreListener responsiveRestoreListener;
private final SessionClients sessionClients;
private final OriginEventReporter originEventReporter;

/**
* Create a {@code ResponsiveKafkaStreams} instance.
Expand Down Expand Up @@ -200,10 +207,9 @@ protected ResponsiveKafkaStreams(final Params params) {
params.time
);

loadLicense(params.responsiveConfig);

this.responsiveMetrics = params.metrics;
this.sessionClients = params.sessionClients;
this.originEventReporter = params.oeReporter;

final ClientVersionMetadata versionMetadata = ClientVersionMetadata.loadVersionMetadata();
// Only log the version metadata for Responsive since Kafka Streams will log its own
Expand Down Expand Up @@ -369,10 +375,17 @@ public StateRestoreListener stateRestoreListener() {
}

private void closeInternal() {
originEventReporter.close();
responsiveStateListener.close();
sessionClients.closeAll();
}

@Override
public synchronized void start() throws IllegalStateException, StreamsException {
originEventReporter.start();
super.start();
}

@Override
public void close() {
super.close();
Expand Down Expand Up @@ -411,6 +424,7 @@ protected static class Params {
private SessionClients sessionClients;
private Optional<AsyncThreadPoolRegistry> asyncThreadPoolRegistry;
private ResponsiveKafkaClientSupplier responsiveKafkaClientSupplier;
private OriginEventReporter oeReporter;

public Params(final Topology topology, final Map<?, ?> configs) {
this.topology = topology;
Expand Down Expand Up @@ -443,6 +457,7 @@ public Params withTime(final Time time) {
// that it's impossible to use a Params instance that hasn't called build(),
// but that felt a little extra
public Params build() {
var license = loadLicense(responsiveConfig);
this.asyncThreadPoolRegistry = AsyncUtils.configuredAsyncThreadPool(
responsiveConfig,
streamsConfig.getInt(NUM_STREAM_THREADS_CONFIG),
Expand All @@ -454,12 +469,14 @@ public Params build() {
asyncThreadPoolRegistry.get()
) : innerClientSupplier;

this.oeReporter = reporter(responsiveConfig, license);
this.responsiveKafkaClientSupplier = new ResponsiveKafkaClientSupplier(
delegateKafkaClientSupplier,
responsiveConfig,
streamsConfig,
storeRegistry,
metrics,
oeReporter,
storageBackend
);

Expand Down Expand Up @@ -544,5 +561,23 @@ public Params build() {

return this;
}

private static OriginEventReporter reporter(
final ResponsiveConfig responsiveConfig,
final LicenseInfo license
) {
switch (license.type()) {
case CloudLicenseV1.TYPE_NAME:
case TimedTrialV1.TYPE_NAME:
// don't report counts for cloud/timed trial licenses
return (tp, count) -> { };
case UsageBasedV1.TYPE_NAME:
default:
return new OriginEventReporterImpl(
responsiveConfig,
license
);
}
}
}
}

This file was deleted.

This file was deleted.

Loading
Loading