diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/TtlProvider.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/TtlProvider.java index 4d985094f..75d918e40 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/TtlProvider.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/TtlProvider.java @@ -162,6 +162,10 @@ public long toMillis() { return duration().toMillis(); } + public TtlDuration minus(final long otherMs) { + return TtlDuration.of(duration().minusMillis(otherMs)); + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraFactTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraFactTable.java index ee3e8edbc..a60591327 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraFactTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraFactTable.java @@ -308,7 +308,8 @@ public BoundStatement insert( final long epochMillis ) { if (ttlResolver.isPresent()) { - final Optional rowTtl = ttlResolver.get().computeTtl(key, value); + final Optional rowTtl = + ttlResolver.get().computeInsertTtl(key, value, epochMillis); // If user happens to return same ttl value as the default, skip applying it at // the row level since this is less efficient in Scylla @@ -342,7 +343,7 @@ public byte[] get(final int kafkaPartition, final Bytes key, long streamTimeMs) } else if (ttlResolver.get().needsValueToComputeTtl()) { return postFilterGet(key, streamTimeMs); } else { - final TtlDuration ttl = ttlResolver.get().resolveTtl(key, null); + final TtlDuration ttl = ttlResolver.get().resolveRowTtl(key, null); if (ttl.isFinite()) { final long minValidTimeMs = streamTimeMs - ttl.toMillis(); return preFilterGet(key, minValidTimeMs); @@ -398,7 +399,7 @@ private byte[] postFilterGet(final Bytes key, long streamTimeMs) { final Row rowResult = result.get(0); final byte[] value = getValueFromRow(rowResult); - final TtlDuration ttl = ttlResolver.get().resolveTtl(key, value); + final TtlDuration ttl = ttlResolver.get().resolveRowTtl(key, value); if (ttl.isFinite()) { final long minValidTsFromValue = streamTimeMs - ttl.toMillis(); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/inmemory/InMemoryKVTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/inmemory/InMemoryKVTable.java index 0b1b633b7..32884f503 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/inmemory/InMemoryKVTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/inmemory/InMemoryKVTable.java @@ -55,7 +55,7 @@ public byte[] get(final int kafkaPartition, final Bytes key, final long streamTi } if (ttlResolver.isPresent()) { - final TtlDuration rowTtl = ttlResolver.get().resolveTtl(key, value.value()); + final TtlDuration rowTtl = ttlResolver.get().resolveRowTtl(key, value.value()); if (rowTtl.isFinite()) { final long minValidTs = streamTimeMs - rowTtl.toMillis(); if (value.epochMillis < minValidTs) { diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveKeyValueStore.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveKeyValueStore.java index 2d7beb5bd..cbfc36db5 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveKeyValueStore.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveKeyValueStore.java @@ -98,7 +98,8 @@ public void init(final ProcessorContext context, final StateStore root) { @Override public void init(final StateStoreContext storeContext, final StateStore root) { try { - final TaskType taskType = asInternalProcessorContext(storeContext).taskType(); + final var internalProcessorContext = asInternalProcessorContext(storeContext); + final TaskType taskType = internalProcessorContext.taskType(); log = new LogContext( String.format( "%sstore [%s] ", @@ -117,7 +118,8 @@ public void init(final StateStoreContext storeContext, final StateStore root) { final StateSerdes stateSerdes = StoreAccessorUtil.extractKeyValueStoreSerdes(root); final Optional> ttlResolver = TtlResolver.fromTtlProviderAndStateSerdes( stateSerdes, - params.ttlProvider() + params.ttlProvider(), + internalProcessorContext ); operations = opsProvider.provide(params, ttlResolver, storeContext, taskType); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/TtlResolver.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/TtlResolver.java index b114f28c9..abafd67e0 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/TtlResolver.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/TtlResolver.java @@ -21,6 +21,7 @@ import dev.responsive.kafka.internal.utils.StateDeserializer; import java.util.Optional; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.StateSerdes; public class TtlResolver { @@ -29,11 +30,13 @@ public class TtlResolver { private final StateDeserializer stateDeserializer; private final TtlProvider ttlProvider; + private final ProcessorContext processorContext; @SuppressWarnings("unchecked") public static Optional> fromTtlProviderAndStateSerdes( final StateSerdes stateSerdes, - final Optional> ttlProvider + final Optional> ttlProvider, + final ProcessorContext processorContext ) { return ttlProvider.isPresent() ? Optional.of( @@ -42,17 +45,20 @@ public class TtlResolver { stateSerdes.topic(), stateSerdes.keyDeserializer(), stateSerdes.valueDeserializer()), - (TtlProvider) ttlProvider.get() + (TtlProvider) ttlProvider.get(), + processorContext )) : Optional.empty(); } public TtlResolver( final StateDeserializer stateDeserializer, - final TtlProvider ttlProvider + final TtlProvider ttlProvider, + final ProcessorContext processorContext ) { this.stateDeserializer = stateDeserializer; this.ttlProvider = ttlProvider; + this.processorContext = processorContext; } public TtlDuration defaultTtl() { @@ -68,18 +74,33 @@ public boolean needsValueToComputeTtl() { } /** - * @return the raw result from the user's ttl computation function for this row + * @return the raw result from the user's ttl computation function for this row, + * adjusted by the difference between current time and the record timestamp. + * Used for writes. */ - public Optional computeTtl(final Bytes keyBytes, final byte[] valueBytes) { - return ttlProvider.computeTtl(keyBytes.get(), valueBytes, stateDeserializer); + public Optional computeInsertTtl( + final Bytes keyBytes, + final byte[] valueBytes, + final long timestampMs + ) { + return ttlProvider.computeTtl(keyBytes.get(), valueBytes, stateDeserializer) + .map(ttl -> { + if (ttl.isFinite()) { + return ttl.minus(processorContext.currentSystemTimeMs() - timestampMs); + } else { + return ttl; + } + }); } /** * @return the actual ttl for this row after resolving the raw result returned by the user - * (eg applying the default value) + * (eg applying the default value). Used for reads */ - public TtlDuration resolveTtl(final Bytes keyBytes, final byte[] valueBytes) { - final Optional ttl = computeTtl(keyBytes, valueBytes); + public TtlDuration resolveRowTtl(final Bytes keyBytes, final byte[] valueBytes) { + final Optional ttl = + ttlProvider.computeTtl(keyBytes.get(), valueBytes, stateDeserializer); + return ttl.orElse(defaultTtl()); } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java index ceb0e42dd..f419e70df 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java @@ -46,6 +46,7 @@ import java.util.function.Function; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.processor.MockProcessorContext; import org.hamcrest.Matchers; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -56,6 +57,8 @@ @ExtendWith(ResponsiveExtension.class) class CassandraFactTableIntegrationTest { + private final MockProcessorContext mockContext = new MockProcessorContext(); + private String storeName; // ie the "kafkaName", NOT the "cassandraName" private ResponsiveKeyValueParams params; private CassandraClient client; @@ -214,7 +217,8 @@ public void shouldRespectSemanticKeyBasedTtl() throws Exception { defaultPartitioner(), Optional.of(new TtlResolver<>( new StateDeserializer<>("ignored", new StringDeserializer(), new StringDeserializer()), - ttlProvider)) + ttlProvider, + mockContext)) )); table.init(1); @@ -228,6 +232,7 @@ public void shouldRespectSemanticKeyBasedTtl() throws Exception { // When: final long insertTimeMs = 0L; + mockContext.setCurrentSystemTimeMs(insertTimeMs); client.execute(table.insert(1, noTtlKey, val, insertTimeMs)); client.execute(table.insert(1, defaultTtlKey, val, insertTimeMs)); client.execute(table.insert(1, tenMinTtlKey, val, insertTimeMs)); @@ -281,7 +286,8 @@ public void shouldRespectSemanticKeyValueBasedTtl() throws Exception { defaultPartitioner(), Optional.of(new TtlResolver<>( new StateDeserializer<>("ignored", new StringDeserializer(), new StringDeserializer()), - ttlProvider)) + ttlProvider, + mockContext)) )); table.init(1); @@ -301,6 +307,7 @@ public void shouldRespectSemanticKeyValueBasedTtl() throws Exception { // When long insertTimeMs = 0L; + mockContext.setCurrentSystemTimeMs(insertTimeMs); client.execute(table.insert(1, tenMinTtlKey, val, insertTimeMs)); client.execute(table.insert(1, defaultTtlKey, defaultTtlValue, insertTimeMs)); client.execute(table.insert(1, noTtlKey, noTtlValue, insertTimeMs)); @@ -361,7 +368,8 @@ public void shouldRespectOverridesWithValueBasedTtl() throws Exception { defaultPartitioner(), Optional.of(new TtlResolver<>( new StateDeserializer<>("ignored", new StringDeserializer(), new StringDeserializer()), - ttlProvider) + ttlProvider, + mockContext) ))); table.init(1); @@ -375,6 +383,7 @@ public void shouldRespectOverridesWithValueBasedTtl() throws Exception { // When long currentTimeMs = 0L; + mockContext.setCurrentSystemTimeMs(currentTimeMs); // first record set to expire at 3ms client.execute(table.insert(1, key, threeMinTtlValue, currentTimeMs)); @@ -384,10 +393,12 @@ public void shouldRespectOverridesWithValueBasedTtl() throws Exception { // insert new record with 3ms ttl -- now set to expire at 10ms currentTimeMs = Duration.ofMinutes(7).toMillis(); + mockContext.setCurrentSystemTimeMs(currentTimeMs); client.execute(table.insert(1, key, threeMinTtlValue, currentTimeMs)); // override with 10ms ttl -- now set to expire at 18ms currentTimeMs = Duration.ofMinutes(8).toMillis(); + mockContext.setCurrentSystemTimeMs(currentTimeMs); client.execute(table.insert(1, key, tenMinTtlValue, currentTimeMs)); // record should still exist after 10ms @@ -396,6 +407,7 @@ public void shouldRespectOverridesWithValueBasedTtl() throws Exception { // override with default ttl (30ms) -- now set to expire at 45ms currentTimeMs = Duration.ofMinutes(15).toMillis(); + mockContext.setCurrentSystemTimeMs(currentTimeMs); client.execute(table.insert(1, key, defaultTtlValue, currentTimeMs)); // record should still exist after 18ms @@ -404,6 +416,7 @@ public void shouldRespectOverridesWithValueBasedTtl() throws Exception { // override with no ttl -- now set to never expire currentTimeMs = Duration.ofMinutes(30).toMillis(); + mockContext.setCurrentSystemTimeMs(currentTimeMs); client.execute(table.insert(1, key, noTtlValue, currentTimeMs)); // record should still exist after 45ms @@ -411,4 +424,5 @@ public void shouldRespectOverridesWithValueBasedTtl() throws Exception { assertThat(table.get(1, key, currentTimeMs), is(noTtlValue)); } + } \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java index a601fe6b9..8817a64ec 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java @@ -90,7 +90,7 @@ public MockResponsiveKafkaStreams( public static Optional> defaultOnlyTtl(final Duration ttl) { return Optional.of(new TtlResolver<>( new StateDeserializer<>("ignored", null, null), - TtlProvider.withDefault(ttl)) + TtlProvider.withDefault(ttl), null) ); } @@ -99,7 +99,7 @@ public MockResponsiveKafkaStreams( ) { return Optional.of(new TtlResolver<>( new StateDeserializer<>("ignored", null, null), - ttlProvider) + ttlProvider, null) ); } @@ -108,7 +108,8 @@ public MockResponsiveKafkaStreams( ) { return ttlProvider.isPresent() ? Optional.of( - new TtlResolver<>(new StateDeserializer<>("ignored", null, null), ttlProvider.get())) + new TtlResolver<>(new StateDeserializer<>("ignored", null, null), ttlProvider.get(), + null)) : Optional.empty(); }