Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18819 StreamsGroupHeartbeat API and StreamsGroupDescribe API check topic describe #19183

Merged
merged 3 commits into from
Mar 19, 2025

Conversation

DL1231
Copy link
Contributor

@DL1231 DL1231 commented Mar 11, 2025

This patch filters out the topic describe unauthorized topics from the StreamsGroupHeartbeat and StreamsGroupDescribe response.

@github-actions github-actions bot added triage PRs from the community core Kafka Broker clients labels Mar 11, 2025
@DL1231
Copy link
Contributor Author

DL1231 commented Mar 12, 2025

@lucasbru PTAL when you get a chance.

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! This mostly makes sense to me, I left some comments

// - CLUSTER_AUTHORIZATION_FAILED (version 0+)
// - STREAMS_INVALID_TOPOLOGY (version 0+)
// - STREAMS_INVALID_TOPOLOGY_EPOCH (version 0+)
// - STREAMS_TOPOLOGY_FENCED (version 0+)
// - TOPIC_AUTHORIZATION_FAILED (version 0+)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duplicate

@@ -43,6 +43,7 @@
* - {@link Errors#STREAMS_INVALID_TOPOLOGY}
* - {@link Errors#STREAMS_INVALID_TOPOLOGY_EPOCH}
* - {@link Errors#STREAMS_TOPOLOGY_FENCED}
* - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duplicate

log.debug("`DescribeStreamsGroups` request for group id {} failed due to error {}", groupId.idValue, error);
// The topic auth response received on DescribeStreamsGroup is a generic one not including topic names, so we just pass it on unchanged here.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you mean by this comment? Also, this block also handles GROUP_AUTHORIZATION_FAILED, so it's weird that you're just talking about topic auth.

.flatMap(subtopology => util.stream.Stream.of(
subtopology.sourceTopics,
subtopology.repartitionSinkTopics,
subtopology.repartitionSourceTopics.iterator.asScala.map(_.name).toList.asJava,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't you use .stream().map(_name) instead of converting between scala and java? It seems the extra collection is unncessary here.

Something like

Stream.concat(
                subtopology.sourceTopics.stream,
                subtopology.repartitionSinkTopics.stream,
                subtopology.repartitionSourceTopics.stream.map(_.name),
				....
)

val updatedGroups = response.groups.stream.map { group =>
val hasUnauthorizedTopic = group.topology.subtopologies.stream()
.flatMap(subtopology => util.stream.Stream.of(
subtopology.sourceTopics,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid creating intermediate collections? Either by using Stream.concat, or by just calling anyMatch on each of the collections and combining the boolean results using || for individual collections.

new StreamsGroupHeartbeatRequestData.Topology()
.setEpoch(3)
.setSubtopologies(
Collections.singletonList(new StreamsGroupHeartbeatRequestData.Subtopology().setSubtopologyId("subtopology")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Collections.singletonList -> List.of?

@@ -10350,10 +10400,32 @@ class KafkaApisTest extends Logging {
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)

val subtotplogy0 = new StreamsGroupDescribeResponseData.Subtopology()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: subtotplogy -> subtopology. Same in other places

new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(group.groupId)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
.setErrorMessage("The group has described topic(s) that the client is not authorized to describe.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"The described group uses topics that the client is not authorized to describe."

@lucasbru lucasbru added streams KIP-1071 PRs related to KIP-1071 labels Mar 14, 2025
@github-actions github-actions bot removed the triage PRs from the community label Mar 15, 2025
@DL1231 DL1231 requested a review from lucasbru March 18, 2025 03:12
Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks!

@lucasbru lucasbru merged commit e73719d into apache:trunk Mar 19, 2025
20 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
clients core Kafka Broker KIP-1071 PRs related to KIP-1071 streams
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants