Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18613: Improve test coverage for missing topics #19189

Open
wants to merge 14 commits into
base: trunk
Choose a base branch
from

Conversation

lucasbru
Copy link
Member

Tests for joining with missing source topics,
internal topics, incorrectly partitioned topics,
and stale topologies.

This is a stacked PR. Only review commits starting from "KAFKA-18613: Improve test coverage for missing topics"

@lucasbru lucasbru requested a review from cadonna March 12, 2025 14:02
@lucasbru
Copy link
Member Author

@cadonna Could you have a look? I'm adding three tests as a separate PR.

@@ -180,17 +178,13 @@ private static void enforceCopartitioning(final StreamsTopology topology,
x.repartitionSourceTopics().stream().filter(y -> y.partitions() == 0)
).map(StreamsGroupTopologyValue.TopicInfo::name).collect(Collectors.toSet());

if (fixedRepartitionTopics.isEmpty() && flexibleRepartitionTopics.isEmpty()) {
log.info("Skipping the repartition topic validation since there are no repartition topics.");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Skipping here is actually not correct - we need to enforce copartitioning also for source topics.

@lucasbru lucasbru added streams KIP-1071 PRs related to KIP-1071 labels Mar 14, 2025
@lucasbru lucasbru force-pushed the kip1071merge/heartbeat_2 branch from efe995e to a7073cc Compare March 14, 2025 11:06
Copy link
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @lucasbru, overall this looks good, I left a couple of comments.

} else if (!updatedTopology.equals(group.topology().get())) {
return streamsTopologyFromRequest;
} else if (group.topology().get().topologyEpoch() > topology.epoch()) {
log.info("[GroupId {}][MemberId {}] Member joined with stake topology epoch {}", groupId, memberId, topology.epoch());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's stake topology? did you mean state

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. It was supposed to be stale.

throw new InvalidRequestException("Topology updates are not supported yet.");
} else {
log.debug("[GroupId {}][MemberId {}] Member joined with currently initialized topology {}", groupId, memberId, topology.epoch());
return group.topology().get();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should any of the calls to group.topology().get() check for isPresent()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if group.topology.isEmpty is true, the control-flow will go to the first if-block, so we won't need an extra isPresent check her.e

for (Collection<Set<String>> copartitionGroups : copartitionGroupsBySubtopology.values()) {
for (Set<String> copartitionGroup : copartitionGroups) {
decidedPartitionCountsForInternalTopics.putAll(
copartitionedTopicsEnforcer.enforce(copartitionGroup, fixedRepartitionTopics, flexibleRepartitionTopics));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would this affect a topology where the user wants to expand partitions with a repartition operator? Would it throw an error in that case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should work as in the old protocol. I think if you use a repartition operator, the repartition topics should be included in fixedRepartitionTopics.

StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology),
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of(
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: put Map.of on a new line

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

public void testStreamsGroupMemberJoiningWithMissingSourceTopic() {
String groupId = "fooup";
String memberId = Uuid.randomUuid().toString();

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove extra line

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

public void testStreamsGroupMemberJoiningWithMissingInternalTopic() {
String groupId = "fooup";
String memberId = Uuid.randomUuid().toString();

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove extra line same for similar tests below

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

.setWarmupTasks(List.of()));

assertEquals(
Map.of(barTopicName, new CreatableTopic().setName(barTopicName).setNumPartitions(6).setReplicationFactor((short) -1)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: put builder calls on new line

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Tests for joining with missing source topics,
internal topics, incorrectly partitioned topics,
and stale topologies.
@lucasbru lucasbru force-pushed the kip1071merge/heartbeat_2 branch from a7073cc to 407a15c Compare March 17, 2025 09:20
Copy link
Member Author

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bbejeck Thanks for the comments! Ready for re-review

} else if (!updatedTopology.equals(group.topology().get())) {
return streamsTopologyFromRequest;
} else if (group.topology().get().topologyEpoch() > topology.epoch()) {
log.info("[GroupId {}][MemberId {}] Member joined with stake topology epoch {}", groupId, memberId, topology.epoch());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had the STALE_TOPOLOGY_EPOCH status implemented, but rejected all topology updates, which somewhat contradicts each other. I added this branch to basically ignore stale topologies.

} else if (!updatedTopology.equals(group.topology().get())) {
return streamsTopologyFromRequest;
} else if (group.topology().get().topologyEpoch() > topology.epoch()) {
log.info("[GroupId {}][MemberId {}] Member joined with stake topology epoch {}", groupId, memberId, topology.epoch());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. It was supposed to be stale.

throw new InvalidRequestException("Topology updates are not supported yet.");
} else {
log.debug("[GroupId {}][MemberId {}] Member joined with currently initialized topology {}", groupId, memberId, topology.epoch());
return group.topology().get();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if group.topology.isEmpty is true, the control-flow will go to the first if-block, so we won't need an extra isPresent check her.e

for (Collection<Set<String>> copartitionGroups : copartitionGroupsBySubtopology.values()) {
for (Set<String> copartitionGroup : copartitionGroups) {
decidedPartitionCountsForInternalTopics.putAll(
copartitionedTopicsEnforcer.enforce(copartitionGroup, fixedRepartitionTopics, flexibleRepartitionTopics));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should work as in the old protocol. I think if you use a repartition operator, the repartition topics should be included in fixedRepartitionTopics.

public void testStreamsGroupMemberJoiningWithMissingSourceTopic() {
String groupId = "fooup";
String memberId = Uuid.randomUuid().toString();

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology),
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of(
fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

public void testStreamsGroupMemberJoiningWithMissingInternalTopic() {
String groupId = "fooup";
String memberId = Uuid.randomUuid().toString();

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

.setWarmupTasks(List.of()));

assertEquals(
Map.of(barTopicName, new CreatableTopic().setName(barTopicName).setNumPartitions(6).setReplicationFactor((short) -1)),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
KIP-1071 PRs related to KIP-1071 streams
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants