Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: avoid WARN logs in KafkaStreams test #18517

Merged
merged 1 commit into from
Jan 15, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.ValueAndTimestamp;
Expand Down Expand Up @@ -174,7 +175,7 @@ public void doJoinFromLeftThenDeleteLeftEntity(final boolean leftJoin,
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
final TestOutputTopic<String, String> rejoinOutputTopic = rejoin ? driver.createOutputTopic(REJOIN_OUTPUT, new StringDeserializer(), new StringDeserializer()) : null;
final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
final KeyValueStore<String, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore("store");

// Pre-populate the RHS records. This test is all about what happens when we add/remove LHS records
right.pipeInput("rhs1", "rhsValue1", baseTimestamp);
Expand Down Expand Up @@ -257,7 +258,7 @@ public void doJoinFromLeftThenDeleteLeftEntity(final boolean leftJoin,
}
// Now delete one LHS entity such that one delete is propagated down to the output.

left.pipeInput("lhs1", (String) null, baseTimestamp + 6);
left.pipeInput("lhs1", null, baseTimestamp + 6);
assertThat(
outputTopic.readKeyValuesToMap(),
is(mkMap(
Expand Down Expand Up @@ -298,7 +299,7 @@ public void doJoinFromRightThenDeleteRightEntity(final boolean leftJoin,
final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
final KeyValueStore<String, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore("store");

// Pre-populate the LHS records. This test is all about what happens when we add/remove RHS records
left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
Expand Down Expand Up @@ -381,7 +382,7 @@ public void doJoinFromRightThenDeleteRightEntity(final boolean leftJoin,
}

// Now delete the RHS entity such that all matching keys have deletes propagated.
right.pipeInput("rhs1", (String) null, baseTimestamp + 6);
right.pipeInput("rhs1", null, baseTimestamp + 6);

assertThat(
outputTopic.readKeyValuesToMap(),
Expand Down Expand Up @@ -417,7 +418,7 @@ public void shouldEmitTombstoneWhenDeletingNonJoiningRecords(final boolean leftJ
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
final KeyValueStore<String, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore("store");

left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);

Expand All @@ -439,7 +440,7 @@ public void shouldEmitTombstoneWhenDeletingNonJoiningRecords(final boolean leftJ
// Deleting a non-joining record produces an unnecessary tombstone for inner joins, because
// it's not possible to know whether a result was previously emitted.
// For the left join, the tombstone is necessary.
left.pipeInput("lhs1", (String) null, baseTimestamp + 1);
left.pipeInput("lhs1", null, baseTimestamp + 1);
{
assertThat(
outputTopic.readKeyValuesToMap(),
Expand All @@ -454,7 +455,7 @@ public void shouldEmitTombstoneWhenDeletingNonJoiningRecords(final boolean leftJ
}

// Deleting a non-existing record is idempotent
left.pipeInput("lhs1", (String) null, baseTimestamp + 2);
left.pipeInput("lhs1", null, baseTimestamp + 2);
{
assertThat(
outputTopic.readKeyValuesToMap(),
Expand Down Expand Up @@ -483,10 +484,10 @@ public void shouldNotEmitTombstonesWhenDeletingNonExistingRecords(final boolean
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
final KeyValueStore<String, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore("store");

// Deleting a record that never existed doesn't need to emit tombstones.
left.pipeInput("lhs1", (String) null, baseTimestamp);
left.pipeInput("lhs1", null, baseTimestamp);
{
assertThat(
outputTopic.readKeyValuesToMap(),
Expand Down Expand Up @@ -516,7 +517,7 @@ public void joinShouldProduceNullsWhenValueHasNonMatchingForeignKey(final boolea
final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
final KeyValueStore<String, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore("store");

left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
// no output for a new inner join on a non-existent FK
Expand Down Expand Up @@ -623,7 +624,7 @@ public void shouldUnsubscribeOldForeignKeyIfLeftSideIsUpdated(final boolean left
final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
final KeyValueStore<String, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore("store");

// Pre-populate the RHS records. This test is all about what happens when we change LHS records foreign key reference
// then populate update on RHS
Expand Down Expand Up @@ -707,7 +708,7 @@ public void shouldEmitRecordOnNullForeignKeyForLeftJoins(final String optimizati
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
final KeyValueStore<String, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore("store");

left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
{
Expand Down Expand Up @@ -744,11 +745,11 @@ public void shouldEmitRecordWhenOldAndNewFkDiffer(final String optimization,
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
final KeyValueStore<String, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore("store");
final String subscriptionStoreName = driver.getAllStateStores().entrySet().stream()
.filter(e -> e.getKey().contains("SUBSCRIPTION-STATE-STORE"))
.findAny().orElseThrow(() -> new RuntimeException("couldn't find store")).getKey();
final KeyValueStore<Bytes, ValueAndTimestamp<String>> subscriptionStore = driver.getKeyValueStore(subscriptionStoreName);
final KeyValueStore<Bytes, ValueAndTimestamp<String>> subscriptionStore = driver.getTimestampedKeyValueStore(subscriptionStoreName);
final Bytes key = subscriptionStoreKey("lhs1", "rhs1");
left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
{
Expand Down Expand Up @@ -786,9 +787,11 @@ private static Bytes subscriptionStoreKey(final String lhs, final String rhs) {
return key;
}

protected static Map<String, String> asMap(final KeyValueStore<String, String> store) {
protected static Map<String, String> asMap(final KeyValueStore<String, ValueAndTimestamp<String>> store) {
final HashMap<String, String> result = new HashMap<>();
store.all().forEachRemaining(kv -> result.put(kv.key, kv.value));
try (final KeyValueIterator<String, ValueAndTimestamp<String>> it = store.all()) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additional side cleanup.

it.forEachRemaining(kv -> result.put(kv.key, kv.value.value()));
}
return result;
}

Expand Down Expand Up @@ -921,7 +924,7 @@ public void shouldIgnoreOutOfOrderRecordsIffVersioned(final boolean leftJoin,
final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
final KeyValueStore<String, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore("store");

// RHS record
right.pipeInput("rhs1", "rhsValue1", baseTimestamp + 4);
Expand Down
Loading