31
31
import org .apache .kafka .streams .kstream .KTable ;
32
32
import org .apache .kafka .streams .kstream .Materialized ;
33
33
import org .apache .kafka .streams .kstream .ValueJoiner ;
34
+ import org .apache .kafka .streams .state .KeyValueIterator ;
34
35
import org .apache .kafka .streams .state .KeyValueStore ;
35
36
import org .apache .kafka .streams .state .Stores ;
36
37
import org .apache .kafka .streams .state .ValueAndTimestamp ;
@@ -174,7 +175,7 @@ public void doJoinFromLeftThenDeleteLeftEntity(final boolean leftJoin,
174
175
final TestInputTopic <String , String > left = driver .createInputTopic (LEFT_TABLE , new StringSerializer (), new StringSerializer ());
175
176
final TestOutputTopic <String , String > outputTopic = driver .createOutputTopic (OUTPUT , new StringDeserializer (), new StringDeserializer ());
176
177
final TestOutputTopic <String , String > rejoinOutputTopic = rejoin ? driver .createOutputTopic (REJOIN_OUTPUT , new StringDeserializer (), new StringDeserializer ()) : null ;
177
- final KeyValueStore <String , String > store = driver .getKeyValueStore ("store" );
178
+ final KeyValueStore <String , ValueAndTimestamp < String >> store = driver .getTimestampedKeyValueStore ("store" );
178
179
179
180
// Pre-populate the RHS records. This test is all about what happens when we add/remove LHS records
180
181
right .pipeInput ("rhs1" , "rhsValue1" , baseTimestamp );
@@ -257,7 +258,7 @@ public void doJoinFromLeftThenDeleteLeftEntity(final boolean leftJoin,
257
258
}
258
259
// Now delete one LHS entity such that one delete is propagated down to the output.
259
260
260
- left .pipeInput ("lhs1" , ( String ) null , baseTimestamp + 6 );
261
+ left .pipeInput ("lhs1" , null , baseTimestamp + 6 );
261
262
assertThat (
262
263
outputTopic .readKeyValuesToMap (),
263
264
is (mkMap (
@@ -298,7 +299,7 @@ public void doJoinFromRightThenDeleteRightEntity(final boolean leftJoin,
298
299
final TestInputTopic <String , String > right = driver .createInputTopic (RIGHT_TABLE , new StringSerializer (), new StringSerializer ());
299
300
final TestInputTopic <String , String > left = driver .createInputTopic (LEFT_TABLE , new StringSerializer (), new StringSerializer ());
300
301
final TestOutputTopic <String , String > outputTopic = driver .createOutputTopic (OUTPUT , new StringDeserializer (), new StringDeserializer ());
301
- final KeyValueStore <String , String > store = driver .getKeyValueStore ("store" );
302
+ final KeyValueStore <String , ValueAndTimestamp < String >> store = driver .getTimestampedKeyValueStore ("store" );
302
303
303
304
// Pre-populate the LHS records. This test is all about what happens when we add/remove RHS records
304
305
left .pipeInput ("lhs1" , "lhsValue1|rhs1" , baseTimestamp );
@@ -381,7 +382,7 @@ public void doJoinFromRightThenDeleteRightEntity(final boolean leftJoin,
381
382
}
382
383
383
384
// Now delete the RHS entity such that all matching keys have deletes propagated.
384
- right .pipeInput ("rhs1" , ( String ) null , baseTimestamp + 6 );
385
+ right .pipeInput ("rhs1" , null , baseTimestamp + 6 );
385
386
386
387
assertThat (
387
388
outputTopic .readKeyValuesToMap (),
@@ -417,7 +418,7 @@ public void shouldEmitTombstoneWhenDeletingNonJoiningRecords(final boolean leftJ
417
418
try (final TopologyTestDriver driver = new TopologyTestDriver (topology , streamsConfig )) {
418
419
final TestInputTopic <String , String > left = driver .createInputTopic (LEFT_TABLE , new StringSerializer (), new StringSerializer ());
419
420
final TestOutputTopic <String , String > outputTopic = driver .createOutputTopic (OUTPUT , new StringDeserializer (), new StringDeserializer ());
420
- final KeyValueStore <String , String > store = driver .getKeyValueStore ("store" );
421
+ final KeyValueStore <String , ValueAndTimestamp < String >> store = driver .getTimestampedKeyValueStore ("store" );
421
422
422
423
left .pipeInput ("lhs1" , "lhsValue1|rhs1" , baseTimestamp );
423
424
@@ -439,7 +440,7 @@ public void shouldEmitTombstoneWhenDeletingNonJoiningRecords(final boolean leftJ
439
440
// Deleting a non-joining record produces an unnecessary tombstone for inner joins, because
440
441
// it's not possible to know whether a result was previously emitted.
441
442
// For the left join, the tombstone is necessary.
442
- left .pipeInput ("lhs1" , ( String ) null , baseTimestamp + 1 );
443
+ left .pipeInput ("lhs1" , null , baseTimestamp + 1 );
443
444
{
444
445
assertThat (
445
446
outputTopic .readKeyValuesToMap (),
@@ -454,7 +455,7 @@ public void shouldEmitTombstoneWhenDeletingNonJoiningRecords(final boolean leftJ
454
455
}
455
456
456
457
// Deleting a non-existing record is idempotent
457
- left .pipeInput ("lhs1" , ( String ) null , baseTimestamp + 2 );
458
+ left .pipeInput ("lhs1" , null , baseTimestamp + 2 );
458
459
{
459
460
assertThat (
460
461
outputTopic .readKeyValuesToMap (),
@@ -483,10 +484,10 @@ public void shouldNotEmitTombstonesWhenDeletingNonExistingRecords(final boolean
483
484
try (final TopologyTestDriver driver = new TopologyTestDriver (topology , streamsConfig )) {
484
485
final TestInputTopic <String , String > left = driver .createInputTopic (LEFT_TABLE , new StringSerializer (), new StringSerializer ());
485
486
final TestOutputTopic <String , String > outputTopic = driver .createOutputTopic (OUTPUT , new StringDeserializer (), new StringDeserializer ());
486
- final KeyValueStore <String , String > store = driver .getKeyValueStore ("store" );
487
+ final KeyValueStore <String , ValueAndTimestamp < String >> store = driver .getTimestampedKeyValueStore ("store" );
487
488
488
489
// Deleting a record that never existed doesn't need to emit tombstones.
489
- left .pipeInput ("lhs1" , ( String ) null , baseTimestamp );
490
+ left .pipeInput ("lhs1" , null , baseTimestamp );
490
491
{
491
492
assertThat (
492
493
outputTopic .readKeyValuesToMap (),
@@ -516,7 +517,7 @@ public void joinShouldProduceNullsWhenValueHasNonMatchingForeignKey(final boolea
516
517
final TestInputTopic <String , String > right = driver .createInputTopic (RIGHT_TABLE , new StringSerializer (), new StringSerializer ());
517
518
final TestInputTopic <String , String > left = driver .createInputTopic (LEFT_TABLE , new StringSerializer (), new StringSerializer ());
518
519
final TestOutputTopic <String , String > outputTopic = driver .createOutputTopic (OUTPUT , new StringDeserializer (), new StringDeserializer ());
519
- final KeyValueStore <String , String > store = driver .getKeyValueStore ("store" );
520
+ final KeyValueStore <String , ValueAndTimestamp < String >> store = driver .getTimestampedKeyValueStore ("store" );
520
521
521
522
left .pipeInput ("lhs1" , "lhsValue1|rhs1" , baseTimestamp );
522
523
// no output for a new inner join on a non-existent FK
@@ -623,7 +624,7 @@ public void shouldUnsubscribeOldForeignKeyIfLeftSideIsUpdated(final boolean left
623
624
final TestInputTopic <String , String > right = driver .createInputTopic (RIGHT_TABLE , new StringSerializer (), new StringSerializer ());
624
625
final TestInputTopic <String , String > left = driver .createInputTopic (LEFT_TABLE , new StringSerializer (), new StringSerializer ());
625
626
final TestOutputTopic <String , String > outputTopic = driver .createOutputTopic (OUTPUT , new StringDeserializer (), new StringDeserializer ());
626
- final KeyValueStore <String , String > store = driver .getKeyValueStore ("store" );
627
+ final KeyValueStore <String , ValueAndTimestamp < String >> store = driver .getTimestampedKeyValueStore ("store" );
627
628
628
629
// Pre-populate the RHS records. This test is all about what happens when we change LHS records foreign key reference
629
630
// then populate update on RHS
@@ -707,7 +708,7 @@ public void shouldEmitRecordOnNullForeignKeyForLeftJoins(final String optimizati
707
708
try (final TopologyTestDriver driver = new TopologyTestDriver (topology , streamsConfig )) {
708
709
final TestInputTopic <String , String > left = driver .createInputTopic (LEFT_TABLE , new StringSerializer (), new StringSerializer ());
709
710
final TestOutputTopic <String , String > outputTopic = driver .createOutputTopic (OUTPUT , new StringDeserializer (), new StringDeserializer ());
710
- final KeyValueStore <String , String > store = driver .getKeyValueStore ("store" );
711
+ final KeyValueStore <String , ValueAndTimestamp < String >> store = driver .getTimestampedKeyValueStore ("store" );
711
712
712
713
left .pipeInput ("lhs1" , "lhsValue1|rhs1" , baseTimestamp );
713
714
{
@@ -744,11 +745,11 @@ public void shouldEmitRecordWhenOldAndNewFkDiffer(final String optimization,
744
745
try (final TopologyTestDriver driver = new TopologyTestDriver (topology , streamsConfig )) {
745
746
final TestInputTopic <String , String > left = driver .createInputTopic (LEFT_TABLE , new StringSerializer (), new StringSerializer ());
746
747
final TestOutputTopic <String , String > outputTopic = driver .createOutputTopic (OUTPUT , new StringDeserializer (), new StringDeserializer ());
747
- final KeyValueStore <String , String > store = driver .getKeyValueStore ("store" );
748
+ final KeyValueStore <String , ValueAndTimestamp < String >> store = driver .getTimestampedKeyValueStore ("store" );
748
749
final String subscriptionStoreName = driver .getAllStateStores ().entrySet ().stream ()
749
750
.filter (e -> e .getKey ().contains ("SUBSCRIPTION-STATE-STORE" ))
750
751
.findAny ().orElseThrow (() -> new RuntimeException ("couldn't find store" )).getKey ();
751
- final KeyValueStore <Bytes , ValueAndTimestamp <String >> subscriptionStore = driver .getKeyValueStore (subscriptionStoreName );
752
+ final KeyValueStore <Bytes , ValueAndTimestamp <String >> subscriptionStore = driver .getTimestampedKeyValueStore (subscriptionStoreName );
752
753
final Bytes key = subscriptionStoreKey ("lhs1" , "rhs1" );
753
754
left .pipeInput ("lhs1" , "lhsValue1|rhs1" , baseTimestamp );
754
755
{
@@ -786,9 +787,11 @@ private static Bytes subscriptionStoreKey(final String lhs, final String rhs) {
786
787
return key ;
787
788
}
788
789
789
- protected static Map <String , String > asMap (final KeyValueStore <String , String > store ) {
790
+ protected static Map <String , String > asMap (final KeyValueStore <String , ValueAndTimestamp < String > > store ) {
790
791
final HashMap <String , String > result = new HashMap <>();
791
- store .all ().forEachRemaining (kv -> result .put (kv .key , kv .value ));
792
+ try (final KeyValueIterator <String , ValueAndTimestamp <String >> it = store .all ()) {
793
+ it .forEachRemaining (kv -> result .put (kv .key , kv .value .value ()));
794
+ }
792
795
return result ;
793
796
}
794
797
@@ -921,7 +924,7 @@ public void shouldIgnoreOutOfOrderRecordsIffVersioned(final boolean leftJoin,
921
924
final TestInputTopic <String , String > right = driver .createInputTopic (RIGHT_TABLE , new StringSerializer (), new StringSerializer ());
922
925
final TestInputTopic <String , String > left = driver .createInputTopic (LEFT_TABLE , new StringSerializer (), new StringSerializer ());
923
926
final TestOutputTopic <String , String > outputTopic = driver .createOutputTopic (OUTPUT , new StringDeserializer (), new StringDeserializer ());
924
- final KeyValueStore <String , String > store = driver .getKeyValueStore ("store" );
927
+ final KeyValueStore <String , ValueAndTimestamp < String >> store = driver .getTimestampedKeyValueStore ("store" );
925
928
926
929
// RHS record
927
930
right .pipeInput ("rhs1" , "rhsValue1" , baseTimestamp + 4 );
0 commit comments