diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 277a60cf5d0d4..3d6fe855001ef 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3561,6 +3561,115 @@ class KafkaApisTest extends Logging {
     assertEquals(Set(0), response.brokers.asScala.map(_.id).toSet)
   }
 
+  @Test
+  def testUnauthorizedTopicMetadataRequest(): Unit = {
+    // 1. Set up broker information
+    val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+    val broker = new UpdateMetadataBroker()
+      .setId(0)
+      .setRack("rack")
+      .setEndpoints(Seq(
+        new UpdateMetadataEndpoint()
+          .setHost("broker0")
+          .setPort(9092)
+          .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
+          .setListener(plaintextListener.value)
+      ).asJava)
+
+    // 2. Set up authorizer
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    val unauthorizedTopic = "unauthorized-topic"
+    val authorizedTopic = "authorized-topic"
+
+    val expectedActions = Seq(
+      new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, unauthorizedTopic, PatternType.LITERAL), 1, true, true),
+      new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, authorizedTopic, PatternType.LITERAL), 1, true, true)
+    )
+
+    when(authorizer.authorize(any[RequestContext], argThat((t: java.util.List[Action]) => t.containsAll(expectedActions.asJava))))
+      .thenAnswer { invocation =>
+        val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]].asScala
+        actions.map { action =>
+          if (action.resourcePattern().name().equals(authorizedTopic))
+            AuthorizationResult.ALLOWED
+          else
+            AuthorizationResult.DENIED
+        }.asJava
+      }
+
+    // 3. Set up MetadataCache
+    val authorizedTopicId = Uuid.randomUuid()
+    val unauthorizedTopicId = Uuid.randomUuid()
+
+    val topicIds = new util.HashMap[String, Uuid]()
+    topicIds.put(authorizedTopic, authorizedTopicId)
+    topicIds.put(unauthorizedTopic, unauthorizedTopicId)
+
+    def createDummyPartitionStates(topic: String) = {
+      new UpdateMetadataPartitionState()
+        .setTopicName(topic)
+        .setPartitionIndex(0)
+        .setControllerEpoch(0)
+        .setLeader(0)
+        .setLeaderEpoch(0)
+        .setReplicas(Collections.singletonList(0))
+        .setZkVersion(0)
+        .setIsr(Collections.singletonList(0))
+    }
+
+    // Send UpdateMetadataReq to update MetadataCache
+    val partitionStates = Seq(unauthorizedTopic, authorizedTopic).map(createDummyPartitionStates)
+
+    val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
+      0, 0, partitionStates.asJava, Seq(broker).asJava, topicIds).build()
+    MetadataCacheTest.updateCache(metadataCache, updateMetadataRequest)
+
+    // 4. Send TopicMetadataReq using topicId
+    val metadataReqByTopicId = new MetadataRequest.Builder(util.Arrays.asList(authorizedTopicId, unauthorizedTopicId)).build()
+    val repByTopicId = buildRequest(metadataReqByTopicId, plaintextListener)
+    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
+    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
+    kafkaApis.handleTopicMetadataRequest(repByTopicId)
+    val metadataByTopicIdResp = verifyNoThrottling[MetadataResponse](repByTopicId)
+
+    val metadataByTopicId = metadataByTopicIdResp.data().topics().asScala.groupBy(_.topicId()).map(kv => (kv._1, kv._2.head))
+
+    metadataByTopicId.foreach { case (topicId, metadataResponseTopic) =>
+      if (topicId == unauthorizedTopicId) {
+        // Return an TOPIC_AUTHORIZATION_FAILED on unauthorized error regardless of leaking the existence of topic id
+        assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), metadataResponseTopic.errorCode())
+        // Do not return topic information on unauthorized error
+        assertNull(metadataResponseTopic.name())
+      } else {
+        assertEquals(Errors.NONE.code(), metadataResponseTopic.errorCode())
+        assertEquals(authorizedTopic, metadataResponseTopic.name())
+      }
+    }
+    kafkaApis.close()
+
+    // 4. Send TopicMetadataReq using topic name
+    reset(clientRequestQuotaManager, requestChannel)
+    val metadataReqByTopicName = new MetadataRequest.Builder(util.Arrays.asList(authorizedTopic, unauthorizedTopic), false).build()
+    val repByTopicName = buildRequest(metadataReqByTopicName, plaintextListener)
+    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
+    kafkaApis.handleTopicMetadataRequest(repByTopicName)
+    val metadataByTopicNameResp = verifyNoThrottling[MetadataResponse](repByTopicName)
+
+    val metadataByTopicName = metadataByTopicNameResp.data().topics().asScala.groupBy(_.name()).map(kv => (kv._1, kv._2.head))
+
+    metadataByTopicName.foreach { case (topicName, metadataResponseTopic) =>
+      if (topicName == unauthorizedTopic) {
+        assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), metadataResponseTopic.errorCode())
+        // Do not return topic Id on unauthorized error
+        assertEquals(Uuid.ZERO_UUID, metadataResponseTopic.topicId())
+      } else {
+        assertEquals(Errors.NONE.code(), metadataResponseTopic.errorCode())
+        assertEquals(authorizedTopicId, metadataResponseTopic.topicId())
+      }
+    }
+  }
+
     /**
    * Verifies that sending a fetch request with version 9 works correctly when
    * ReplicaManager.getLogConfig returns None.