Skip to content

KAFKA-18405: Remove ZooKeeper logic from DynamicBrokerConfig #18508

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ class BrokerServer(
info("Starting broker")

val clientMetricsReceiverPlugin = new ClientMetricsReceiverPlugin()
config.dynamicConfig.initialize(zkClientOpt = None, Some(clientMetricsReceiverPlugin))
config.dynamicConfig.initialize(Some(clientMetricsReceiverPlugin))

/* start scheduler */
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class ControllerServer(
try {
this.logIdent = logContext.logPrefix()
info("Starting controller")
config.dynamicConfig.initialize(zkClientOpt = None, clientMetricsReceiverPluginOpt = None)
config.dynamicConfig.initialize(clientMetricsReceiverPluginOpt = None)

maybeChangeStatus(STARTING, STARTED)

Expand Down
78 changes: 9 additions & 69 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import kafka.log.{LogCleaner, LogManager}
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.server.DynamicBrokerConfig._
import kafka.utils.{CoreUtils, Logging}
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs}
Expand All @@ -39,7 +38,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.config.{ConfigType, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZooKeeperInternals}
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, MetricConfigs}
import org.apache.kafka.server.telemetry.ClientTelemetry
Expand All @@ -58,8 +57,6 @@ import scala.jdk.CollectionConverters._
* </ul>
* The order of precedence for broker configs is:
* <ol>
* <li>DYNAMIC_BROKER_CONFIG: stored in ZK at /configs/brokers/{brokerId}</li>
* <li>DYNAMIC_DEFAULT_BROKER_CONFIG: stored in ZK at /configs/brokers/&lt;default&gt;</li>
* <li>STATIC_BROKER_CONFIG: properties that broker is started up with, typically from server.properties file</li>
* <li>DEFAULT_CONFIG: Default configs defined in KafkaConfig</li>
* </ol>
Expand Down Expand Up @@ -215,17 +212,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private var currentConfig: KafkaConfig = _
private val dynamicConfigPasswordEncoder = Some(PasswordEncoder.NOOP)

private[server] def initialize(zkClientOpt: Option[KafkaZkClient], clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = {
private[server] def initialize(clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = {
currentConfig = new KafkaConfig(kafkaConfig.props, false)
metricsReceiverPluginOpt = clientMetricsReceiverPluginOpt

zkClientOpt.foreach { zkClient =>
val adminZkClient = new AdminZkClient(zkClient)
updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.BROKER, ZooKeeperInternals.DEFAULT_STRING), false)
val props = adminZkClient.fetchEntityConfig(ConfigType.BROKER, kafkaConfig.brokerId.toString)
val brokerConfig = maybeReEncodePasswords(props, adminZkClient)
updateBrokerConfig(kafkaConfig.brokerId, brokerConfig)
}
}

/**
Expand Down Expand Up @@ -427,14 +416,6 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
props
}

// Returns a copy of the persisted broker properties. Despite the name, no password
// re-encoding happens here: the body only clones `persistentProps`, and the
// `adminZkClient` parameter is unused.
// NOTE(review): the ZK-era re-encode logic (decode with password.encoder.old.secret,
// re-encode with the current secret, write back) has been removed; presumably this
// stub remains only for call-site compatibility — confirm whether it can be inlined.
private def maybeReEncodePasswords(persistentProps: Properties, adminZkClient: AdminZkClient): Properties = {
persistentProps.clone().asInstanceOf[Properties]
}

/**
* Validate the provided configs `propsOverride` and return the full Kafka configs with
* the configured defaults and these overrides.
Expand Down Expand Up @@ -900,7 +881,6 @@ object DynamicListenerConfig {
*/
val ReconfigurableConfigs = Set(
// Listener configs
SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankYang0529 Could you please file a minor follow-up to add this change to zk2kraft.html?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will create a minor PR for it. Thanks.

SocketServerConfigs.LISTENERS_CONFIG,
SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,

Expand Down Expand Up @@ -986,40 +966,16 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
DynamicListenerConfig.ReconfigurableConfigs
}

/**
 * Returns true if the set of advertised listener registrations differs between the
 * old and new configurations: a listener was added or removed, or an existing
 * listener's endpoint changed.
 *
 * Rewritten without the nonlocal `return` inside the `foreachEntry` lambda: in Scala
 * that compiles to throwing `NonLocalReturnControl`, which is deprecated and can be
 * accidentally swallowed by a surrounding `catch`. An `exists` predicate expresses
 * the same short-circuiting check directly.
 */
private def listenerRegistrationsAltered(
  oldAdvertisedListeners: Map[ListenerName, EndPoint],
  newAdvertisedListeners: Map[ListenerName, EndPoint]
): Boolean = {
  // A size mismatch alone proves an add/remove; otherwise any old entry that is
  // missing from, or mapped to a different endpoint in, the new map is an alteration.
  oldAdvertisedListeners.size != newAdvertisedListeners.size ||
    oldAdvertisedListeners.exists { case (listenerName, oldEndpoint) =>
      // Option.contains is false for None (listener removed) and performs an
      // equals comparison for Some (endpoint changed) — same semantics as the
      // original match/return logic.
      !newAdvertisedListeners.get(listenerName).contains(oldEndpoint)
    }
}

/**
 * Fails with a [[ConfigException]] when the broker is not ZooKeeper-based, since
 * altering advertised listener registrations is only supported in that mode.
 */
private def verifyListenerRegistrationAlterationSupported(): Unit = {
  val kraftMode = !server.config.requiresZookeeper
  if (kraftMode) {
    throw new ConfigException("Advertised listeners cannot be altered when using a " +
      "Raft-based metadata quorum.")
  }
}

def validateReconfiguration(newConfig: KafkaConfig): Unit = {
val oldConfig = server.config
val newListeners = listenersToMap(newConfig.listeners)
val newAdvertisedListeners = listenersToMap(newConfig.effectiveAdvertisedBrokerListeners)
val oldListeners = listenersToMap(oldConfig.listeners)
if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet))
throw new ConfigException(s"Advertised listeners '$newAdvertisedListeners' must be a subset of listeners '$newListeners'")
if (!newListeners.keySet.subsetOf(newConfig.effectiveListenerSecurityProtocolMap.keySet))
val newListeners = newConfig.listeners.map(_.listenerName).toSet
val oldAdvertisedListeners = oldConfig.effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet
val oldListeners = oldConfig.listeners.map(_.listenerName).toSet
if (!oldAdvertisedListeners.subsetOf(newListeners))
throw new ConfigException(s"Advertised listeners '$oldAdvertisedListeners' must be a subset of listeners '$newListeners'")
if (!newListeners.subsetOf(newConfig.effectiveListenerSecurityProtocolMap.keySet))
throw new ConfigException(s"Listeners '$newListeners' must be subset of listener map '${newConfig.effectiveListenerSecurityProtocolMap}'")
newListeners.keySet.intersect(oldListeners.keySet).foreach { listenerName =>
newListeners.intersect(oldListeners).foreach { listenerName =>
def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): Map[String, AnyRef] = {
kafkaConfig.originalsWithPrefix(prefix, true).asScala.filter { case (key, _) =>
// skip the reconfigurable configs
Expand All @@ -1032,15 +988,6 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
if (oldConfig.effectiveListenerSecurityProtocolMap(listenerName) != newConfig.effectiveListenerSecurityProtocolMap(listenerName))
throw new ConfigException(s"Security protocol cannot be updated for existing listener $listenerName")
}
if (!newAdvertisedListeners.contains(newConfig.interBrokerListenerName))
throw new ConfigException(s"Advertised listener must be specified for inter-broker listener ${newConfig.interBrokerListenerName}")

// Currently, we do not support adding or removing listeners when in KRaft mode.
// However, we support changing other listener configurations (max connections, etc.)
if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedBrokerListeners),
listenersToMap(newConfig.effectiveAdvertisedBrokerListeners))) {
verifyListenerRegistrationAlterationSupported()
}
}

def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
Expand All @@ -1055,13 +1002,6 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
if (listenersRemoved.nonEmpty) server.socketServer.removeListeners(listenersRemoved)
if (listenersAdded.nonEmpty) server.socketServer.addListeners(listenersAdded)
}
if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedBrokerListeners),
listenersToMap(newConfig.effectiveAdvertisedBrokerListeners))) {
verifyListenerRegistrationAlterationSupported()
server match {
case _ => throw new RuntimeException("Unable to handle reconfigure")
}
}
}

private def listenersToMap(listeners: Seq[EndPoint]): Map[ListenerName, EndPoint] =
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/SharedServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ class SharedServer(
// This is only done in tests.
metrics = new Metrics()
}
sharedServerConfig.dynamicConfig.initialize(zkClientOpt = None, clientMetricsReceiverPluginOpt = None)
sharedServerConfig.dynamicConfig.initialize(clientMetricsReceiverPluginOpt = None)

if (sharedServerConfig.processRoles.contains(ProcessRole.BrokerRole)) {
brokerMetrics = new BrokerServerMetrics(metrics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package kafka.server.metadata

import java.util.{OptionalInt, Properties}
import java.util.OptionalInt
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.LogManager
import kafka.server.{KafkaConfig, ReplicaManager}
Expand Down Expand Up @@ -243,10 +243,6 @@ class BrokerMetadataPublisher(
}
}

// Forwards `props` to DynamicBrokerConfig.reloadUpdatedFilesWithoutConfigChange,
// which re-reads externally updated files without changing the config values
// themselves. Pure delegation; no logic of its own.
def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = {
config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
}

/**
* Update the coordinator of local replica changes: election and resignation.
*
Expand Down
Loading
Loading