Skip to content

Commit 5a1fb15

Browse files
KAFKA-18373: Remove ZkMetadataCache (#18553)
Reviewers: Mickael Maison <[email protected]>, Ismael Juma <[email protected]>
1 parent 042da16 commit 5a1fb15

File tree

10 files changed

+32
-1259
lines changed

10 files changed

+32
-1259
lines changed

core/src/main/scala/kafka/cluster/Partition.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import kafka.controller.StateChangeLogger
2323
import kafka.log._
2424
import kafka.log.remote.RemoteLogManager
2525
import kafka.server._
26-
import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
26+
import kafka.server.metadata.KRaftMetadataCache
2727
import kafka.server.share.DelayedShareFetch
2828
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
2929
import kafka.utils._
@@ -1086,11 +1086,6 @@ class Partition(val topicPartition: TopicPartition,
10861086
!kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId) &&
10871087
isBrokerEpochIsrEligible(storedBrokerEpoch, cachedBrokerEpoch)
10881088

1089-
// In ZK mode, we just ensure the broker is alive. Although we do not check for shutting down brokers here,
1090-
// the controller will block them from being added to ISR.
1091-
case zkMetadataCache: ZkMetadataCache =>
1092-
zkMetadataCache.hasAliveBroker(followerReplicaId)
1093-
10941089
case _ => true
10951090
}
10961091
}

core/src/main/scala/kafka/server/MetadataCache.scala

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,11 @@
1717

1818
package kafka.server
1919

20-
import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
20+
import kafka.server.metadata.KRaftMetadataCache
2121
import org.apache.kafka.admin.BrokerMetadata
2222
import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
2323
import org.apache.kafka.common.network.ListenerName
2424
import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
25-
import org.apache.kafka.server.BrokerFeatures
2625
import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion}
2726

2827
import java.util
@@ -117,13 +116,6 @@ trait MetadataCache {
117116
}
118117

119118
object MetadataCache {
120-
def zkMetadataCache(brokerId: Int,
121-
metadataVersion: MetadataVersion,
122-
brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty())
123-
: ZkMetadataCache = {
124-
new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures)
125-
}
126-
127119
def kRaftMetadataCache(
128120
brokerId: Int,
129121
kraftVersionSupplier: Supplier[KRaftVersion]

core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala

Lines changed: 1 addition & 690 deletions
Large diffs are not rendered by default.

core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala

Lines changed: 12 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,13 @@
1717
package kafka.cluster
1818

1919
import kafka.log.UnifiedLog
20-
import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
20+
import kafka.server.metadata.KRaftMetadataCache
2121
import org.apache.kafka.common.TopicPartition
2222
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
2323
import org.apache.kafka.server.util.MockTime
2424
import org.apache.kafka.storage.internals.log.LogOffsetMetadata
2525
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
2626
import org.junit.jupiter.api.{BeforeEach, Test}
27-
import org.junit.jupiter.params.ParameterizedTest
28-
import org.junit.jupiter.params.provider.ValueSource
2927
import org.mockito.Mockito.{mock, when}
3028

3129
object ReplicaTest {
@@ -320,16 +318,10 @@ class ReplicaTest {
320318
assertFalse(isCaughtUp(leaderEndOffset = 16L))
321319
}
322320

323-
@ParameterizedTest
324-
@ValueSource(booleans = Array(true, false))
325-
def testFenceStaleUpdates(isKraft: Boolean): Unit = {
326-
val metadataCache = if (isKraft) {
327-
val kRaftMetadataCache = mock(classOf[KRaftMetadataCache])
328-
when(kRaftMetadataCache.getAliveBrokerEpoch(BrokerId)).thenReturn(Option(2L))
329-
kRaftMetadataCache
330-
} else {
331-
mock(classOf[ZkMetadataCache])
332-
}
321+
@Test
322+
def testFenceStaleUpdates(): Unit = {
323+
val metadataCache = mock(classOf[KRaftMetadataCache])
324+
when(metadataCache.getAliveBrokerEpoch(BrokerId)).thenReturn(Option(2L))
333325

334326
val replica = new Replica(BrokerId, Partition, metadataCache)
335327
replica.updateFetchStateOrThrow(
@@ -339,24 +331,13 @@ class ReplicaTest {
339331
leaderEndOffset = 10L,
340332
brokerEpoch = 2L
341333
)
342-
if (isKraft) {
343-
assertThrows(classOf[NotLeaderOrFollowerException], () => replica.updateFetchStateOrThrow(
344-
followerFetchOffsetMetadata = new LogOffsetMetadata(5L),
345-
followerStartOffset = 2L,
346-
followerFetchTimeMs = 3,
347-
leaderEndOffset = 10L,
348-
brokerEpoch = 1L
349-
))
350-
} else {
351-
// No exception to expect under ZK mode.
352-
replica.updateFetchStateOrThrow(
353-
followerFetchOffsetMetadata = new LogOffsetMetadata(5L),
354-
followerStartOffset = 2L,
355-
followerFetchTimeMs = 3,
356-
leaderEndOffset = 10L,
357-
brokerEpoch = 1L
358-
)
359-
}
334+
assertThrows(classOf[NotLeaderOrFollowerException], () => replica.updateFetchStateOrThrow(
335+
followerFetchOffsetMetadata = new LogOffsetMetadata(5L),
336+
followerStartOffset = 2L,
337+
followerFetchTimeMs = 3,
338+
leaderEndOffset = 10L,
339+
brokerEpoch = 1L
340+
))
360341
replica.updateFetchStateOrThrow(
361342
followerFetchOffsetMetadata = new LogOffsetMetadata(5L),
362343
followerStartOffset = 2L,

core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,11 @@
1616
*/
1717
package kafka.server
1818

19-
import kafka.server.metadata.ZkMetadataCache
2019
import org.apache.kafka.clients.NodeApiVersions
2120
import org.apache.kafka.common.message.ApiMessageType.ListenerType
2221
import org.apache.kafka.common.protocol.ApiKeys
2322
import org.apache.kafka.server.BrokerFeatures
24-
import org.apache.kafka.server.common.MetadataVersion
23+
import org.apache.kafka.server.common.KRaftVersion
2524
import org.junit.jupiter.api.{Disabled, Test}
2625
import org.junit.jupiter.api.Assertions._
2726
import org.junit.jupiter.params.ParameterizedTest
@@ -32,7 +31,7 @@ import scala.jdk.CollectionConverters._
3231

3332
class ApiVersionManagerTest {
3433
private val brokerFeatures = BrokerFeatures.createDefault(true)
35-
private val metadataCache = new ZkMetadataCache(1, MetadataVersion.latestTesting(), brokerFeatures)
34+
private val metadataCache = MetadataCache.kRaftMetadataCache(1, () => KRaftVersion.LATEST_PRODUCTION)
3635

3736
@ParameterizedTest
3837
@EnumSource(classOf[ListenerType])

core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala

Lines changed: 0 additions & 109 deletions
This file was deleted.

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,6 @@ class KafkaApisTest extends Logging {
525525

526526
val capturedResponse = verifyNoThrottling[AbstractResponse](request)
527527
assertEquals(expectedResponse.data, capturedResponse.data)
528-
529528
}
530529

531530
private def authorizeResource(authorizer: Authorizer,

0 commit comments

Comments
 (0)