@@ -3560,6 +3560,115 @@ class KafkaApisTest extends Logging {
3560
3560
assertEquals(Set (0 ), response.brokers.asScala.map(_.id).toSet)
3561
3561
}
3562
3562
3563
+ @ Test
3564
+ def testUnauthorizedTopicMetadataRequest (): Unit = {
3565
+ // 1. Set up broker information
3566
+ val plaintextListener = ListenerName .forSecurityProtocol(SecurityProtocol .PLAINTEXT )
3567
+ val broker = new UpdateMetadataBroker ()
3568
+ .setId(0 )
3569
+ .setRack(" rack" )
3570
+ .setEndpoints(Seq (
3571
+ new UpdateMetadataEndpoint ()
3572
+ .setHost(" broker0" )
3573
+ .setPort(9092 )
3574
+ .setSecurityProtocol(SecurityProtocol .PLAINTEXT .id)
3575
+ .setListener(plaintextListener.value)
3576
+ ).asJava)
3577
+
3578
+ // 2. Set up authorizer
3579
+ val authorizer : Authorizer = mock(classOf [Authorizer ])
3580
+ val unauthorizedTopic = " unauthorized-topic"
3581
+ val authorizedTopic = " authorized-topic"
3582
+
3583
+ val expectedActions = Seq (
3584
+ new Action (AclOperation .DESCRIBE , new ResourcePattern (ResourceType .TOPIC , unauthorizedTopic, PatternType .LITERAL ), 1 , true , true ),
3585
+ new Action (AclOperation .DESCRIBE , new ResourcePattern (ResourceType .TOPIC , authorizedTopic, PatternType .LITERAL ), 1 , true , true )
3586
+ )
3587
+
3588
+ when(authorizer.authorize(any[RequestContext ], argThat((t : java.util.List [Action ]) => t.containsAll(expectedActions.asJava))))
3589
+ .thenAnswer { invocation =>
3590
+ val actions = invocation.getArgument(1 ).asInstanceOf [util.List [Action ]].asScala
3591
+ actions.map { action =>
3592
+ if (action.resourcePattern().name().equals(authorizedTopic))
3593
+ AuthorizationResult .ALLOWED
3594
+ else
3595
+ AuthorizationResult .DENIED
3596
+ }.asJava
3597
+ }
3598
+
3599
+ // 3. Set up MetadataCache
3600
+ val authorizedTopicId = Uuid .randomUuid()
3601
+ val unauthorizedTopicId = Uuid .randomUuid()
3602
+
3603
+ val topicIds = new util.HashMap [String , Uuid ]()
3604
+ topicIds.put(authorizedTopic, authorizedTopicId)
3605
+ topicIds.put(unauthorizedTopic, unauthorizedTopicId)
3606
+
3607
+ def createDummyPartitionStates (topic : String ) = {
3608
+ new UpdateMetadataPartitionState ()
3609
+ .setTopicName(topic)
3610
+ .setPartitionIndex(0 )
3611
+ .setControllerEpoch(0 )
3612
+ .setLeader(0 )
3613
+ .setLeaderEpoch(0 )
3614
+ .setReplicas(Collections .singletonList(0 ))
3615
+ .setZkVersion(0 )
3616
+ .setIsr(Collections .singletonList(0 ))
3617
+ }
3618
+
3619
+ // Send UpdateMetadataReq to update MetadataCache
3620
+ val partitionStates = Seq (unauthorizedTopic, authorizedTopic).map(createDummyPartitionStates)
3621
+
3622
+ val updateMetadataRequest = new UpdateMetadataRequest .Builder (ApiKeys .UPDATE_METADATA .latestVersion, 0 ,
3623
+ 0 , 0 , partitionStates.asJava, Seq (broker).asJava, topicIds).build()
3624
+ MetadataCacheTest .updateCache(metadataCache, updateMetadataRequest)
3625
+
3626
+ // 4. Send TopicMetadataReq using topicId
3627
+ val metadataReqByTopicId = new MetadataRequest .Builder (util.Arrays .asList(authorizedTopicId, unauthorizedTopicId)).build()
3628
+ val repByTopicId = buildRequest(metadataReqByTopicId, plaintextListener)
3629
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel .Request ](),
3630
+ any[Long ])).thenReturn(0 )
3631
+ kafkaApis = createKafkaApis(authorizer = Some (authorizer))
3632
+ kafkaApis.handleTopicMetadataRequest(repByTopicId)
3633
+ val metadataByTopicIdResp = verifyNoThrottling[MetadataResponse ](repByTopicId)
3634
+
3635
+ val metadataByTopicId = metadataByTopicIdResp.data().topics().asScala.groupBy(_.topicId()).map(kv => (kv._1, kv._2.head))
3636
+
3637
+ metadataByTopicId.foreach { case (topicId, metadataResponseTopic) =>
3638
+ if (topicId == unauthorizedTopicId) {
3639
+ // Return an TOPIC_AUTHORIZATION_FAILED on unauthorized error regardless of leaking the existence of topic id
3640
+ assertEquals(Errors .TOPIC_AUTHORIZATION_FAILED .code(), metadataResponseTopic.errorCode())
3641
+ // Do not return topic information on unauthorized error
3642
+ assertNull(metadataResponseTopic.name())
3643
+ } else {
3644
+ assertEquals(Errors .NONE .code(), metadataResponseTopic.errorCode())
3645
+ assertEquals(authorizedTopic, metadataResponseTopic.name())
3646
+ }
3647
+ }
3648
+ kafkaApis.close()
3649
+
3650
+ // 4. Send TopicMetadataReq using topic name
3651
+ reset(clientRequestQuotaManager, requestChannel)
3652
+ val metadataReqByTopicName = new MetadataRequest .Builder (util.Arrays .asList(authorizedTopic, unauthorizedTopic), false ).build()
3653
+ val repByTopicName = buildRequest(metadataReqByTopicName, plaintextListener)
3654
+ kafkaApis = createKafkaApis(authorizer = Some (authorizer))
3655
+ kafkaApis.handleTopicMetadataRequest(repByTopicName)
3656
+ val metadataByTopicNameResp = verifyNoThrottling[MetadataResponse ](repByTopicName)
3657
+
3658
+ val metadataByTopicName = metadataByTopicNameResp.data().topics().asScala.groupBy(_.name()).map(kv => (kv._1, kv._2.head))
3659
+
3660
+ metadataByTopicName.foreach { case (topicName, metadataResponseTopic) =>
3661
+ if (topicName == unauthorizedTopic) {
3662
+ assertEquals(Errors .TOPIC_AUTHORIZATION_FAILED .code(), metadataResponseTopic.errorCode())
3663
+ // Do not return topic Id on unauthorized error
3664
+ assertEquals(Uuid .ZERO_UUID , metadataResponseTopic.topicId())
3665
+ } else {
3666
+ assertEquals(Errors .NONE .code(), metadataResponseTopic.errorCode())
3667
+ assertEquals(authorizedTopicId, metadataResponseTopic.topicId())
3668
+ }
3669
+ }
3670
+ }
3671
+
3563
3672
/**
3564
3673
* Verifies that sending a fetch request with version 9 works correctly when
3565
3674
* ReplicaManager.getLogConfig returns None.
0 commit comments