From 756fb83b398040963a2e37b0e195428904929a59 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Mon, 13 Jan 2025 23:30:27 +0800 Subject: [PATCH 1/3] KAFKA-18405: Remove ZooKeeper logic from DynamicBrokerConfig Signed-off-by: PoAn Yang --- .../scala/kafka/server/BrokerServer.scala | 2 +- .../scala/kafka/server/ControllerServer.scala | 2 +- .../kafka/server/DynamicBrokerConfig.scala | 23 +-------- .../scala/kafka/server/SharedServer.scala | 2 +- .../metadata/BrokerMetadataPublisher.scala | 6 +-- .../server/DynamicBrokerConfigTest.scala | 48 +++++++++---------- .../kafka/server/ReplicaManagerTest.scala | 4 +- 7 files changed, 32 insertions(+), 55 deletions(-) diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 43160315b6d5f..f54eda0aad6dd 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -193,7 +193,7 @@ class BrokerServer( info("Starting broker") val clientMetricsReceiverPlugin = new ClientMetricsReceiverPlugin() - config.dynamicConfig.initialize(zkClientOpt = None, Some(clientMetricsReceiverPlugin)) + config.dynamicConfig.initialize(Some(clientMetricsReceiverPlugin)) /* start scheduler */ kafkaScheduler = new KafkaScheduler(config.backgroundThreads) diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index c9947081b2de5..76ffffe53b38d 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -124,7 +124,7 @@ class ControllerServer( try { this.logIdent = logContext.logPrefix() info("Starting controller") - config.dynamicConfig.initialize(zkClientOpt = None, clientMetricsReceiverPluginOpt = None) + config.dynamicConfig.initialize(clientMetricsReceiverPluginOpt = None) maybeChangeStatus(STARTING, STARTED) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index ea99be3bcb15a..73a0694461a86 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -26,7 +26,6 @@ import kafka.log.{LogCleaner, LogManager} import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.server.DynamicBrokerConfig._ import kafka.utils.{CoreUtils, Logging} -import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.common.Reconfigurable import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs} @@ -39,7 +38,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.security.PasswordEncoder import org.apache.kafka.server.ProcessRole -import org.apache.kafka.server.config.{ConfigType, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZooKeeperInternals} +import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, MetricConfigs} import org.apache.kafka.server.telemetry.ClientTelemetry @@ -58,8 +57,6 @@ import scala.jdk.CollectionConverters._ * * The order of precedence for broker configs is: *
    - *
  1. DYNAMIC_BROKER_CONFIG: stored in ZK at /configs/brokers/{brokerId}
  2. - *
  3. DYNAMIC_DEFAULT_BROKER_CONFIG: stored in ZK at /configs/brokers/<default>
  4. *
  5. STATIC_BROKER_CONFIG: properties that broker is started up with, typically from server.properties file
  6. *
  7. DEFAULT_CONFIG: Default configs defined in KafkaConfig
  8. *
@@ -215,17 +212,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private var currentConfig: KafkaConfig = _ private val dynamicConfigPasswordEncoder = Some(PasswordEncoder.NOOP) - private[server] def initialize(zkClientOpt: Option[KafkaZkClient], clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = { + private[server] def initialize(clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = { currentConfig = new KafkaConfig(kafkaConfig.props, false) metricsReceiverPluginOpt = clientMetricsReceiverPluginOpt - - zkClientOpt.foreach { zkClient => - val adminZkClient = new AdminZkClient(zkClient) - updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.BROKER, ZooKeeperInternals.DEFAULT_STRING), false) - val props = adminZkClient.fetchEntityConfig(ConfigType.BROKER, kafkaConfig.brokerId.toString) - val brokerConfig = maybeReEncodePasswords(props, adminZkClient) - updateBrokerConfig(kafkaConfig.brokerId, brokerConfig) - } } /** @@ -427,14 +416,6 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging props } - // If the secret has changed, password.encoder.old.secret contains the old secret that was used - // to encode the configs in ZK. Decode passwords using the old secret and update ZK with values - // encoded using the current secret. Ignore any errors during decoding since old secret may not - // have been removed during broker restart. - private def maybeReEncodePasswords(persistentProps: Properties, adminZkClient: AdminZkClient): Properties = { - persistentProps.clone().asInstanceOf[Properties] - } - /** * Validate the provided configs `propsOverride` and return the full Kafka configs with * the configured defaults and these overrides. diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index 45c9b10b0262f..66af33c169793 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -268,7 +268,7 @@ class SharedServer( // This is only done in tests. metrics = new Metrics() } - sharedServerConfig.dynamicConfig.initialize(zkClientOpt = None, clientMetricsReceiverPluginOpt = None) + sharedServerConfig.dynamicConfig.initialize(clientMetricsReceiverPluginOpt = None) if (sharedServerConfig.processRoles.contains(ProcessRole.BrokerRole)) { brokerMetrics = new BrokerServerMetrics(metrics) diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index 957677c2b4181..cc8d16b2e7c8e 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -17,7 +17,7 @@ package kafka.server.metadata -import java.util.{OptionalInt, Properties} +import java.util.OptionalInt import kafka.coordinator.transaction.TransactionCoordinator import kafka.log.LogManager import kafka.server.{KafkaConfig, ReplicaManager} @@ -243,10 +243,6 @@ class BrokerMetadataPublisher( } } - def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = { - config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props) - } - /** * Update the coordinator of local replica changes: election and resignation. * diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 09555351c6799..2a6c033ccf70b 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -57,7 +57,7 @@ class DynamicBrokerConfigTest { props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, oldKeystore) val config = KafkaConfig(props) val dynamicConfig = config.dynamicConfig - dynamicConfig.initialize(None, None) + dynamicConfig.initialize(None) assertEquals(config, dynamicConfig.currentKafkaConfig) assertEquals(oldKeystore, config.values.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) @@ -123,7 +123,7 @@ class DynamicBrokerConfigTest { Mockito.when(serverMock.logManager).thenReturn(logManagerMock) Mockito.when(serverMock.kafkaScheduler).thenReturn(schedulerMock) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(new BrokerDynamicThreadPool(serverMock)) config.dynamicConfig.addReconfigurable(acceptorMock) @@ -179,7 +179,7 @@ class DynamicBrokerConfigTest { when(serverMock.config).thenReturn(config) when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager)) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) // Test dynamic update with valid values @@ -215,7 +215,7 @@ class DynamicBrokerConfigTest { when(serverMock.config).thenReturn(config) when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager)) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) // Test dynamic update with invalid values @@ -246,7 +246,7 @@ class DynamicBrokerConfigTest { val origProps = TestUtils.createBrokerConfig(0, port = 8181) origProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS") val config = KafkaConfig(origProps) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) val validProps = Map(s"listener.name.external.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}" -> "ks.p12") @@ -265,7 +265,7 @@ class DynamicBrokerConfigTest { val origProps = TestUtils.createBrokerConfig(0, port = 8181) origProps.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "100000000") val config = KafkaConfig(origProps) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) val validProps = Map.empty[String, String] val invalidProps = Map(CleanerConfig.LOG_CLEANER_THREADS_PROP -> "20") @@ -368,7 +368,7 @@ class DynamicBrokerConfigTest { private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean): Unit = { val configProps = TestUtils.createBrokerConfig(0, port = 8181) val config = KafkaConfig(configProps) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) val props = new Properties props.put(name, value) @@ -459,7 +459,7 @@ class DynamicBrokerConfigTest { def testAuthorizerConfig(): Unit = { val props = TestUtils.createBrokerConfig(0, port = 9092) val oldConfig = KafkaConfig.fromProps(props) - oldConfig.dynamicConfig.initialize(None, None) + oldConfig.dynamicConfig.initialize(None) val kafkaServer: KafkaBroker = mock(classOf[kafka.server.KafkaBroker]) when(kafkaServer.config).thenReturn(oldConfig) @@ -505,7 +505,7 @@ class DynamicBrokerConfigTest { def testCombinedControllerAuthorizerConfig(): Unit = { val props = createCombinedControllerConfig(0, 9092) val oldConfig = KafkaConfig.fromProps(props) - oldConfig.dynamicConfig.initialize(None, None) + oldConfig.dynamicConfig.initialize(None) val controllerServer: ControllerServer = mock(classOf[kafka.server.ControllerServer]) when(controllerServer.config).thenReturn(oldConfig) @@ -550,7 +550,7 @@ class DynamicBrokerConfigTest { def testIsolatedControllerAuthorizerConfig(): Unit = { val props = createIsolatedControllerConfig(0, port = 9092) val oldConfig = KafkaConfig.fromProps(props) - oldConfig.dynamicConfig.initialize(None, None) + oldConfig.dynamicConfig.initialize(None) val controllerServer: ControllerServer = mock(classOf[kafka.server.ControllerServer]) when(controllerServer.config).thenReturn(oldConfig) @@ -589,7 +589,7 @@ class DynamicBrokerConfigTest { def testImproperConfigsAreRemoved(): Unit = { val props = TestUtils.createBrokerConfig(0) val config = KafkaConfig(props) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) assertEquals(SocketServerConfigs.MAX_CONNECTIONS_DEFAULT, config.maxConnections) assertEquals(LogConfig.DEFAULT_MAX_MESSAGE_BYTES, config.messageMaxBytes) @@ -624,7 +624,7 @@ class DynamicBrokerConfigTest { Mockito.when(serverMock.config).thenReturn(config) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) val m = new DynamicMetricsReporters(brokerId, config, metrics, "clusterId") config.dynamicConfig.addReconfigurable(m) assertEquals(1, m.currentReporters.size) @@ -649,7 +649,7 @@ class DynamicBrokerConfigTest { Mockito.when(serverMock.config).thenReturn(config) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) val m = new DynamicMetricsReporters(brokerId, config, metrics, "clusterId") config.dynamicConfig.addReconfigurable(m) assertTrue(m.currentReporters.isEmpty) @@ -681,7 +681,7 @@ class DynamicBrokerConfigTest { props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, "2592000000") val config = KafkaConfig(props) val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaBroker])) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig) val newProps = new Properties() @@ -704,7 +704,7 @@ class DynamicBrokerConfigTest { props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, "4294967296") val config = KafkaConfig(props) val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaBroker])) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig) val newProps = new Properties() @@ -727,7 +727,7 @@ class DynamicBrokerConfigTest { props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "1000") props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "1024") val config = KafkaConfig(props) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) // Check for invalid localRetentionMs < -2 verifyConfigUpdateWithInvalidConfig(config, props, Map.empty, Map(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP -> "-3")) @@ -757,7 +757,7 @@ class DynamicBrokerConfigTest { assertEquals(500, config.remoteLogManagerConfig.remoteFetchMaxWaitMs) val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig) val newProps = new Properties() @@ -792,7 +792,7 @@ class DynamicBrokerConfigTest { config.remoteLogManagerConfig.remoteListOffsetsRequestTimeoutMs) val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig) val newProps = new Properties() @@ -828,7 +828,7 @@ class DynamicBrokerConfigTest { Mockito.when(serverMock.config).thenReturn(config) Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager)) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) val props = new Properties() @@ -851,7 +851,7 @@ class DynamicBrokerConfigTest { Mockito.when(serverMock.config).thenReturn(config) Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager)) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, @@ -882,7 +882,7 @@ class DynamicBrokerConfigTest { Mockito.when(serverMock.config).thenReturn(config) Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager)) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, @@ -917,7 +917,7 @@ class DynamicBrokerConfigTest { Mockito.when(serverMock.config).thenReturn(config) Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager)) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) // Default values @@ -964,7 +964,7 @@ class DynamicBrokerConfigTest { props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, retentionBytes.toString) val config = KafkaConfig(props) val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaBroker])) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig) val newProps = new Properties() @@ -990,7 +990,7 @@ class DynamicBrokerConfigTest { Mockito.when(logManagerMock.reconfigureDefaultLogConfig(ArgumentMatchers.any(classOf[LogConfig]))) .thenAnswer(invocation => currentDefaultLogConfig.set(invocation.getArgument(0))) - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) config.dynamicConfig.addBrokerReconfigurable(new DynamicLogConfig(logManagerMock, serverMock)) } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 0aa837f03e549..5365de394e52a 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -2539,7 +2539,7 @@ class ReplicaManagerTest { verify(addPartitionsToTxnManager, times(0)).addOrVerifyTransaction(any(), any(), any(), any(), any(), any()) // Dynamically enable verification. - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) val props = new Properties() props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "true") config.dynamicConfig.updateBrokerConfig(config.brokerId, props) @@ -2591,7 +2591,7 @@ class ReplicaManagerTest { assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId)) // Disable verification - config.dynamicConfig.initialize(None, None) + config.dynamicConfig.initialize(None) val props = new Properties() props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "false") config.dynamicConfig.updateBrokerConfig(config.brokerId, props) From ad5448ef176de3ff2da9e537fd3329f50259d082 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Tue, 14 Jan 2025 11:18:47 +0800 Subject: [PATCH 2/3] address comment Signed-off-by: PoAn Yang --- .../kafka/server/DynamicBrokerConfig.scala | 47 ++----------------- .../server/DynamicBrokerConfigTest.scala | 16 +++++++ 2 files changed, 20 insertions(+), 43 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 73a0694461a86..48b599885d11e 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -881,7 +881,6 @@ object DynamicListenerConfig { */ val ReconfigurableConfigs = Set( // Listener configs - SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, SocketServerConfigs.LISTENERS_CONFIG, SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, @@ -967,37 +966,13 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi DynamicListenerConfig.ReconfigurableConfigs } - private def listenerRegistrationsAltered( - oldAdvertisedListeners: Map[ListenerName, EndPoint], - newAdvertisedListeners: Map[ListenerName, EndPoint] - ): Boolean = { - if (oldAdvertisedListeners.size != newAdvertisedListeners.size) return true - oldAdvertisedListeners.foreachEntry { - case (oldListenerName, oldEndpoint) => - newAdvertisedListeners.get(oldListenerName) match { - case None => return true - case Some(newEndpoint) => if (!newEndpoint.equals(oldEndpoint)) { - return true - } - } - } - false - } - - private def verifyListenerRegistrationAlterationSupported(): Unit = { - if (!server.config.requiresZookeeper) { - throw new ConfigException("Advertised listeners cannot be altered when using a " + - "Raft-based metadata quorum.") - } - } - def validateReconfiguration(newConfig: KafkaConfig): Unit = { val oldConfig = server.config val newListeners = listenersToMap(newConfig.listeners) - val newAdvertisedListeners = listenersToMap(newConfig.effectiveAdvertisedBrokerListeners) + val oldAdvertisedListeners = listenersToMap(oldConfig.effectiveAdvertisedBrokerListeners) val oldListeners = listenersToMap(oldConfig.listeners) - if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet)) - throw new ConfigException(s"Advertised listeners '$newAdvertisedListeners' must be a subset of listeners '$newListeners'") + if (!oldAdvertisedListeners.keySet.subsetOf(newListeners.keySet)) + throw new ConfigException(s"Advertised listeners '$oldAdvertisedListeners' must be a subset of listeners '$newListeners'") if (!newListeners.keySet.subsetOf(newConfig.effectiveListenerSecurityProtocolMap.keySet)) throw new ConfigException(s"Listeners '$newListeners' must be subset of listener map '${newConfig.effectiveListenerSecurityProtocolMap}'") newListeners.keySet.intersect(oldListeners.keySet).foreach { listenerName => @@ -1013,15 +988,8 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi if (oldConfig.effectiveListenerSecurityProtocolMap(listenerName) != newConfig.effectiveListenerSecurityProtocolMap(listenerName)) throw new ConfigException(s"Security protocol cannot be updated for existing listener $listenerName") } - if (!newAdvertisedListeners.contains(newConfig.interBrokerListenerName)) + if (!oldAdvertisedListeners.contains(newConfig.interBrokerListenerName)) throw new ConfigException(s"Advertised listener must be specified for inter-broker listener ${newConfig.interBrokerListenerName}") - - // Currently, we do not support adding or removing listeners when in KRaft mode. - // However, we support changing other listener configurations (max connections, etc.) - if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedBrokerListeners), - listenersToMap(newConfig.effectiveAdvertisedBrokerListeners))) { - verifyListenerRegistrationAlterationSupported() - } } def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = { @@ -1036,13 +1004,6 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi if (listenersRemoved.nonEmpty) server.socketServer.removeListeners(listenersRemoved) if (listenersAdded.nonEmpty) server.socketServer.addListeners(listenersAdded) } - if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedBrokerListeners), - listenersToMap(newConfig.effectiveAdvertisedBrokerListeners))) { - verifyListenerRegistrationAlterationSupported() - server match { - case _ => throw new RuntimeException("Unable to handle reconfigure") - } - } } private def listenersToMap(listeners: Seq[EndPoint]): Map[ListenerName, EndPoint] = diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 2a6c033ccf70b..10b42f96b4e54 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -30,6 +30,7 @@ import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter} import org.apache.kafka.common.config.{ConfigException, SslConfigs} import org.apache.kafka.common.metrics.{JmxReporter, Metrics} import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.server.authorizer._ @@ -1020,6 +1021,21 @@ class DynamicBrokerConfigTest { assertEquals(TimeUnit.HOURS.toMillis(1), ctx.config.logRetentionTimeMillis) assertFalse(ctx.currentDefaultLogConfig.get().originals().containsKey(ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG)) } + + @Test + def testAdvertisedListenersIsNotDynamicallyReconfigurable(): Unit = { + val origProps = TestUtils.createBrokerConfig(0, port = 8181) + val ctx = new DynamicLogConfigContext(origProps) + + // update advertised listeners should not work + val props = new Properties() + props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "SASL_PLAINTEXT://localhost:8181") + ctx.config.dynamicConfig.updateDefaultConfig(props) + ctx.config.effectiveAdvertisedBrokerListeners.foreach(e => + assertEquals(SecurityProtocol.PLAINTEXT.name, e.listenerName.value) + ) + assertFalse(ctx.currentDefaultLogConfig.get().originals().containsKey(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)) + } } class TestDynamicThreadPool extends BrokerReconfigurable { From eb24f7e9b8c32c04afac49a02ea4283da249e50a Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Wed, 15 Jan 2025 17:54:20 +0800 Subject: [PATCH 3/3] address comment Signed-off-by: PoAn Yang --- .../scala/kafka/server/DynamicBrokerConfig.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 48b599885d11e..32febffb546e8 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -968,14 +968,14 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi def validateReconfiguration(newConfig: KafkaConfig): Unit = { val oldConfig = server.config - val newListeners = listenersToMap(newConfig.listeners) - val oldAdvertisedListeners = listenersToMap(oldConfig.effectiveAdvertisedBrokerListeners) - val oldListeners = listenersToMap(oldConfig.listeners) - if (!oldAdvertisedListeners.keySet.subsetOf(newListeners.keySet)) + val newListeners = newConfig.listeners.map(_.listenerName).toSet + val oldAdvertisedListeners = oldConfig.effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet + val oldListeners = oldConfig.listeners.map(_.listenerName).toSet + if (!oldAdvertisedListeners.subsetOf(newListeners)) throw new ConfigException(s"Advertised listeners '$oldAdvertisedListeners' must be a subset of listeners '$newListeners'") - if (!newListeners.keySet.subsetOf(newConfig.effectiveListenerSecurityProtocolMap.keySet)) + if (!newListeners.subsetOf(newConfig.effectiveListenerSecurityProtocolMap.keySet)) throw new ConfigException(s"Listeners '$newListeners' must be subset of listener map '${newConfig.effectiveListenerSecurityProtocolMap}'") - newListeners.keySet.intersect(oldListeners.keySet).foreach { listenerName => + newListeners.intersect(oldListeners).foreach { listenerName => def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): Map[String, AnyRef] = { kafkaConfig.originalsWithPrefix(prefix, true).asScala.filter { case (key, _) => // skip the reconfigurable configs @@ -988,8 +988,6 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi if (oldConfig.effectiveListenerSecurityProtocolMap(listenerName) != newConfig.effectiveListenerSecurityProtocolMap(listenerName)) throw new ConfigException(s"Security protocol cannot be updated for existing listener $listenerName") } - if (!oldAdvertisedListeners.contains(newConfig.interBrokerListenerName)) - throw new ConfigException(s"Advertised listener must be specified for inter-broker listener ${newConfig.interBrokerListenerName}") } def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {