KAFKA-18613: Improve test coverage for missing topics #19189

Merged (2 commits) on Mar 31, 2025
@@ -2273,25 +2273,27 @@ private StreamsTopology maybeUpdateTopology(final String groupId,
                                                 final Topology topology,
                                                 final StreamsGroup group,
                                                 final List<CoordinatorRecord> records) {
-        StreamsTopology updatedTopology;
         if (topology != null) {
-            StreamsGroupTopologyValue recordValue = convertToStreamsGroupTopologyRecord(topology);
-
-            updatedTopology = StreamsTopology.fromHeartbeatRequest(topology);
-
+            StreamsTopology streamsTopologyFromRequest = StreamsTopology.fromHeartbeatRequest(topology);
             if (group.topology().isEmpty()) {
                 log.info("[GroupId {}][MemberId {}] Member initialized the topology with epoch {}", groupId, memberId, topology.epoch());
+
+                StreamsGroupTopologyValue recordValue = convertToStreamsGroupTopologyRecord(topology);
                 records.add(newStreamsGroupTopologyRecord(groupId, recordValue));
-            } else if (!updatedTopology.equals(group.topology().get())) {
+                return streamsTopologyFromRequest;
+            } else if (group.topology().get().topologyEpoch() > topology.epoch()) {
+                log.info("[GroupId {}][MemberId {}] Member joined with stale topology epoch {}", groupId, memberId, topology.epoch());
+                return group.topology().get();
+            } else if (!group.topology().get().equals(streamsTopologyFromRequest)) {
                 throw new InvalidRequestException("Topology updates are not supported yet.");
             } else {
                 log.debug("[GroupId {}][MemberId {}] Member joined with currently initialized topology {}", groupId, memberId, topology.epoch());
+                return group.topology().get();

Member:
should any of the calls to group.topology().get() check for isPresent()?

Member Author:
I think if group.topology().isEmpty() is true, the control flow goes to the first if-block, so we won't need an extra isPresent() check here.
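
Outside the PR itself, a minimal standalone sketch of the control flow the reply relies on (class name, return labels, and values are hypothetical, not Kafka code): once the empty Optional is handled by the first branch, the later branches can call get() without an extra isPresent() check.

    import java.util.Optional;

    public class OptionalBranchSketch {

        // Mirrors the branch structure in the diff: the empty case is handled first,
        // so get() in the later branches can never throw NoSuchElementException.
        static String classify(final Optional<String> storedTopology, final String requested) {
            if (storedTopology.isEmpty()) {
                return "initialize";
            } else if (!storedTopology.get().equals(requested)) {
                return "update-rejected";
            } else {
                return "unchanged";
            }
        }

        public static void main(final String[] args) {
            System.out.println(classify(Optional.empty(), "t1"));   // initialize
            System.out.println(classify(Optional.of("t1"), "t2"));  // update-rejected
            System.out.println(classify(Optional.of("t1"), "t1"));  // unchanged
        }
    }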
             }
         } else if (group.topology().isPresent()) {
-            updatedTopology = group.topology().get();
+            return group.topology().get();
         } else {
             throw new IllegalStateException("The topology is null and the group topology is also null.");
         }
-        return updatedTopology;
     }
 
     private List<StreamsGroupHeartbeatResponseData.TaskIds> createStreamsGroupHeartbeatResponseTaskIds(final Map<String, Set<Integer>> taskIds) {
@@ -156,7 +156,6 @@ private static Map<String, Integer> decidePartitionCounts(final LogContext logCo
         enforceCopartitioning(
             topology,
             copartitionGroupsBySubtopology,
-            log,
             decidedPartitionCountsForInternalTopics,
             copartitionedTopicsEnforcer
         );
@@ -168,7 +167,6 @@ private static Map<String, Integer> decidePartitionCounts(final LogContext logCo
 
     private static void enforceCopartitioning(final StreamsTopology topology,
                                               final Map<String, Collection<Set<String>>> copartitionGroupsBySubtopology,
-                                              final Logger log,
                                               final Map<String, Integer> decidedPartitionCountsForInternalTopics,
                                               final CopartitionedTopicsEnforcer copartitionedTopicsEnforcer) {
         final Set<String> fixedRepartitionTopics =
@@ -180,17 +178,13 @@ private static void enforceCopartitioning(final StreamsTopology topology,
                 x.repartitionSourceTopics().stream().filter(y -> y.partitions() == 0)
             ).map(StreamsGroupTopologyValue.TopicInfo::name).collect(Collectors.toSet());
 
-        if (fixedRepartitionTopics.isEmpty() && flexibleRepartitionTopics.isEmpty()) {
-            log.info("Skipping the repartition topic validation since there are no repartition topics.");

Member Author:
Skipping here is actually not correct: we need to enforce copartitioning for source topics as well.
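
To illustrate the point above, a simplified standalone sketch (not the actual CopartitionedTopicsEnforcer; topic names and partition counts are invented): even when a group has no repartition topics at all, source topics in the same copartition group must still agree on their partition counts, which is why the early skip is removed.

    import java.util.Map;
    import java.util.Set;

    public class CopartitionSketch {

        // Throws if the topics in the copartition group do not all share one partition count.
        static void enforceSameCount(final Set<String> copartitionGroup, final Map<String, Integer> partitionCounts) {
            Integer expected = null;
            for (final String topic : copartitionGroup) {
                final Integer count = partitionCounts.get(topic);
                if (expected == null) {
                    expected = count;
                } else if (!expected.equals(count)) {
                    throw new IllegalStateException(
                        "Topics " + copartitionGroup + " are not copartitioned: " + partitionCounts);
                }
            }
        }

        public static void main(final String[] args) {
            // Two plain source topics and no repartition topics: the removed early return would
            // have skipped validation, but this mismatch must still be rejected.
            enforceSameCount(Set.of("orders", "customers"), Map.of("orders", 4, "customers", 6));
        }
    }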
-        } else {
-            // ensure the co-partitioning topics within the group have the same number of partitions,
-            // and enforce the number of partitions for those repartition topics to be the same if they
-            // are co-partitioned as well.
-            for (Collection<Set<String>> copartitionGroups : copartitionGroupsBySubtopology.values()) {
-                for (Set<String> copartitionGroup : copartitionGroups) {
-                    decidedPartitionCountsForInternalTopics.putAll(
-                        copartitionedTopicsEnforcer.enforce(copartitionGroup, fixedRepartitionTopics, flexibleRepartitionTopics));
-                }
+        // ensure the co-partitioning topics within the group have the same number of partitions,
+        // and enforce the number of partitions for those repartition topics to be the same if they
+        // are co-partitioned as well.
+        for (Collection<Set<String>> copartitionGroups : copartitionGroupsBySubtopology.values()) {
+            for (Set<String> copartitionGroup : copartitionGroups) {
+                decidedPartitionCountsForInternalTopics.putAll(
+                    copartitionedTopicsEnforcer.enforce(copartitionGroup, fixedRepartitionTopics, flexibleRepartitionTopics));

Member:
How would this affect a topology where the user wants to expand partitions with a repartition operator? Would it throw an error in that case?

Member Author:
This should work as in the old protocol. I think if you use a repartition operator, the repartition topics should be included in fixedRepartitionTopics.
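
As a sketch of the reply above (topic names and partition counts are hypothetical, and the fixed/flexible mapping is inferred from the partitions() filter in this file): a repartition() operator with an explicit partition count yields an internal topic whose partitions() value is non-zero, so it would be treated as a fixed repartition topic, while one without an explicit count stays flexible and can be aligned by the enforcer.

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Repartitioned;

    public class RepartitionExample {

        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> input = builder.stream("input-topic");

            // Explicit count: the internal repartition topic is pinned to 8 partitions,
            // so it should end up among the fixed repartition topics.
            input.repartition(Repartitioned.<String, String>as("fixed-repartition").withNumberOfPartitions(8))
                 .to("output-fixed");

            // No count given: the repartition topic stays flexible, and the group coordinator's
            // enforcer may decide its partition count from the copartition group.
            input.repartition(Repartitioned.as("flexible-repartition"))
                 .to("output-flexible");

            builder.build();
        }
    }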
             }
         }
     }