Skip to content

Commit cb12723

Browse files
committed
KAFKA-18405: Remove ZooKeeper logic from DynamicBrokerConfig
Signed-off-by: PoAn Yang <[email protected]>
1 parent 70d6312 commit cb12723

File tree

7 files changed

+32
-55
lines changed

7 files changed

+32
-55
lines changed

core/src/main/scala/kafka/server/BrokerServer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ class BrokerServer(
193193
info("Starting broker")
194194

195195
val clientMetricsReceiverPlugin = new ClientMetricsReceiverPlugin()
196-
config.dynamicConfig.initialize(zkClientOpt = None, Some(clientMetricsReceiverPlugin))
196+
config.dynamicConfig.initialize(Some(clientMetricsReceiverPlugin))
197197

198198
/* start scheduler */
199199
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)

core/src/main/scala/kafka/server/ControllerServer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ class ControllerServer(
124124
try {
125125
this.logIdent = logContext.logPrefix()
126126
info("Starting controller")
127-
config.dynamicConfig.initialize(zkClientOpt = None, clientMetricsReceiverPluginOpt = None)
127+
config.dynamicConfig.initialize(clientMetricsReceiverPluginOpt = None)
128128

129129
maybeChangeStatus(STARTING, STARTED)
130130

core/src/main/scala/kafka/server/DynamicBrokerConfig.scala

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import kafka.log.{LogCleaner, LogManager}
2626
import kafka.network.{DataPlaneAcceptor, SocketServer}
2727
import kafka.server.DynamicBrokerConfig._
2828
import kafka.utils.{CoreUtils, Logging}
29-
import kafka.zk.{AdminZkClient, KafkaZkClient}
3029
import org.apache.kafka.common.Reconfigurable
3130
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
3231
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs}
@@ -39,7 +38,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
3938
import org.apache.kafka.network.SocketServerConfigs
4039
import org.apache.kafka.security.PasswordEncoder
4140
import org.apache.kafka.server.ProcessRole
42-
import org.apache.kafka.server.config.{ConfigType, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZooKeeperInternals}
41+
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
4342
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
4443
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, MetricConfigs}
4544
import org.apache.kafka.server.telemetry.ClientTelemetry
@@ -58,8 +57,6 @@ import scala.jdk.CollectionConverters._
5857
* </ul>
5958
* The order of precedence for broker configs is:
6059
* <ol>
61-
* <li>DYNAMIC_BROKER_CONFIG: stored in ZK at /configs/brokers/{brokerId}</li>
62-
* <li>DYNAMIC_DEFAULT_BROKER_CONFIG: stored in ZK at /configs/brokers/&lt;default&gt;</li>
6360
* <li>STATIC_BROKER_CONFIG: properties that broker is started up with, typically from server.properties file</li>
6461
* <li>DEFAULT_CONFIG: Default configs defined in KafkaConfig</li>
6562
* </ol>
@@ -215,17 +212,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
215212
private var currentConfig: KafkaConfig = _
216213
private val dynamicConfigPasswordEncoder = Some(PasswordEncoder.NOOP)
217214

218-
private[server] def initialize(zkClientOpt: Option[KafkaZkClient], clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = {
215+
private[server] def initialize(clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = {
219216
currentConfig = new KafkaConfig(kafkaConfig.props, false)
220217
metricsReceiverPluginOpt = clientMetricsReceiverPluginOpt
221-
222-
zkClientOpt.foreach { zkClient =>
223-
val adminZkClient = new AdminZkClient(zkClient)
224-
updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.BROKER, ZooKeeperInternals.DEFAULT_STRING), false)
225-
val props = adminZkClient.fetchEntityConfig(ConfigType.BROKER, kafkaConfig.brokerId.toString)
226-
val brokerConfig = maybeReEncodePasswords(props, adminZkClient)
227-
updateBrokerConfig(kafkaConfig.brokerId, brokerConfig)
228-
}
229218
}
230219

231220
/**
@@ -427,14 +416,6 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
427416
props
428417
}
429418

430-
// If the secret has changed, password.encoder.old.secret contains the old secret that was used
431-
// to encode the configs in ZK. Decode passwords using the old secret and update ZK with values
432-
// encoded using the current secret. Ignore any errors during decoding since old secret may not
433-
// have been removed during broker restart.
434-
private def maybeReEncodePasswords(persistentProps: Properties, adminZkClient: AdminZkClient): Properties = {
435-
persistentProps.clone().asInstanceOf[Properties]
436-
}
437-
438419
/**
439420
* Validate the provided configs `propsOverride` and return the full Kafka configs with
440421
* the configured defaults and these overrides.

core/src/main/scala/kafka/server/SharedServer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ class SharedServer(
268268
// This is only done in tests.
269269
metrics = new Metrics()
270270
}
271-
sharedServerConfig.dynamicConfig.initialize(zkClientOpt = None, clientMetricsReceiverPluginOpt = None)
271+
sharedServerConfig.dynamicConfig.initialize(clientMetricsReceiverPluginOpt = None)
272272

273273
if (sharedServerConfig.processRoles.contains(ProcessRole.BrokerRole)) {
274274
brokerMetrics = new BrokerServerMetrics(metrics)

core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package kafka.server.metadata
1919

20-
import java.util.{OptionalInt, Properties}
20+
import java.util.OptionalInt
2121
import kafka.coordinator.transaction.TransactionCoordinator
2222
import kafka.log.LogManager
2323
import kafka.server.{KafkaConfig, ReplicaManager}
@@ -243,10 +243,6 @@ class BrokerMetadataPublisher(
243243
}
244244
}
245245

246-
def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = {
247-
config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
248-
}
249-
250246
/**
251247
* Update the coordinator of local replica changes: election and resignation.
252248
*

core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class DynamicBrokerConfigTest {
5757
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, oldKeystore)
5858
val config = KafkaConfig(props)
5959
val dynamicConfig = config.dynamicConfig
60-
dynamicConfig.initialize(None, None)
60+
dynamicConfig.initialize(None)
6161

6262
assertEquals(config, dynamicConfig.currentKafkaConfig)
6363
assertEquals(oldKeystore, config.values.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
@@ -123,7 +123,7 @@ class DynamicBrokerConfigTest {
123123
Mockito.when(serverMock.logManager).thenReturn(logManagerMock)
124124
Mockito.when(serverMock.kafkaScheduler).thenReturn(schedulerMock)
125125

126-
config.dynamicConfig.initialize(None, None)
126+
config.dynamicConfig.initialize(None)
127127
config.dynamicConfig.addBrokerReconfigurable(new BrokerDynamicThreadPool(serverMock))
128128
config.dynamicConfig.addReconfigurable(acceptorMock)
129129

@@ -179,7 +179,7 @@ class DynamicBrokerConfigTest {
179179
when(serverMock.config).thenReturn(config)
180180
when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
181181

182-
config.dynamicConfig.initialize(None, None)
182+
config.dynamicConfig.initialize(None)
183183
config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
184184

185185
// Test dynamic update with valid values
@@ -215,7 +215,7 @@ class DynamicBrokerConfigTest {
215215
when(serverMock.config).thenReturn(config)
216216
when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
217217

218-
config.dynamicConfig.initialize(None, None)
218+
config.dynamicConfig.initialize(None)
219219
config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
220220

221221
// Test dynamic update with invalid values
@@ -246,7 +246,7 @@ class DynamicBrokerConfigTest {
246246
val origProps = TestUtils.createBrokerConfig(0, port = 8181)
247247
origProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS")
248248
val config = KafkaConfig(origProps)
249-
config.dynamicConfig.initialize(None, None)
249+
config.dynamicConfig.initialize(None)
250250

251251
val validProps = Map(s"listener.name.external.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}" -> "ks.p12")
252252

@@ -265,7 +265,7 @@ class DynamicBrokerConfigTest {
265265
val origProps = TestUtils.createBrokerConfig(0, port = 8181)
266266
origProps.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "100000000")
267267
val config = KafkaConfig(origProps)
268-
config.dynamicConfig.initialize(None, None)
268+
config.dynamicConfig.initialize(None)
269269

270270
val validProps = Map.empty[String, String]
271271
val invalidProps = Map(CleanerConfig.LOG_CLEANER_THREADS_PROP -> "20")
@@ -368,7 +368,7 @@ class DynamicBrokerConfigTest {
368368
private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean): Unit = {
369369
val configProps = TestUtils.createBrokerConfig(0, port = 8181)
370370
val config = KafkaConfig(configProps)
371-
config.dynamicConfig.initialize(None, None)
371+
config.dynamicConfig.initialize(None)
372372

373373
val props = new Properties
374374
props.put(name, value)
@@ -459,7 +459,7 @@ class DynamicBrokerConfigTest {
459459
def testAuthorizerConfig(): Unit = {
460460
val props = TestUtils.createBrokerConfig(0, port = 9092)
461461
val oldConfig = KafkaConfig.fromProps(props)
462-
oldConfig.dynamicConfig.initialize(None, None)
462+
oldConfig.dynamicConfig.initialize(None)
463463

464464
val kafkaServer: KafkaBroker = mock(classOf[kafka.server.KafkaBroker])
465465
when(kafkaServer.config).thenReturn(oldConfig)
@@ -505,7 +505,7 @@ class DynamicBrokerConfigTest {
505505
def testCombinedControllerAuthorizerConfig(): Unit = {
506506
val props = createCombinedControllerConfig(0, 9092)
507507
val oldConfig = KafkaConfig.fromProps(props)
508-
oldConfig.dynamicConfig.initialize(None, None)
508+
oldConfig.dynamicConfig.initialize(None)
509509

510510
val controllerServer: ControllerServer = mock(classOf[kafka.server.ControllerServer])
511511
when(controllerServer.config).thenReturn(oldConfig)
@@ -550,7 +550,7 @@ class DynamicBrokerConfigTest {
550550
def testIsolatedControllerAuthorizerConfig(): Unit = {
551551
val props = createIsolatedControllerConfig(0, port = 9092)
552552
val oldConfig = KafkaConfig.fromProps(props)
553-
oldConfig.dynamicConfig.initialize(None, None)
553+
oldConfig.dynamicConfig.initialize(None)
554554

555555
val controllerServer: ControllerServer = mock(classOf[kafka.server.ControllerServer])
556556
when(controllerServer.config).thenReturn(oldConfig)
@@ -589,7 +589,7 @@ class DynamicBrokerConfigTest {
589589
def testImproperConfigsAreRemoved(): Unit = {
590590
val props = TestUtils.createBrokerConfig(0)
591591
val config = KafkaConfig(props)
592-
config.dynamicConfig.initialize(None, None)
592+
config.dynamicConfig.initialize(None)
593593

594594
assertEquals(SocketServerConfigs.MAX_CONNECTIONS_DEFAULT, config.maxConnections)
595595
assertEquals(LogConfig.DEFAULT_MAX_MESSAGE_BYTES, config.messageMaxBytes)
@@ -624,7 +624,7 @@ class DynamicBrokerConfigTest {
624624

625625
Mockito.when(serverMock.config).thenReturn(config)
626626

627-
config.dynamicConfig.initialize(None, None)
627+
config.dynamicConfig.initialize(None)
628628
val m = new DynamicMetricsReporters(brokerId, config, metrics, "clusterId")
629629
config.dynamicConfig.addReconfigurable(m)
630630
assertEquals(1, m.currentReporters.size)
@@ -649,7 +649,7 @@ class DynamicBrokerConfigTest {
649649

650650
Mockito.when(serverMock.config).thenReturn(config)
651651

652-
config.dynamicConfig.initialize(None, None)
652+
config.dynamicConfig.initialize(None)
653653
val m = new DynamicMetricsReporters(brokerId, config, metrics, "clusterId")
654654
config.dynamicConfig.addReconfigurable(m)
655655
assertTrue(m.currentReporters.isEmpty)
@@ -681,7 +681,7 @@ class DynamicBrokerConfigTest {
681681
props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, "2592000000")
682682
val config = KafkaConfig(props)
683683
val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaBroker]))
684-
config.dynamicConfig.initialize(None, None)
684+
config.dynamicConfig.initialize(None)
685685
config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
686686

687687
val newProps = new Properties()
@@ -704,7 +704,7 @@ class DynamicBrokerConfigTest {
704704
props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, "4294967296")
705705
val config = KafkaConfig(props)
706706
val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaBroker]))
707-
config.dynamicConfig.initialize(None, None)
707+
config.dynamicConfig.initialize(None)
708708
config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
709709

710710
val newProps = new Properties()
@@ -727,7 +727,7 @@ class DynamicBrokerConfigTest {
727727
props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "1000")
728728
props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "1024")
729729
val config = KafkaConfig(props)
730-
config.dynamicConfig.initialize(None, None)
730+
config.dynamicConfig.initialize(None)
731731

732732
// Check for invalid localRetentionMs < -2
733733
verifyConfigUpdateWithInvalidConfig(config, props, Map.empty, Map(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP -> "-3"))
@@ -757,7 +757,7 @@ class DynamicBrokerConfigTest {
757757
assertEquals(500, config.remoteLogManagerConfig.remoteFetchMaxWaitMs)
758758

759759
val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker)
760-
config.dynamicConfig.initialize(None, None)
760+
config.dynamicConfig.initialize(None)
761761
config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig)
762762

763763
val newProps = new Properties()
@@ -792,7 +792,7 @@ class DynamicBrokerConfigTest {
792792
config.remoteLogManagerConfig.remoteListOffsetsRequestTimeoutMs)
793793

794794
val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker)
795-
config.dynamicConfig.initialize(None, None)
795+
config.dynamicConfig.initialize(None)
796796
config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig)
797797

798798
val newProps = new Properties()
@@ -828,7 +828,7 @@ class DynamicBrokerConfigTest {
828828
Mockito.when(serverMock.config).thenReturn(config)
829829
Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
830830

831-
config.dynamicConfig.initialize(None, None)
831+
config.dynamicConfig.initialize(None)
832832
config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
833833

834834
val props = new Properties()
@@ -851,7 +851,7 @@ class DynamicBrokerConfigTest {
851851
Mockito.when(serverMock.config).thenReturn(config)
852852
Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
853853

854-
config.dynamicConfig.initialize(None, None)
854+
config.dynamicConfig.initialize(None)
855855
config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
856856

857857
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND,
@@ -882,7 +882,7 @@ class DynamicBrokerConfigTest {
882882
Mockito.when(serverMock.config).thenReturn(config)
883883
Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
884884

885-
config.dynamicConfig.initialize(None, None)
885+
config.dynamicConfig.initialize(None)
886886
config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
887887

888888
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND,
@@ -917,7 +917,7 @@ class DynamicBrokerConfigTest {
917917
Mockito.when(serverMock.config).thenReturn(config)
918918
Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
919919

920-
config.dynamicConfig.initialize(None, None)
920+
config.dynamicConfig.initialize(None)
921921
config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
922922

923923
// Default values
@@ -964,7 +964,7 @@ class DynamicBrokerConfigTest {
964964
props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, retentionBytes.toString)
965965
val config = KafkaConfig(props)
966966
val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[KafkaBroker]))
967-
config.dynamicConfig.initialize(None, None)
967+
config.dynamicConfig.initialize(None)
968968
config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
969969

970970
val newProps = new Properties()
@@ -990,7 +990,7 @@ class DynamicBrokerConfigTest {
990990
Mockito.when(logManagerMock.reconfigureDefaultLogConfig(ArgumentMatchers.any(classOf[LogConfig])))
991991
.thenAnswer(invocation => currentDefaultLogConfig.set(invocation.getArgument(0)))
992992

993-
config.dynamicConfig.initialize(None, None)
993+
config.dynamicConfig.initialize(None)
994994
config.dynamicConfig.addBrokerReconfigurable(new DynamicLogConfig(logManagerMock, serverMock))
995995
}
996996

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2539,7 +2539,7 @@ class ReplicaManagerTest {
25392539
verify(addPartitionsToTxnManager, times(0)).addOrVerifyTransaction(any(), any(), any(), any(), any(), any())
25402540

25412541
// Dynamically enable verification.
2542-
config.dynamicConfig.initialize(None, None)
2542+
config.dynamicConfig.initialize(None)
25432543
val props = new Properties()
25442544
props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "true")
25452545
config.dynamicConfig.updateBrokerConfig(config.brokerId, props)
@@ -2591,7 +2591,7 @@ class ReplicaManagerTest {
25912591
assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
25922592

25932593
// Disable verification
2594-
config.dynamicConfig.initialize(None, None)
2594+
config.dynamicConfig.initialize(None)
25952595
val props = new Properties()
25962596
props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "false")
25972597
config.dynamicConfig.updateBrokerConfig(config.brokerId, props)

0 commit comments

Comments
 (0)