2323import static dev .responsive .kafka .internal .db .ColumnName .PARTITION_KEY ;
2424import static dev .responsive .kafka .internal .db .ColumnName .ROW_TYPE ;
2525import static dev .responsive .kafka .internal .db .ColumnName .TIMESTAMP ;
26+ import static dev .responsive .kafka .internal .db .ColumnName .TTL_SECONDS ;
2627import static dev .responsive .kafka .internal .stores .ResponsiveStoreRegistration .NO_COMMITTED_OFFSET ;
2728
2829import com .datastax .oss .driver .api .core .cql .BoundStatement ;
3233import com .datastax .oss .driver .api .querybuilder .QueryBuilder ;
3334import com .datastax .oss .driver .api .querybuilder .SchemaBuilder ;
3435import com .datastax .oss .driver .api .querybuilder .schema .CreateTableWithOptions ;
36+ import dev .responsive .kafka .api .stores .TtlProvider .TtlDuration ;
3537import dev .responsive .kafka .internal .db .spec .RemoteTableSpec ;
3638import dev .responsive .kafka .internal .stores .TtlResolver ;
3739import java .nio .ByteBuffer ;
38- import java .time .Instant ;
3940import java .util .List ;
4041import java .util .Objects ;
4142import java .util .Optional ;
4748
4849public class CassandraFactTable implements RemoteKVTable <BoundStatement > {
4950
50- private static final Logger LOG = LoggerFactory .getLogger (
51- CassandraFactTable .class );
51+ private static final Logger LOG = LoggerFactory .getLogger (CassandraFactTable .class );
5252
5353 private final String name ;
5454 private final CassandraClient client ;
55+ private final Optional <TtlResolver <?, ?>> ttlResolver ;
5556
5657 private final PreparedStatement get ;
58+ private final PreparedStatement getWithTimestamp ;
5759 private final PreparedStatement insert ;
60+ private final PreparedStatement insertWithTtl ;
5861 private final PreparedStatement delete ;
5962 private final PreparedStatement fetchOffset ;
6063 private final PreparedStatement setOffset ;
6164
6265 public CassandraFactTable (
6366 final String name ,
6467 final CassandraClient client ,
68+ final Optional <TtlResolver <?, ?>> ttlResolver ,
6569 final PreparedStatement get ,
70+ final PreparedStatement getWithTimestamp ,
6671 final PreparedStatement insert ,
72+ final PreparedStatement insertWithTtl ,
6773 final PreparedStatement delete ,
6874 final PreparedStatement fetchOffset ,
6975 final PreparedStatement setOffset
7076 ) {
7177 this .name = name ;
7278 this .client = client ;
79+ this .ttlResolver = ttlResolver ;
7380 this .get = get ;
81+ this .getWithTimestamp = getWithTimestamp ;
7482 this .insert = insert ;
83+ this .insertWithTtl = insertWithTtl ;
7584 this .delete = delete ;
7685 this .fetchOffset = fetchOffset ;
7786 this .setOffset = setOffset ;
@@ -84,6 +93,7 @@ public static CassandraFactTable create(
8493 final String name = spec .tableName ();
8594 LOG .info ("Creating fact data table {} in remote store." , name );
8695
96+ final Optional <TtlResolver <?, ?>> ttlResolver = spec .ttlResolver ();
8797 final CreateTableWithOptions createTable = spec .applyDefaultOptions (
8898 createTable (name , spec .ttlResolver ())
8999 );
@@ -115,6 +125,18 @@ public static CassandraFactTable create(
115125 QueryOp .WRITE
116126 );
117127
128+ final var insertWithTtl = client .prepare (
129+ QueryBuilder
130+ .insertInto (name )
131+ .value (ROW_TYPE .column (), RowType .DATA_ROW .literal ())
132+ .value (DATA_KEY .column (), bindMarker (DATA_KEY .bind ()))
133+ .value (TIMESTAMP .column (), bindMarker (TIMESTAMP .bind ()))
134+ .value (DATA_VALUE .column (), bindMarker (DATA_VALUE .bind ()))
135+ .usingTtl (bindMarker (TTL_SECONDS .bind ()))
136+ .build (),
137+ QueryOp .WRITE
138+ );
139+
118140 final var get = client .prepare (
119141 QueryBuilder
120142 .selectFrom (name )
@@ -129,6 +151,20 @@ public static CassandraFactTable create(
129151 QueryOp .READ
130152 );
131153
154+ final var getWithTimestamp = client .prepare (
155+ QueryBuilder
156+ .selectFrom (name )
157+ .columns (DATA_VALUE .column (), TIMESTAMP .column ())
158+ .where (ROW_TYPE .relation ().isEqualTo (RowType .DATA_ROW .literal ()))
159+ .where (DATA_KEY .relation ().isEqualTo (bindMarker (DATA_KEY .bind ())))
160+ .where (TIMESTAMP .relation ().isGreaterThanOrEqualTo (bindMarker (TIMESTAMP .bind ())))
161+ // ALLOW FILTERING is OK b/c the query only scans one partition (it actually only
162+ // returns a single value)
163+ .allowFiltering ()
164+ .build (),
165+ QueryOp .READ
166+ );
167+
132168 final var delete = client .prepare (
133169 QueryBuilder
134170 .deleteFrom (name )
@@ -161,8 +197,11 @@ public static CassandraFactTable create(
161197 return new CassandraFactTable (
162198 name ,
163199 client ,
200+ ttlResolver ,
164201 get ,
202+ getWithTimestamp ,
165203 insert ,
204+ insertWithTtl ,
166205 delete ,
167206 fetchOffset ,
168207 setOffset
@@ -178,7 +217,7 @@ private static CreateTableWithOptions createTable(
178217 .ifNotExists ()
179218 .withPartitionKey (ROW_TYPE .column (), DataTypes .TINYINT )
180219 .withPartitionKey (DATA_KEY .column (), DataTypes .BLOB )
181- .withColumn (TIMESTAMP .column (), DataTypes .TIMESTAMP )
220+ .withColumn (TIMESTAMP .column (), DataTypes .BIGINT )
182221 .withColumn (DATA_VALUE .column (), DataTypes .BLOB );
183222
184223 if (ttlResolver .isPresent () && ttlResolver .get ().defaultTtl ().isFinite ()) {
@@ -267,44 +306,98 @@ public BoundStatement insert(
267306 final byte [] value ,
268307 final long epochMillis
269308 ) {
309+ if (ttlResolver .isPresent ()) {
310+ final Optional <TtlDuration > rowTtl = ttlResolver .get ().computeTtl (key , value );
311+
312+ if (rowTtl .isPresent ()) {
313+ return insertWithTtl
314+ .bind ()
315+ .setByteBuffer (DATA_KEY .bind (), ByteBuffer .wrap (key .get ()))
316+ .setByteBuffer (DATA_VALUE .bind (), ByteBuffer .wrap (value ))
317+ .setLong (TIMESTAMP .bind (), epochMillis )
318+ .setInt (TTL_SECONDS .bind (), (int ) rowTtl .get ().toSeconds ());
319+ }
320+ }
321+
270322 return insert
271323 .bind ()
272324 .setByteBuffer (DATA_KEY .bind (), ByteBuffer .wrap (key .get ()))
273325 .setByteBuffer (DATA_VALUE .bind (), ByteBuffer .wrap (value ))
274- .setInstant (TIMESTAMP .bind (), Instant . ofEpochMilli ( epochMillis ) );
326+ .setLong (TIMESTAMP .bind (), epochMillis );
275327 }
276328
277329 @ Override
278- public byte [] get (final int kafkaPartition , final Bytes key , long minValidTs ) {
279- final BoundStatement get = this .get
280- .bind ()
281- .setByteBuffer (DATA_KEY .bind (), ByteBuffer .wrap (key .get ()))
282- .setInstant (TIMESTAMP .bind (), Instant .ofEpochMilli (minValidTs ));
330+ public byte [] get (final int kafkaPartition , final Bytes key , long streamTimeMs ) {
331+ long minValidTs = 0L ;
332+ if (ttlResolver .isPresent () && !ttlResolver .get ().needsValueToComputeTtl ()) {
333+ final TtlDuration ttl = ttlResolver .get ().resolveTtl (key , null );
334+ if (ttl .isFinite ()) {
335+ minValidTs = streamTimeMs - ttl .toMillis ();
336+ }
337+ }
283338
284- final List <Row > result = client .execute (get ).all ();
285- if (result .size () > 1 ) {
286- throw new IllegalArgumentException ();
287- } else if (result .isEmpty ()) {
288- return null ;
339+ if (ttlResolver .isEmpty () || !ttlResolver .get ().needsValueToComputeTtl ()) {
340+ final BoundStatement getQuery = get
341+ .bind ()
342+ .setByteBuffer (DATA_KEY .bind (), ByteBuffer .wrap (key .get ()))
343+ .setLong (TIMESTAMP .bind (), minValidTs );
344+ final List <Row > result = client .execute (getQuery ).all ();
345+
346+ if (result .size () > 1 ) {
347+ throw new IllegalStateException ("Received multiple results for the same key" );
348+ } else if (result .isEmpty ()) {
349+ return null ;
350+ } else {
351+ return getValueFromRow (result .get (0 ));
352+ }
289353 } else {
290- final ByteBuffer value = result .get (0 ).getByteBuffer (DATA_VALUE .column ());
291- return Objects .requireNonNull (value ).array ();
354+ final BoundStatement getQuery = getWithTimestamp
355+ .bind ()
356+ .setByteBuffer (DATA_KEY .bind (), ByteBuffer .wrap (key .get ()))
357+ .setLong (TIMESTAMP .bind (), minValidTs );
358+ final List <Row > result = client .execute (getQuery ).all ();
359+
360+ if (result .size () > 1 ) {
361+ throw new IllegalStateException ("Received multiple results for the same key" );
362+ } else if (result .isEmpty ()) {
363+ return null ;
364+ }
365+
366+ final Row rowResult = result .get (0 );
367+ final byte [] value = getValueFromRow (rowResult );
368+ final TtlDuration ttl = ttlResolver .get ().resolveTtl (key , value );
369+
370+ if (ttl .isFinite ()) {
371+ final long minValidTsFromValue = streamTimeMs - ttl .toMillis ();
372+ final long recordTs = rowResult .getLong (TIMESTAMP .column ());
373+ if (recordTs < minValidTsFromValue ) {
374+ return null ;
375+ }
376+ }
377+
378+ return value ;
292379 }
293380 }
294381
382+ private byte [] getValueFromRow (final Row row ) {
383+ return Objects .requireNonNull (row .getByteBuffer (DATA_VALUE .column ())).array ();
384+ }
385+
295386 @ Override
296387 public KeyValueIterator <Bytes , byte []> range (
297388 final int kafkaPartition ,
298389 final Bytes from ,
299390 final Bytes to ,
300- long minValidTs ) {
391+ long streamTimeMs
392+ ) {
301393 throw new UnsupportedOperationException ("range scans are not supported on fact tables." );
302394 }
303395
304396 @ Override
305397 public KeyValueIterator <Bytes , byte []> all (
306398 final int kafkaPartition ,
307- long minValidTs ) {
399+ long streamTimeMs
400+ ) {
308401 throw new UnsupportedOperationException ("all is not supported on fact tables" );
309402 }
310403
0 commit comments