Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: restore testUnauthorizedTopicMetadataRequest #18578

Merged
merged 1 commit into from
Jan 19, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3561,6 +3561,115 @@ class KafkaApisTest extends Logging {
assertEquals(Set(0), response.brokers.asScala.map(_.id).toSet)
}

@Test
def testUnauthorizedTopicMetadataRequest(): Unit = {
// 1. Set up broker information
val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
val broker = new UpdateMetadataBroker()
.setId(0)
.setRack("rack")
.setEndpoints(Seq(
new UpdateMetadataEndpoint()
.setHost("broker0")
.setPort(9092)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
.setListener(plaintextListener.value)
).asJava)

// 2. Set up authorizer
val authorizer: Authorizer = mock(classOf[Authorizer])
val unauthorizedTopic = "unauthorized-topic"
val authorizedTopic = "authorized-topic"

val expectedActions = Seq(
new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, unauthorizedTopic, PatternType.LITERAL), 1, true, true),
new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, authorizedTopic, PatternType.LITERAL), 1, true, true)
)

when(authorizer.authorize(any[RequestContext], argThat((t: java.util.List[Action]) => t.containsAll(expectedActions.asJava))))
.thenAnswer { invocation =>
val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]].asScala
actions.map { action =>
if (action.resourcePattern().name().equals(authorizedTopic))
AuthorizationResult.ALLOWED
else
AuthorizationResult.DENIED
}.asJava
}

// 3. Set up MetadataCache
val authorizedTopicId = Uuid.randomUuid()
val unauthorizedTopicId = Uuid.randomUuid()

val topicIds = new util.HashMap[String, Uuid]()
topicIds.put(authorizedTopic, authorizedTopicId)
topicIds.put(unauthorizedTopic, unauthorizedTopicId)

def createDummyPartitionStates(topic: String) = {
new UpdateMetadataPartitionState()
.setTopicName(topic)
.setPartitionIndex(0)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(0)
.setReplicas(Collections.singletonList(0))
.setZkVersion(0)
.setIsr(Collections.singletonList(0))
}

// Send UpdateMetadataReq to update MetadataCache
val partitionStates = Seq(unauthorizedTopic, authorizedTopic).map(createDummyPartitionStates)

val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
0, 0, partitionStates.asJava, Seq(broker).asJava, topicIds).build()
MetadataCacheTest.updateCache(metadataCache, updateMetadataRequest)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should have a jira to cleanup the usage of MetadataCacheTest.updateCache, as it is using updateMetadataRequest which is not used by kraft production code anymore.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is already trace by https://issues.apache.org/jira/browse/KAFKA-18540.
I will handle it ASAP.


// 4. Send TopicMetadataReq using topicId
val metadataReqByTopicId = new MetadataRequest.Builder(util.Arrays.asList(authorizedTopicId, unauthorizedTopicId)).build()
val repByTopicId = buildRequest(metadataReqByTopicId, plaintextListener)
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
any[Long])).thenReturn(0)
kafkaApis = createKafkaApis(authorizer = Some(authorizer))
kafkaApis.handleTopicMetadataRequest(repByTopicId)
val metadataByTopicIdResp = verifyNoThrottling[MetadataResponse](repByTopicId)

val metadataByTopicId = metadataByTopicIdResp.data().topics().asScala.groupBy(_.topicId()).map(kv => (kv._1, kv._2.head))

metadataByTopicId.foreach { case (topicId, metadataResponseTopic) =>
if (topicId == unauthorizedTopicId) {
// Return an TOPIC_AUTHORIZATION_FAILED on unauthorized error regardless of leaking the existence of topic id
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), metadataResponseTopic.errorCode())
// Do not return topic information on unauthorized error
assertNull(metadataResponseTopic.name())
} else {
assertEquals(Errors.NONE.code(), metadataResponseTopic.errorCode())
assertEquals(authorizedTopic, metadataResponseTopic.name())
}
}
kafkaApis.close()

// 4. Send TopicMetadataReq using topic name
reset(clientRequestQuotaManager, requestChannel)
val metadataReqByTopicName = new MetadataRequest.Builder(util.Arrays.asList(authorizedTopic, unauthorizedTopic), false).build()
val repByTopicName = buildRequest(metadataReqByTopicName, plaintextListener)
kafkaApis = createKafkaApis(authorizer = Some(authorizer))
kafkaApis.handleTopicMetadataRequest(repByTopicName)
val metadataByTopicNameResp = verifyNoThrottling[MetadataResponse](repByTopicName)

val metadataByTopicName = metadataByTopicNameResp.data().topics().asScala.groupBy(_.name()).map(kv => (kv._1, kv._2.head))

metadataByTopicName.foreach { case (topicName, metadataResponseTopic) =>
if (topicName == unauthorizedTopic) {
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), metadataResponseTopic.errorCode())
// Do not return topic Id on unauthorized error
assertEquals(Uuid.ZERO_UUID, metadataResponseTopic.topicId())
} else {
assertEquals(Errors.NONE.code(), metadataResponseTopic.errorCode())
assertEquals(authorizedTopicId, metadataResponseTopic.topicId())
}
}
}

/**
* Verifies that sending a fetch request with version 9 works correctly when
* ReplicaManager.getLogConfig returns None.
Expand Down
Loading