
Commit 92fd99b

KAFKA-18479: Remove keepPartitionMetadataFile in UnifiedLog and LogMan… (#18491)
Reviewers: Jun Rao <[email protected]>
1 parent d82f03e commit 92fd99b

26 files changed (+38, -123 lines)

core/src/main/java/kafka/server/builders/LogManagerBuilder.java (-7)

@@ -56,7 +56,6 @@ public class LogManagerBuilder {
     private BrokerTopicStats brokerTopicStats = null;
     private LogDirFailureChannel logDirFailureChannel = null;
     private Time time = Time.SYSTEM;
-    private boolean keepPartitionMetadataFile = true;
     private boolean remoteStorageSystemEnable = false;
     private long initialTaskDelayMs = ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT;

@@ -145,11 +144,6 @@ public LogManagerBuilder setTime(Time time) {
         return this;
     }

-    public LogManagerBuilder setKeepPartitionMetadataFile(boolean keepPartitionMetadataFile) {
-        this.keepPartitionMetadataFile = keepPartitionMetadataFile;
-        return this;
-    }
-
     public LogManagerBuilder setRemoteStorageSystemEnable(boolean remoteStorageSystemEnable) {
         this.remoteStorageSystemEnable = remoteStorageSystemEnable;
         return this;
@@ -186,7 +180,6 @@ public LogManager build() {
                               brokerTopicStats,
                               logDirFailureChannel,
                               time,
-                              keepPartitionMetadataFile,
                               remoteStorageSystemEnable,
                               initialTaskDelayMs);
    }
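For callers of the builder, the change is mechanical: the chain simply drops the removed setter. A minimal, hypothetical Scala sketch (deliberately under-configured — only the two setters visible in this diff are used; a real caller must set the remaining fields before build()):

    import org.apache.kafka.common.utils.Time
    import kafka.server.builders.LogManagerBuilder

    object BuilderSketch {
      // Sketch only: just the setters that appear in this diff.
      val partial: LogManagerBuilder = new LogManagerBuilder()
        .setTime(Time.SYSTEM)
        .setRemoteStorageSystemEnable(false)
      // .setKeepPartitionMetadataFile(true)  // removed by this commit; no longer compiles
    }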

core/src/main/scala/kafka/log/LogManager.scala (+1, -6)

@@ -78,7 +78,6 @@ class LogManager(logDirs: Seq[File],
                  brokerTopicStats: BrokerTopicStats,
                  logDirFailureChannel: LogDirFailureChannel,
                  time: Time,
-                 val keepPartitionMetadataFile: Boolean,
                  remoteStorageSystemEnable: Boolean,
                  val initialTaskDelayMs: Long) extends Logging {

@@ -346,7 +345,6 @@
       logDirFailureChannel = logDirFailureChannel,
       lastShutdownClean = hadCleanShutdown,
       topicId = None,
-      keepPartitionMetadataFile = keepPartitionMetadataFile,
       numRemainingSegments = numRemainingSegments,
       remoteStorageSystemEnable = remoteStorageSystemEnable)

@@ -1074,7 +1072,6 @@
       brokerTopicStats = brokerTopicStats,
       logDirFailureChannel = logDirFailureChannel,
       topicId = topicId,
-      keepPartitionMetadataFile = keepPartitionMetadataFile,
       remoteStorageSystemEnable = remoteStorageSystemEnable)

     if (isFuture)
@@ -1552,8 +1549,7 @@ object LogManager {
             kafkaScheduler: Scheduler,
             time: Time,
             brokerTopicStats: BrokerTopicStats,
-            logDirFailureChannel: LogDirFailureChannel,
-            keepPartitionMetadataFile: Boolean): LogManager = {
+            logDirFailureChannel: LogDirFailureChannel): LogManager = {
     val defaultProps = config.extractLogConfigMap

     LogConfig.validateBrokerLogConfigValues(defaultProps, config.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
@@ -1578,7 +1574,6 @@ object LogManager {
       brokerTopicStats = brokerTopicStats,
       logDirFailureChannel = logDirFailureChannel,
       time = time,
-      keepPartitionMetadataFile = keepPartitionMetadataFile,
       interBrokerProtocolVersion = config.interBrokerProtocolVersion,
       remoteStorageSystemEnable = config.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
       initialTaskDelayMs = config.logInitialTaskDelayMs)

core/src/main/scala/kafka/log/UnifiedLog.scala (+17, -43)

@@ -85,13 +85,6 @@ import scala.jdk.OptionConverters.{RichOption, RichOptional, RichOptionalInt}
 * @param _topicId optional Uuid to specify the topic ID for the topic if it exists. Should only be specified when
 *                 first creating the log through Partition.makeLeader or Partition.makeFollower. When reloading a log,
 *                 this field will be populated by reading the topic ID value from partition.metadata if it exists.
- * @param keepPartitionMetadataFile boolean flag to indicate whether the partition.metadata file should be kept in the
- *                                  log directory. A partition.metadata file is only created when the raft controller is used
- *                                  or the ZK controller and this broker's inter-broker protocol version is at least 2.8.
- *                                  This file will persist the topic ID on the broker. If inter-broker protocol for a ZK controller
- *                                  is downgraded below 2.8, a topic ID may be lost and a new ID generated upon re-upgrade.
- *                                  If the inter-broker protocol version on a ZK cluster is below 2.8, partition.metadata
- *                                  will be deleted to avoid ID conflicts upon re-upgrade.
 * @param remoteStorageSystemEnable flag to indicate whether the system level remote log storage is enabled or not.
 */
@threadsafe
@@ -102,7 +95,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
                 @volatile var leaderEpochCache: LeaderEpochFileCache,
                 val producerStateManager: ProducerStateManager,
                 @volatile private var _topicId: Option[Uuid],
-                val keepPartitionMetadataFile: Boolean,
                 val remoteStorageSystemEnable: Boolean = false,
                 @volatile private var logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER) extends Logging with AutoCloseable {

@@ -190,40 +182,26 @@ class UnifiedLog(@volatile var logStartOffset: Long,

  /**
   * Initialize topic ID information for the log by maintaining the partition metadata file and setting the in-memory _topicId.
-  * Delete partition metadata file if the version does not support topic IDs.
   * Set _topicId based on a few scenarios:
-  * - Recover topic ID if present and topic IDs are supported. Ensure we do not try to assign a provided topicId that is inconsistent
+  * - Recover topic ID if present. Ensure we do not try to assign a provided topicId that is inconsistent
   *   with the ID on file.
-  * - If we were provided a topic ID when creating the log, partition metadata files are supported, and one does not yet exist
+  * - If we were provided a topic ID when creating the log and one does not yet exist
   *   set _topicId and write to the partition metadata file.
-  * - Otherwise set _topicId to None
   */
  private def initializeTopicId(): Unit = {
    val partMetadataFile = partitionMetadataFile.getOrElse(
      throw new KafkaException("The partitionMetadataFile should have been initialized"))

    if (partMetadataFile.exists()) {
-      if (keepPartitionMetadataFile) {
-        val fileTopicId = partMetadataFile.read().topicId
-        if (_topicId.isDefined && !_topicId.contains(fileTopicId))
-          throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition," +
-            s"but log already contained topic ID $fileTopicId")
-
-        _topicId = Some(fileTopicId)
+      val fileTopicId = partMetadataFile.read().topicId
+      if (_topicId.isDefined && !_topicId.contains(fileTopicId))
+        throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition," +
+          s"but log already contained topic ID $fileTopicId")

-      } else {
-        try partMetadataFile.delete()
-        catch {
-          case e: IOException =>
-            error(s"Error while trying to delete partition metadata file $partMetadataFile", e)
-        }
-      }
-    } else if (keepPartitionMetadataFile) {
+      _topicId = Some(fileTopicId)
+    } else {
      _topicId.foreach(partMetadataFile.record)
      scheduler.scheduleOnce("flush-metadata-file", () => maybeFlushMetadataFile())
-    } else {
-      // We want to keep the file and the in-memory topic ID in sync.
-      _topicId = None
    }
  }

@@ -493,17 +471,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
        }

      case None =>
-        if (keepPartitionMetadataFile) {
-          _topicId = Some(topicId)
-          partitionMetadataFile match {
-            case Some(partMetadataFile) =>
-              if (!partMetadataFile.exists()) {
-                partMetadataFile.record(topicId)
-                scheduler.scheduleOnce("flush-metadata-file", () => maybeFlushMetadataFile())
-              }
-            case _ => warn(s"The topic id $topicId will not be persisted to the partition metadata file " +
-              "since the partition is deleted")
-          }
+        _topicId = Some(topicId)
+        partitionMetadataFile match {
+          case Some(partMetadataFile) =>
+            if (!partMetadataFile.exists()) {
+              partMetadataFile.record(topicId)
+              scheduler.scheduleOnce("flush-metadata-file", () => maybeFlushMetadataFile())
+            }
+          case _ => warn(s"The topic id $topicId will not be persisted to the partition metadata file " +
+            "since the partition is deleted")
        }
    }
  }
@@ -1989,7 +1965,6 @@ object UnifiedLog extends Logging {
            logDirFailureChannel: LogDirFailureChannel,
            lastShutdownClean: Boolean = true,
            topicId: Option[Uuid],
-            keepPartitionMetadataFile: Boolean,
            numRemainingSegments: ConcurrentMap[String, Integer] = new ConcurrentHashMap[String, Integer],
            remoteStorageSystemEnable: Boolean = false,
            logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER): UnifiedLog = {
@@ -2034,7 +2009,6 @@ object UnifiedLog extends Logging {
      leaderEpochCache,
      producerStateManager,
      topicId,
-      keepPartitionMetadataFile,
      remoteStorageSystemEnable,
      logOffsetsListener)
  }
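The simplified initializeTopicId flow is easier to see in isolation. Below is a self-contained Scala sketch using a hypothetical stub in place of Kafka's real PartitionMetadataFile (the stub names are invented for illustration; the logic mirrors the new code above): the on-disk ID always wins when the file exists and must match any provided ID, otherwise a provided ID is always persisted. With keepPartitionMetadataFile gone, the old false-branches (deleting the file, or forcing _topicId to None) disappear entirely.

    import java.util.UUID

    object TopicIdInitSketch {
      // Hypothetical stand-in for the partition.metadata file.
      final class PartitionMetadataFileStub(private var stored: Option[UUID]) {
        def exists(): Boolean = stored.isDefined
        def read(): UUID = stored.get
        def record(id: UUID): Unit = stored = Some(id)
      }

      // Mirrors the simplified flow: recover and validate the on-disk ID if the
      // file exists; otherwise persist the provided ID (if any).
      def initializeTopicId(file: PartitionMetadataFileStub, provided: Option[UUID]): Option[UUID] =
        if (file.exists()) {
          val fileTopicId = file.read()
          if (provided.isDefined && !provided.contains(fileTopicId))
            throw new IllegalStateException(s"Tried to assign topic ID ${provided.get}, " +
              s"but log already contained topic ID $fileTopicId")
          Some(fileTopicId)
        } else {
          provided.foreach(file.record)
          provided
        }

      def main(args: Array[String]): Unit = {
        val file = new PartitionMetadataFileStub(None)
        val id = UUID.randomUUID()
        assert(initializeTopicId(file, Some(id)).contains(id)) // persisted on first call
        assert(initializeTopicId(file, Some(id)).contains(id)) // recovered from "disk" after that
      }
    }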

core/src/main/scala/kafka/raft/KafkaMetadataLog.scala (+1, -2)

@@ -620,8 +620,7 @@ object KafkaMetadataLog extends Logging {
      producerIdExpirationCheckIntervalMs = Int.MaxValue,
      logDirFailureChannel = new LogDirFailureChannel(5),
      lastShutdownClean = false,
-      topicId = Some(topicId),
-      keepPartitionMetadataFile = true
+      topicId = Some(topicId)
    )

    val metadataLog = new KafkaMetadataLog(

core/src/main/scala/kafka/server/BrokerServer.scala (+1, -2)

@@ -216,8 +216,7 @@ class BrokerServer(
      kafkaScheduler,
      time,
      brokerTopicStats,
-      logDirFailureChannel,
-      keepPartitionMetadataFile = true)
+      logDirFailureChannel)

    remoteLogManagerOpt = createRemoteLogManager()

core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala (+1, -2)

@@ -452,8 +452,7 @@ class PartitionLockTest extends Logging {
      log.producerIdExpirationCheckIntervalMs,
      leaderEpochCache,
      producerStateManager,
-      _topicId = None,
-      keepPartitionMetadataFile = true) {
+      _topicId = None) {

      override def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, origin: AppendOrigin,
                                  requestLocal: RequestLocal, verificationGuard: VerificationGuard): LogAppendInfo = {

core/src/test/scala/unit/kafka/cluster/PartitionTest.scala (+1, -2)

@@ -3622,8 +3622,7 @@ class PartitionTest extends AbstractPartitionTest {
      log.producerIdExpirationCheckIntervalMs,
      leaderEpochCache,
      producerStateManager,
-      _topicId = None,
-      keepPartitionMetadataFile = true) {
+      _topicId = None) {

      override def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
        appendSemaphore.acquire()

core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala (+1, -2)

@@ -117,8 +117,7 @@ abstract class AbstractLogCleanerIntegrationTest {
      producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
      producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
      logDirFailureChannel = new LogDirFailureChannel(10),
-      topicId = None,
-      keepPartitionMetadataFile = true)
+      topicId = None)
    logMap.put(partition, log)
    this.logs += log
  }

core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala (+1, -2)

@@ -69,8 +69,7 @@ class BrokerCompressionTest {
      producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
      producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
      logDirFailureChannel = new LogDirFailureChannel(10),
-      topicId = None,
-      keepPartitionMetadataFile = true
+      topicId = None
    )

    /* append two messages */

core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala (+3, -5)

@@ -133,7 +133,7 @@ class LogCleanerManagerTest extends Logging {
    // the exception should be caught and the partition that caused it marked as uncleanable
    class LogMock extends UnifiedLog(offsets.logStartOffset, localLog, new BrokerTopicStats,
                                     producerIdExpirationCheckIntervalMs, leaderEpochCache,
-                                     producerStateManager, _topicId = None, keepPartitionMetadataFile = true) {
+                                     producerStateManager, _topicId = None) {
      // Throw an error in getFirstBatchTimestampForSegments since it is called in grabFilthiestLog()
      override def getFirstBatchTimestampForSegments(segments: util.Collection[LogSegment]): util.Collection[java.lang.Long] =
        throw new IllegalStateException("Error!")
@@ -821,8 +821,7 @@ class LogCleanerManagerTest extends Logging {
      producerStateManagerConfig = producerStateManagerConfig,
      producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
      logDirFailureChannel = new LogDirFailureChannel(10),
-      topicId = None,
-      keepPartitionMetadataFile = true)
+      topicId = None)
  }

  private def createLowRetentionLogConfig(segmentSize: Int, cleanupPolicy: String): LogConfig = {
@@ -875,8 +874,7 @@ class LogCleanerManagerTest extends Logging {
      producerStateManagerConfig = producerStateManagerConfig,
      producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
      logDirFailureChannel = new LogDirFailureChannel(10),
-      topicId = None,
-      keepPartitionMetadataFile = true
+      topicId = None
    )
  }

core/src/test/scala/unit/kafka/log/LogCleanerTest.scala (+2, -4)

@@ -216,8 +216,7 @@ class LogCleanerTest extends Logging {
      producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs,
      leaderEpochCache = leaderEpochCache,
      producerStateManager = producerStateManager,
-      _topicId = None,
-      keepPartitionMetadataFile = true) {
+      _topicId = None) {
      override def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment]): Unit = {
        deleteStartLatch.countDown()
        if (!deleteCompleteLatch.await(5000, TimeUnit.MILLISECONDS)) {
@@ -2093,8 +2092,7 @@ class LogCleanerTest extends Logging {
      producerStateManagerConfig = producerStateManagerConfig,
      producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
      logDirFailureChannel = new LogDirFailureChannel(10),
-      topicId = None,
-      keepPartitionMetadataFile = true
+      topicId = None
    )
  }

core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala (+1, -2)

@@ -156,8 +156,7 @@ class LogConcurrencyTest {
      producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
      producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
      logDirFailureChannel = new LogDirFailureChannel(10),
-      topicId = None,
-      keepPartitionMetadataFile = true
+      topicId = None
    )
  }

core/src/test/scala/unit/kafka/log/LogLoaderTest.scala (+3, -6)

@@ -127,7 +127,6 @@ class LogLoaderTest {
      brokerTopicStats = new BrokerTopicStats(),
      logDirFailureChannel = logDirFailureChannel,
      time = time,
-      keepPartitionMetadataFile = true,
      remoteStorageSystemEnable = config.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
      initialTaskDelayMs = config.logInitialTaskDelayMs) {

@@ -324,7 +323,7 @@ class LogLoaderTest {
        logDirFailureChannel)
      new UnifiedLog(offsets.logStartOffset, localLog, brokerTopicStats,
        producerIdExpirationCheckIntervalMs, leaderEpochCache, producerStateManager,
-        None, keepPartitionMetadataFile = true)
+        None)
    }

    // Retain snapshots for the last 2 segments
@@ -447,8 +446,7 @@ class LogLoaderTest {
      producerIdExpirationCheckIntervalMs = 30000,
      leaderEpochCache = leaderEpochCache,
      producerStateManager = stateManager,
-      _topicId = None,
-      keepPartitionMetadataFile = true)
+      _topicId = None)

    verify(stateManager).updateMapEndOffset(0L)
    verify(stateManager).removeStraySnapshots(any())
@@ -557,8 +555,7 @@ class LogLoaderTest {
      producerIdExpirationCheckIntervalMs = 30000,
      leaderEpochCache = leaderEpochCache,
      producerStateManager = stateManager,
-      _topicId = None,
-      keepPartitionMetadataFile = true)
+      _topicId = None)

    verify(stateManager).removeStraySnapshots(any[java.util.List[java.lang.Long]])
    verify(stateManager, times(2)).updateMapEndOffset(0L)

core/src/test/scala/unit/kafka/log/LogManagerTest.scala (-2)

@@ -975,7 +975,6 @@ class LogManagerTest {
      // not clean shutdown
      lastShutdownClean = false,
      topicId = None,
-      keepPartitionMetadataFile = false,
      // pass mock map for verification later
      numRemainingSegments = mockMap)

@@ -1383,7 +1382,6 @@ class LogManagerTest {
      time = Time.SYSTEM,
      brokerTopicStats = new BrokerTopicStats,
      logDirFailureChannel = new LogDirFailureChannel(1),
-      keepPartitionMetadataFile = true,
      interBrokerProtocolVersion = MetadataVersion.latestTesting,
      remoteStorageSystemEnable = false,
      initialTaskDelayMs = 0)

core/src/test/scala/unit/kafka/log/LogTestUtils.scala (-2)

@@ -103,7 +103,6 @@ object LogTestUtils {
                producerIdExpirationCheckIntervalMs: Int = TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
                lastShutdownClean: Boolean = true,
                topicId: Option[Uuid] = None,
-                keepPartitionMetadataFile: Boolean = true,
                numRemainingSegments: ConcurrentMap[String, Integer] = new ConcurrentHashMap[String, Integer],
                remoteStorageSystemEnable: Boolean = false,
                remoteLogManager: Option[RemoteLogManager] = None,
@@ -122,7 +121,6 @@ object LogTestUtils {
      logDirFailureChannel = new LogDirFailureChannel(10),
      lastShutdownClean = lastShutdownClean,
      topicId = topicId,
-      keepPartitionMetadataFile = keepPartitionMetadataFile,
      numRemainingSegments = numRemainingSegments,
      remoteStorageSystemEnable = remoteStorageSystemEnable,
      logOffsetsListener = logOffsetsListener
