From bdfe7b33cd30a13bf5866e328828c3382b149aa5 Mon Sep 17 00:00:00 2001 From: JimmyWang6 Date: Fri, 24 Oct 2025 17:56:14 +0800 Subject: [PATCH 1/4] KAFKA-19817 Move DynamicTopicClusterQuotaPublisher to metadata module --- checkstyle/import-control-metadata.xml | 1 + .../main/java/kafka/server/QuotaFactory.java | 3 +- .../scala/kafka/network/RequestChannel.scala | 2 +- .../scala/kafka/server/BrokerServer.scala | 4 +- .../scala/kafka/server/ControllerServer.scala | 6 +- .../metadata/BrokerMetadataPublisher.scala | 2 +- .../DynamicTopicClusterQuotaPublisher.scala | 72 ------------------ .../server/BaseClientQuotaManagerTest.scala | 2 +- .../kafka/server/ClientQuotaManagerTest.scala | 2 +- .../kafka/server/ControllerApisTest.scala | 2 +- .../unit/kafka/server/KafkaApisTest.scala | 2 +- .../unit/kafka/server/RequestQuotaTest.scala | 2 +- .../BrokerMetadataPublisherTest.scala | 2 +- .../DynamicTopicClusterQuotaPublisher.java | 76 +++++++++++++++++++ .../publisher/QuotaManagersProvider.java | 40 ++++++++++ .../config/ClientQuotaManagerConfig.java | 0 .../apache/kafka/server}/network/Session.java | 2 +- .../server/quota/ClientQuotaManager.java | 2 +- .../kafka/server/quota/ClientSensors.java | 0 .../kafka/server/quota/ThrottleCallback.java | 0 .../kafka/server/quota/ThrottledChannel.java | 0 .../kafka/network/RequestConvertToJson.java | 1 + .../quota/ControllerMutationQuotaManager.java | 2 +- 23 files changed, 136 insertions(+), 89 deletions(-) delete mode 100644 core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala create mode 100644 metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicTopicClusterQuotaPublisher.java create mode 100644 metadata/src/main/java/org/apache/kafka/metadata/publisher/QuotaManagersProvider.java rename {server => server-common}/src/main/java/org/apache/kafka/server/config/ClientQuotaManagerConfig.java (100%) rename {server/src/main/java/org/apache/kafka => server-common/src/main/java/org/apache/kafka/server}/network/Session.java (96%) rename {server => server-common}/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java (99%) rename {server => server-common}/src/main/java/org/apache/kafka/server/quota/ClientSensors.java (100%) rename {server => server-common}/src/main/java/org/apache/kafka/server/quota/ThrottleCallback.java (100%) rename {server => server-common}/src/main/java/org/apache/kafka/server/quota/ThrottledChannel.java (100%) diff --git a/checkstyle/import-control-metadata.xml b/checkstyle/import-control-metadata.xml index 773635cec8e9c..b22e59607e425 100644 --- a/checkstyle/import-control-metadata.xml +++ b/checkstyle/import-control-metadata.xml @@ -167,6 +167,7 @@ + diff --git a/core/src/main/java/kafka/server/QuotaFactory.java b/core/src/main/java/kafka/server/QuotaFactory.java index b672be4265053..6bb75469ac9d6 100644 --- a/core/src/main/java/kafka/server/QuotaFactory.java +++ b/core/src/main/java/kafka/server/QuotaFactory.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.publisher.QuotaManagersProvider; import org.apache.kafka.server.config.ClientQuotaManagerConfig; import org.apache.kafka.server.config.QuotaConfig; import org.apache.kafka.server.config.ReplicationQuotaManagerConfig; @@ -58,7 +59,7 @@ public record QuotaManagers(ClientQuotaManager fetch, ReplicationQuotaManager leader, ReplicationQuotaManager follower, ReplicationQuotaManager alterLogDirs, - Optional> clientQuotaCallbackPlugin) { + Optional> clientQuotaCallbackPlugin) implements QuotaManagersProvider { public void shutdown() { fetch.shutdown(); diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index a0fbc0452060a..ddc3f2e76c873 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -31,11 +31,11 @@ import org.apache.kafka.common.network.Send import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests._ import org.apache.kafka.common.utils.Time -import org.apache.kafka.network.Session import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics} import org.apache.kafka.server.common.RequestLocal import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.network.RequestConvertToJson +import org.apache.kafka.server.network.Session import scala.jdk.CollectionConverters._ import scala.jdk.OptionConverters.RichOption diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 9404c2b216707..bdba6e703fcbd 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -42,7 +42,7 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRec import org.apache.kafka.coordinator.transaction.ProducerIdManager import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher} import org.apache.kafka.metadata.{BrokerState, ListenerInfo, MetadataVersionConfigValidator} -import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher} +import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicTopicClusterQuotaPublisher, ScramPublisher} import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager} import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition} @@ -494,7 +494,7 @@ class BrokerServer( ), new DynamicTopicClusterQuotaPublisher( clusterId, - config, + config.nodeId, sharedServer.metadataPublishingFaultHandler, "broker", quotaManagers, diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 28a1e3bdfb247..d82c0e105bd48 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager import kafka.server.QuotaFactory.QuotaManagers import scala.collection.immutable -import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher} +import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicConfigPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher} import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.common.internals.Plugin import org.apache.kafka.common.message.ApiMessageType.ListenerType @@ -38,7 +38,7 @@ import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, Metad import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo} import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer import org.apache.kafka.metadata.bootstrap.BootstrapMetadata -import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, FeaturesPublisher, ScramPublisher} +import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicTopicClusterQuotaPublisher, FeaturesPublisher, ScramPublisher} import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager} import org.apache.kafka.server.{ProcessRole, SimpleApiVersionManager} @@ -346,7 +346,7 @@ class ControllerServer( // Set up the DynamicTopicClusterQuotaPublisher. This will enable quotas for the cluster and topics. metadataPublishers.add(new DynamicTopicClusterQuotaPublisher( clusterId, - config, + config.nodeId, sharedServer.metadataPublishingFaultHandler, "controller", quotaManagers, diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index 7364eef518817..565046296adab 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -33,7 +33,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.image.loader.LoaderManifest import org.apache.kafka.image.publisher.MetadataPublisher import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta} -import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher} +import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicTopicClusterQuotaPublisher, ScramPublisher} import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal, ShareVersion} import org.apache.kafka.server.fault.FaultHandler diff --git a/core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala b/core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala deleted file mode 100644 index 7798c18b4d69b..0000000000000 --- a/core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ -package kafka.server.metadata - -import kafka.server.KafkaConfig -import kafka.server.QuotaFactory.QuotaManagers -import kafka.utils.Logging -import org.apache.kafka.image.{MetadataDelta, MetadataImage} -import org.apache.kafka.image.loader.LoaderManifest -import org.apache.kafka.metadata.MetadataCache -import org.apache.kafka.server.fault.FaultHandler - -/** - * Publishing dynamic topic or cluster changes to the client quota manager. - * Temporary solution since Cluster objects are immutable and costly to update for every metadata change. - * See KAFKA-18239 to trace the issue. - */ -class DynamicTopicClusterQuotaPublisher ( - clusterId: String, - conf: KafkaConfig, - faultHandler: FaultHandler, - nodeType: String, - quotaManagers: QuotaManagers -) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher { - logIdent = s"[${name()}] " - - override def name(): String = s"DynamicTopicClusterQuotaPublisher $nodeType id=${conf.nodeId}" - - override def onMetadataUpdate( - delta: MetadataDelta, - newImage: MetadataImage, - manifest: LoaderManifest - ): Unit = { - onMetadataUpdate(delta, newImage) - } - - def onMetadataUpdate( - delta: MetadataDelta, - newImage: MetadataImage, - ): Unit = { - try { - quotaManagers.clientQuotaCallbackPlugin().ifPresent(plugin => { - if (delta.topicsDelta() != null || delta.clusterDelta() != null) { - val cluster = MetadataCache.toCluster(clusterId, newImage) - if (plugin.get().updateClusterMetadata(cluster)) { - quotaManagers.fetch.updateQuotaMetricConfigs() - quotaManagers.produce.updateQuotaMetricConfigs() - quotaManagers.request.updateQuotaMetricConfigs() - quotaManagers.controllerMutation.updateQuotaMetricConfigs() - } - } - }) - } catch { - case t: Throwable => - val deltaName = s"MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}" - faultHandler.handleFault("Uncaught exception while " + - s"publishing dynamic topic or cluster changes from $deltaName", t) - } - } -} - \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala index e25733a6f0dc4..3861901f658c3 100644 --- a/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala @@ -29,8 +29,8 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, RequestContext, RequestHeader} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.utils.MockTime -import org.apache.kafka.network.Session import org.apache.kafka.network.metrics.RequestChannelMetrics +import org.apache.kafka.server.network.Session import org.apache.kafka.server.quota.{ClientQuotaManager, ThrottleCallback} import org.junit.jupiter.api.AfterEach import org.mockito.Mockito.mock diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala index c166eef801221..c6afa7a5e34ff 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala @@ -22,7 +22,7 @@ import org.apache.kafka.common.internals.Plugin import org.apache.kafka.common.metrics.Quota import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.server.config.ClientQuotaManagerConfig -import org.apache.kafka.network.Session +import org.apache.kafka.server.network.Session import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaManager, ClientQuotaType, QuotaType} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala index 43c7d5aecf464..0ca118d59ba8b 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala @@ -54,12 +54,12 @@ import org.apache.kafka.controller.{Controller, ControllerRequestContext, Result import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.network.metrics.RequestChannelMetrics -import org.apache.kafka.network.Session import org.apache.kafka.raft.{QuorumConfig, RaftManager} import org.apache.kafka.server.SimpleApiVersionManager import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer} import org.apache.kafka.server.common.{ApiMessageAndVersion, FinalizedFeatures, KRaftVersion, MetadataVersion, ProducerIdsBlock, RequestLocal} import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs} +import org.apache.kafka.server.network.Session import org.apache.kafka.server.quota.{ClientQuotaManager, ControllerMutationQuota, ControllerMutationQuotaManager} import org.apache.kafka.server.util.FutureUtils import org.apache.kafka.storage.internals.log.CleanerConfig diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 3d801536072cc..fd78cbeb75a77 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -83,7 +83,6 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorTes import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance} import org.apache.kafka.metadata.{ConfigRepository, MetadataCache, MockConfigRepository} -import org.apache.kafka.network.Session import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics} import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.security.authorizer.AclEntry @@ -93,6 +92,7 @@ import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, GroupV import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.logger.LoggingController import org.apache.kafka.server.metrics.ClientMetricsTestUtils +import org.apache.kafka.server.network.Session import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData, SharePartitionKey} import org.apache.kafka.server.quota.{ClientQuotaManager, ControllerMutationQuota, ControllerMutationQuotaManager, ThrottleCallback} import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index e188889556b7c..f6e32a52e2d23 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -38,9 +38,9 @@ import org.apache.kafka.common.security.auth._ import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils} import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.metadata.authorizer.StandardAuthorizer -import org.apache.kafka.network.Session import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult} import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs} +import org.apache.kafka.server.network.Session import org.apache.kafka.server.quota.QuotaType import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala index 36a94edf9bd55..adae5e829d64c 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala @@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator import org.apache.kafka.coordinator.share.ShareCoordinator import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage, ScramImage, TopicsImage} import org.apache.kafka.image.loader.LogDeltaManifest -import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher} +import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicTopicClusterQuotaPublisher, ScramPublisher} import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.LeaderAndEpoch import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, ShareVersion} diff --git a/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicTopicClusterQuotaPublisher.java b/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicTopicClusterQuotaPublisher.java new file mode 100644 index 0000000000000..fa9c8340b4662 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicTopicClusterQuotaPublisher.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.metadata.publisher; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.loader.LoaderManifest; +import org.apache.kafka.image.publisher.MetadataPublisher; +import org.apache.kafka.metadata.MetadataCache; +import org.apache.kafka.server.fault.FaultHandler; + +public class DynamicTopicClusterQuotaPublisher implements MetadataPublisher { + private final String clusterId; + private final Integer nodeId; + private final FaultHandler faultHandler; + private final String nodeType; + private final QuotaManagersProvider quotaManagersProvider; + + public DynamicTopicClusterQuotaPublisher( + String clusterId, + Integer nodeId, + FaultHandler faultHandler, + String nodeType, + QuotaManagersProvider quotaManagers + ) { + this.clusterId = clusterId; + this.nodeId = nodeId; + this.faultHandler = faultHandler; + this.nodeType = nodeType; + this.quotaManagersProvider = quotaManagers; + } + + @Override + public String name() { + return "DynamicTopicClusterQuotaPublisher " + nodeType + " id=" + nodeId; + } + + @Override + public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) { + onMetadataUpdate(delta, newImage); + } + + public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage) { + try { + quotaManagersProvider.clientQuotaCallbackPlugin().ifPresent(plugin -> { + if (delta.topicsDelta() != null || delta.clusterDelta() != null) { + Cluster cluster = MetadataCache.toCluster(clusterId, newImage); + if (plugin.get().updateClusterMetadata(cluster)) { + quotaManagersProvider.fetch().updateQuotaMetricConfigs(); + quotaManagersProvider.produce().updateQuotaMetricConfigs(); + quotaManagersProvider.request().updateQuotaMetricConfigs(); + quotaManagersProvider.controllerMutation().updateQuotaMetricConfigs(); + } + } + }); + } catch (Exception e) { + String deltaName = "MetadataDelta up to " + newImage.highestOffsetAndEpoch().offset(); + faultHandler.handleFault("Uncaught exception while publishing dynamic topic or cluster changes from " + deltaName, e); + } + } +} diff --git a/metadata/src/main/java/org/apache/kafka/metadata/publisher/QuotaManagersProvider.java b/metadata/src/main/java/org/apache/kafka/metadata/publisher/QuotaManagersProvider.java new file mode 100644 index 0000000000000..e565288c4682b --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/publisher/QuotaManagersProvider.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.metadata.publisher; + +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.server.quota.ClientQuotaCallback; +import org.apache.kafka.server.quota.ClientQuotaManager; + +import java.util.Optional; + +/** + * Interface to provide access to quota managers for metadata publishers. + * This abstraction allows the metadata module to access quota managers without + * directly depending on the core module. + */ +public interface QuotaManagersProvider { + ClientQuotaManager fetch(); + + ClientQuotaManager produce(); + + ClientQuotaManager request(); + + ClientQuotaManager controllerMutation(); + + Optional> clientQuotaCallbackPlugin(); +} diff --git a/server/src/main/java/org/apache/kafka/server/config/ClientQuotaManagerConfig.java b/server-common/src/main/java/org/apache/kafka/server/config/ClientQuotaManagerConfig.java similarity index 100% rename from server/src/main/java/org/apache/kafka/server/config/ClientQuotaManagerConfig.java rename to server-common/src/main/java/org/apache/kafka/server/config/ClientQuotaManagerConfig.java diff --git a/server/src/main/java/org/apache/kafka/network/Session.java b/server-common/src/main/java/org/apache/kafka/server/network/Session.java similarity index 96% rename from server/src/main/java/org/apache/kafka/network/Session.java rename to server-common/src/main/java/org/apache/kafka/server/network/Session.java index 1793e4d565c25..0c1bdc4e637ac 100644 --- a/server/src/main/java/org/apache/kafka/network/Session.java +++ b/server-common/src/main/java/org/apache/kafka/server/network/Session.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.network; +package org.apache.kafka.server.network; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.utils.Sanitizer; diff --git a/server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java b/server-common/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java similarity index 99% rename from server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java rename to server-common/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java index c91f6bd299a0d..e6136d526c837 100644 --- a/server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java +++ b/server-common/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java @@ -30,8 +30,8 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.utils.Sanitizer; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.network.Session; import org.apache.kafka.server.config.ClientQuotaManagerConfig; +import org.apache.kafka.server.network.Session; import org.apache.kafka.server.util.ShutdownableThread; import org.slf4j.Logger; diff --git a/server/src/main/java/org/apache/kafka/server/quota/ClientSensors.java b/server-common/src/main/java/org/apache/kafka/server/quota/ClientSensors.java similarity index 100% rename from server/src/main/java/org/apache/kafka/server/quota/ClientSensors.java rename to server-common/src/main/java/org/apache/kafka/server/quota/ClientSensors.java diff --git a/server/src/main/java/org/apache/kafka/server/quota/ThrottleCallback.java b/server-common/src/main/java/org/apache/kafka/server/quota/ThrottleCallback.java similarity index 100% rename from server/src/main/java/org/apache/kafka/server/quota/ThrottleCallback.java rename to server-common/src/main/java/org/apache/kafka/server/quota/ThrottleCallback.java diff --git a/server/src/main/java/org/apache/kafka/server/quota/ThrottledChannel.java b/server-common/src/main/java/org/apache/kafka/server/quota/ThrottledChannel.java similarity index 100% rename from server/src/main/java/org/apache/kafka/server/quota/ThrottledChannel.java rename to server-common/src/main/java/org/apache/kafka/server/quota/ThrottledChannel.java diff --git a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java index 9af49f27d66cb..6838d8dd97d7b 100644 --- a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java +++ b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java @@ -378,6 +378,7 @@ import org.apache.kafka.common.requests.WriteShareGroupStateResponse; import org.apache.kafka.common.requests.WriteTxnMarkersRequest; import org.apache.kafka.common.requests.WriteTxnMarkersResponse; +import org.apache.kafka.server.network.Session; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.BooleanNode; diff --git a/server/src/main/java/org/apache/kafka/server/quota/ControllerMutationQuotaManager.java b/server/src/main/java/org/apache/kafka/server/quota/ControllerMutationQuotaManager.java index b7c8665480a12..c5ab7eca9a9e2 100644 --- a/server/src/main/java/org/apache/kafka/server/quota/ControllerMutationQuotaManager.java +++ b/server/src/main/java/org/apache/kafka/server/quota/ControllerMutationQuotaManager.java @@ -25,8 +25,8 @@ import org.apache.kafka.common.metrics.stats.TokenBucket; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.network.Session; import org.apache.kafka.server.config.ClientQuotaManagerConfig; +import org.apache.kafka.server.network.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From 67485076d0f20572730aef19a48b4c297057f3fb Mon Sep 17 00:00:00 2001 From: jimmy <961370183@qq.com> Date: Thu, 30 Oct 2025 02:07:44 +0800 Subject: [PATCH 2/4] fix build error --- checkstyle/import-control-server-common.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/checkstyle/import-control-server-common.xml b/checkstyle/import-control-server-common.xml index 95a014b87e4ec..0a80f43b17e71 100644 --- a/checkstyle/import-control-server-common.xml +++ b/checkstyle/import-control-server-common.xml @@ -105,6 +105,11 @@ + + + + + From e57e76e53b8afd6ac8d565ccc8c992b797e39662 Mon Sep 17 00:00:00 2001 From: JimmyWang6 Date: Fri, 31 Oct 2025 11:41:05 +0800 Subject: [PATCH 3/4] address comments --- .../metadata/BrokerMetadataPublisher.scala | 2 +- .../network/RequestConvertToJsonTest.scala | 2 +- .../unit/kafka/network/SocketServerTest.scala | 2 +- .../unit/kafka/server/KafkaApisTest.scala | 18 +++++++++--------- .../DynamicTopicClusterQuotaPublisher.java | 8 ++------ .../apache/kafka/server/network/Session.java | 12 +++--------- .../kafka/server/quota/ClientQuotaManager.java | 4 ++-- .../kafka/network/RequestConvertToJson.java | 2 +- 8 files changed, 20 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index 565046296adab..0951d6d202eca 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -221,7 +221,7 @@ class BrokerMetadataPublisher( dynamicClientQuotaPublisher.onMetadataUpdate(delta, newImage, manifest) // Apply topic or cluster quotas delta. - dynamicTopicClusterQuotaPublisher.onMetadataUpdate(delta, newImage) + dynamicTopicClusterQuotaPublisher.onMetadataUpdate(delta, newImage, manifest) // Apply SCRAM delta. scramPublisher.onMetadataUpdate(delta, newImage, manifest) diff --git a/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala b/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala index 400e13dc6bef7..f15c34fc72a66 100644 --- a/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala +++ b/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala @@ -95,7 +95,7 @@ class RequestConvertToJsonTest { expectedNode.set("responseQueueTimeMs", new DoubleNode(responseQueueTimeMs)) expectedNode.set("sendTimeMs", new DoubleNode(responseSendTimeMs)) expectedNode.set("securityProtocol", new TextNode(req.context.securityProtocol.toString)) - expectedNode.set("principal", new TextNode(req.session.principal.toString)) + expectedNode.set("principal", new TextNode(req.session.principal().toString)) expectedNode.set("listener", new TextNode(req.context.listenerName.value)) expectedNode.set("clientInformation", RequestConvertToJson.clientInfoNode(req.context.clientInformation)) expectedNode.set("temporaryMemoryBytes", new LongNode(temporaryMemoryBytes)) diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 6a4b8d8ca672e..875a96e7593da 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -1068,7 +1068,7 @@ class SocketServerTest { val socket = connect() val bytes = new Array[Byte](40) sendRequest(socket, bytes, Some(0)) - assertEquals(KafkaPrincipal.ANONYMOUS, receiveRequest(server.dataPlaneRequestChannel).session.principal) + assertEquals(KafkaPrincipal.ANONYMOUS, receiveRequest(server.dataPlaneRequestChannel).session.principal()) } /* Test that we update request metrics if the client closes the connection while the broker response is in flight. */ diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index fd78cbeb75a77..1b224389cf142 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -2517,10 +2517,10 @@ class KafkaApisTest extends Logging { // Verify the Session data passed to fetch quota manager is exactly what was defined in the test val capturedSession = sessionCaptorFetch.getValue assertNotNull(capturedSession) - assertNotNull(capturedSession.principal) - assertEquals(KafkaPrincipal.USER_TYPE, capturedSession.principal.getPrincipalType) - assertEquals("test-user", capturedSession.principal.getName) - assertEquals(testClientAddress, capturedSession.clientAddress) + assertNotNull(capturedSession.principal()) + assertEquals(KafkaPrincipal.USER_TYPE, capturedSession.principal().getPrincipalType) + assertEquals("test-user", capturedSession.principal().getName) + assertEquals(testClientAddress, capturedSession.clientAddress()) assertEquals("test-user", capturedSession.sanitizedUser) // Verify client ID passed to fetch quota manager matches what was defined @@ -2733,11 +2733,11 @@ class KafkaApisTest extends Logging { // Verify the Session data passed to fetch quota manager is exactly what was defined in the test val capturedSession = sessionCaptorFetch.getValue assertNotNull(capturedSession) - assertNotNull(capturedSession.principal) - assertEquals(KafkaPrincipal.USER_TYPE, capturedSession.principal.getPrincipalType) - assertEquals("test-user", capturedSession.principal.getName) - assertEquals(testClientAddress, capturedSession.clientAddress) - assertEquals("test-user", capturedSession.sanitizedUser) + assertNotNull(capturedSession.principal()) + assertEquals(KafkaPrincipal.USER_TYPE, capturedSession.principal().getPrincipalType) + assertEquals("test-user", capturedSession.principal().getName) + assertEquals(testClientAddress, capturedSession.clientAddress()) + assertEquals("test-user", capturedSession.sanitizedUser()) // Verify client ID passed to fetch quota manager matches what was defined val capturedClientId = clientIdCaptor.getValue diff --git a/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicTopicClusterQuotaPublisher.java b/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicTopicClusterQuotaPublisher.java index fa9c8340b4662..18331a240b8c1 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicTopicClusterQuotaPublisher.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/publisher/DynamicTopicClusterQuotaPublisher.java @@ -26,14 +26,14 @@ public class DynamicTopicClusterQuotaPublisher implements MetadataPublisher { private final String clusterId; - private final Integer nodeId; + private final int nodeId; private final FaultHandler faultHandler; private final String nodeType; private final QuotaManagersProvider quotaManagersProvider; public DynamicTopicClusterQuotaPublisher( String clusterId, - Integer nodeId, + int nodeId, FaultHandler faultHandler, String nodeType, QuotaManagersProvider quotaManagers @@ -52,10 +52,6 @@ public String name() { @Override public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) { - onMetadataUpdate(delta, newImage); - } - - public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage) { try { quotaManagersProvider.clientQuotaCallbackPlugin().ifPresent(plugin -> { if (delta.topicsDelta() != null || delta.clusterDelta() != null) { diff --git a/server-common/src/main/java/org/apache/kafka/server/network/Session.java b/server-common/src/main/java/org/apache/kafka/server/network/Session.java index 0c1bdc4e637ac..52988dcb222dd 100644 --- a/server-common/src/main/java/org/apache/kafka/server/network/Session.java +++ b/server-common/src/main/java/org/apache/kafka/server/network/Session.java @@ -21,14 +21,8 @@ import java.net.InetAddress; -public class Session { - public final KafkaPrincipal principal; - public final InetAddress clientAddress; - public final String sanitizedUser; - - public Session(KafkaPrincipal principal, InetAddress clientAddress) { - this.principal = principal; - this.clientAddress = clientAddress; - this.sanitizedUser = Sanitizer.sanitize(principal.getName()); +public record Session(KafkaPrincipal principal, InetAddress clientAddress) { + public String sanitizedUser() { + return Sanitizer.sanitize(principal.getName()); } } diff --git a/server-common/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java b/server-common/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java index e6136d526c837..7121d29725879 100644 --- a/server-common/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java +++ b/server-common/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java @@ -446,8 +446,8 @@ protected long throttleTime(QuotaViolationException e, long timeMs) { */ public ClientSensors getOrCreateQuotaSensors(Session session, String clientId) { var metricTags = quotaCallback instanceof DefaultQuotaCallback defaultCallback - ? defaultCallback.quotaMetricTags(session.sanitizedUser, clientId) - : quotaCallback.quotaMetricTags(clientQuotaType, session.principal, clientId); + ? defaultCallback.quotaMetricTags(session.sanitizedUser(), clientId) + : quotaCallback.quotaMetricTags(clientQuotaType, session.principal(), clientId); var sensors = new ClientSensors( metricTags, sensorAccessor.getOrCreate( diff --git a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java index 6838d8dd97d7b..94040dab5de1f 100644 --- a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java +++ b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java @@ -795,7 +795,7 @@ public static JsonNode requestDescMetrics(RequestHeader header, Optional 0) { From 624ddd6627b2ee82900c47976eb3dbc6da703400 Mon Sep 17 00:00:00 2001 From: JimmyWang6 Date: Fri, 31 Oct 2025 11:48:54 +0800 Subject: [PATCH 4/4] fix nit --- .../src/main/java/org/apache/kafka/server/network/Session.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server-common/src/main/java/org/apache/kafka/server/network/Session.java b/server-common/src/main/java/org/apache/kafka/server/network/Session.java index 52988dcb222dd..2424fdc982f7d 100644 --- a/server-common/src/main/java/org/apache/kafka/server/network/Session.java +++ b/server-common/src/main/java/org/apache/kafka/server/network/Session.java @@ -25,4 +25,4 @@ public record Session(KafkaPrincipal principal, InetAddress clientAddress) { public String sanitizedUser() { return Sanitizer.sanitize(principal.getName()); } -} +} \ No newline at end of file