Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@

package dev.responsive.kafka.api;

import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_MAX_EVENTS_QUEUED_PER_ASYNC_THREAD_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.METRICS_ENABLED_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ADDITIONAL_CONNECTION_STRING_PARAMS_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ENDPOINT_CONFIG;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@

package dev.responsive.kafka.api.async.internals;

import static dev.responsive.kafka.api.async.internals.AsyncUtils.getAsyncThreadPool;
import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_FLUSH_INTERVAL_MS_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_MAX_EVENTS_QUEUED_PER_KEY_CONFIG;
import static dev.responsive.kafka.internal.config.InternalSessionConfigs.loadAsyncThreadPoolRegistry;

import dev.responsive.kafka.api.async.AsyncProcessorSupplier;
import dev.responsive.kafka.api.async.internals.contexts.AsyncUserProcessorContext;
Expand All @@ -40,7 +40,6 @@
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
Expand Down Expand Up @@ -218,7 +217,7 @@ private void initFields(
final long punctuationInterval = configs.getLong(ASYNC_FLUSH_INTERVAL_MS_CONFIG);
final int maxEventsPerKey = configs.getInt(ASYNC_MAX_EVENTS_QUEUED_PER_KEY_CONFIG);

this.asyncThreadPoolRegistration = getAsyncThreadPool(taskContext, streamThreadName);
this.asyncThreadPoolRegistration = getAsyncThreadPool(appConfigs, streamThreadName);
asyncThreadPoolRegistration.registerAsyncProcessor(taskId, this::flushPendingEventsForCommit);
asyncThreadPoolRegistration.threadPool().maybeInitThreadPoolMetrics();

Expand Down Expand Up @@ -756,17 +755,5 @@ private void verifyConnectedStateStores(
}
}

private static AsyncThreadPoolRegistration getAsyncThreadPool(
final ProcessingContext context,
final String streamThreadName
) {
try {
final AsyncThreadPoolRegistry registry = loadAsyncThreadPoolRegistry(context.appConfigs());
return registry.asyncThreadPoolForStreamThread(streamThreadName);
} catch (final Exception e) {
throw new ConfigException(
"Unable to locate async thread pool registry. Make sure to configure "
+ ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG, e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public <KOut, VOut> void scheduleForProcessing(
= inFlight.computeIfAbsent(asyncProcessorId, k -> new ConcurrentHashMap<>());

for (final AsyncEvent event : events) {
log.info("Scheduled event {}", event.inputRecord());
log.trace("Scheduled event {}", event.inputRecord());
try {
queueSemaphore.acquire();
} catch (final InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static dev.responsive.kafka.api.async.internals.AsyncThreadPool.ASYNC_THREAD_NAME;
import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_MAX_EVENTS_QUEUED_PER_ASYNC_THREAD_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG;
import static dev.responsive.kafka.internal.config.InternalSessionConfigs.loadAsyncThreadPoolRegistry;

import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder;
import dev.responsive.kafka.api.config.ResponsiveConfig;
Expand All @@ -26,6 +27,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.state.StoreBuilder;
Expand Down Expand Up @@ -155,4 +157,18 @@ public static Optional<AsyncThreadPoolRegistry> configuredAsyncThreadPool(
}
}

public static AsyncThreadPoolRegistration getAsyncThreadPool(
final Map<String, Object> appConfigs,
final String streamThreadName
) {
try {
final AsyncThreadPoolRegistry registry = loadAsyncThreadPoolRegistry(appConfigs);
return registry.asyncThreadPoolForStreamThread(streamThreadName);
} catch (final Exception e) {
throw new ConfigException(
"Unable to locate async thread pool registry. Make sure to configure "
+ ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG, e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ public Segmenter(
);
}

public long retentionPeriodMs() {
return retentionPeriodMs;
}

public long segmentIntervalMs() {
return segmentIntervalMs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@

package dev.responsive.kafka.api;

import static dev.responsive.kafka.api.async.internals.AsyncUtils.getAsyncThreadPool;
import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG;
import static dev.responsive.kafka.internal.stores.TTDRestoreListener.mockRestoreListener;

import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry;
import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistration;
import dev.responsive.kafka.api.async.internals.AsyncUtils;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.clients.TTDCassandraClient;
Expand All @@ -26,20 +28,26 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TTDUtils.TopologyTestDriverAccessor;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.TestRecord;

public class ResponsiveTopologyTestDriver extends TopologyTestDriver {
public class ResponsiveTopologyTestDriver extends TopologyTestDriverAccessor {
public static final String RESPONSIVE_TTD_ORG = "Responsive";
public static final String RESPONSIVE_TTD_ENV = "TopologyTestDriver";

private final TTDCassandraClient client;
private final Optional<AsyncThreadPoolRegistration> asyncThreadPool;

/**
* Create a new test diver instance with default test properties.
Expand Down Expand Up @@ -97,7 +105,7 @@ public ResponsiveTopologyTestDriver(
) {
this(
topology,
config,
baseProps(config),
initialWallClockTime,
new TTDCassandraClient(
new TTDMockAdmin(baseProps(config), topology),
Expand All @@ -115,9 +123,16 @@ public ResponsiveTopologyTestDriver(
@Override
public void advanceWallClockTime(final Duration advance) {
client.advanceWallClockTime(advance);
client.flush();
super.advanceWallClockTime(advance);
}

public void flush() {
asyncThreadPool.ifPresent(AsyncThreadPoolRegistration::flushAllAsyncEvents);
client.flush();
super.advanceWallClockTime(Duration.ZERO);
}

private ResponsiveTopologyTestDriver(
final Topology topology,
final Properties config,
Expand All @@ -130,23 +145,34 @@ private ResponsiveTopologyTestDriver(
initialWallClockTime
);
this.client = cassandraClient;
this.asyncThreadPool = getAsyncThreadPoolRegistration(super.props());
}

@Override
protected <K, V> void pipeRecord(
final String topic,
final TestRecord<K, V> record,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer,
final Instant time
) {
super.pipeRecord(topic, record, keySerializer, valueSerializer, time);
flush();
}

private static Properties testDriverProps(
final Properties userProps,
final Properties baseProps,
final TopologyDescription topologyDescription,
final TTDCassandraClient client
) {
final Properties props = baseProps(userProps);

final SessionClients sessionClients = new SessionClients(
Optional.empty(), Optional.of(client), Optional.empty(), false, client.mockAdmin()
);
final var restoreListener = mockRestoreListener(props);
final var restoreListener = mockRestoreListener(baseProps);
sessionClients.initialize(restoreListener.metrics(), restoreListener);

final var metrics = new ResponsiveMetrics(new Metrics());
final String appId = userProps.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
final String appId = baseProps.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
metrics.initializeTags(
appId,
appId + "-client",
Expand All @@ -160,14 +186,14 @@ private static Properties testDriverProps(
.withMetrics(metrics)
.withTopologyDescription(topologyDescription);

AsyncUtils.configuredAsyncThreadPool(ResponsiveConfig.responsiveConfig(props), 1, metrics)
AsyncUtils.configuredAsyncThreadPool(ResponsiveConfig.responsiveConfig(baseProps), 1, metrics)
.ifPresent(threadPoolRegistry -> {
threadPoolRegistry.startNewAsyncThreadPool("Test worker");
threadPoolRegistry.startNewAsyncThreadPool(Thread.currentThread().getName());
sessionConfig.withAsyncThreadPoolRegistry(threadPoolRegistry);
});

props.putAll(sessionConfig.build());
return props;
baseProps.putAll(sessionConfig.build());
return baseProps;
}

@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -197,6 +223,20 @@ private static MockTime mockTime(final Instant initialWallClockTime) {
return mockTime;
}

private static Optional<AsyncThreadPoolRegistration> getAsyncThreadPoolRegistration(
final Properties props
) {
final int asyncThreadPoolSize = (int) props.getOrDefault(ASYNC_THREAD_POOL_SIZE_CONFIG, 0);

if (asyncThreadPoolSize > 0) {
final Map<String, Object> configMap = new HashMap<>();
// stupid conversion to deal with Map<String, Object> vs Properties type discrepancy
props.forEach((key, value) -> configMap.put(key.toString(), value));

return Optional.of(getAsyncThreadPool(configMap, Thread.currentThread().getName()));
} else {
return Optional.empty();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ public long currentWallClockTimeMs() {
}

public void advanceWallClockTime(final Duration advance) {
flush();
time.sleep(advance.toMillis());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public TTDWindowTable(
) {
super(client);
this.name = spec.tableName();
this.stub = new WindowStoreStub();
this.stub = new WindowStoreStub(partitioner.segmenter().retentionPeriodMs());
this.partitioner = partitioner;
}

Expand Down Expand Up @@ -207,17 +207,17 @@ protected RemoteWriteResult<SegmentPartition> updateOffsetAndStreamTime(
final long consumedOffset,
final long streamTime
) {
return null;
return RemoteWriteResult.success(null);
}

@Override
protected RemoteWriteResult<SegmentPartition> createSegment(final SegmentPartition partition) {
return null;
return RemoteWriteResult.success(null);
}

@Override
protected RemoteWriteResult<SegmentPartition> deleteSegment(final SegmentPartition partition) {
return null;
return RemoteWriteResult.success(null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ public class WindowStoreStub {
private final long retentionPeriod;
private long observedStreamTime = 0L;

public WindowStoreStub() {
// TODO: how can we pass the actual retention period through to the store stub?
this.retentionPeriod = 15L;
public WindowStoreStub(final long retentionPeriod) {
this.retentionPeriod = retentionPeriod;
}

public void put(final WindowedKey key, final byte[] value) {
Expand All @@ -48,7 +47,7 @@ public byte[] fetch(
final long windowStart
) {
final WindowedKey windowedKey = new WindowedKey(key, windowStart);
if (windowStart < minValidTimestamp() && records.containsKey(windowedKey)) {
if (windowStart > minValidTimestamp() && records.containsKey(windowedKey)) {
return records.get(windowedKey);
} else {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@

import static org.apache.kafka.streams.processor.internals.ProcessorStateManager.storeChangelogTopic;

import java.time.Instant;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.test.TestRecord;

/**
* A utility class that lives in the o.a.k.streams package so we can access
Expand Down Expand Up @@ -47,4 +51,32 @@ public static Set<String> extractChangelogTopics(final Topology topology) {
.flatMap(t -> t.stateChangelogTopics.keySet().stream())
.collect(Collectors.toSet());
}

public static class TopologyTestDriverAccessor extends TopologyTestDriver {

private final Properties props;

public TopologyTestDriverAccessor(
final Topology topology,
final Properties config,
final Instant initialWallClockTime
) {
super(topology, config, initialWallClockTime);
this.props = config;
}

public Properties props() {
return props;
}

@Override
protected <K, V> void pipeRecord(final String topic,
final TestRecord<K, V> record,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer,
final Instant time) {
super.pipeRecord(topic, record, keySerializer, valueSerializer, time);
}

}
}
Loading
Loading