
Commit af1e514

mimaison, pranavt84 (authored and committed)

KAFKA-14485: Move LogCleaner exceptions to storage module (apache#18534)

Reviewers: Luke Chen <[email protected]>, Ken Huang <[email protected]>

1 parent 5a88fa0 commit af1e514

File tree

5 files changed, +18 -23 lines changed

core/src/main/scala/kafka/log/LogCleaner.scala (+1, -2)

@@ -21,7 +21,6 @@ import java.io.{File, IOException}
 import java.nio._
 import java.util.Date
 import java.util.concurrent.TimeUnit
-import kafka.common._
 import kafka.log.LogCleaner.{CleanerRecopyPercentMetricName, DeadThreadCountMetricName, MaxBufferUtilizationPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName}
 import kafka.server.{BrokerReconfigurable, KafkaConfig}
 import kafka.utils.{Logging, Pool}
@@ -35,7 +34,7 @@ import org.apache.kafka.common.utils.{BufferSupplier, Time}
 import org.apache.kafka.server.config.ServerConfigs
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.ShutdownableThread
-import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, LastRecord, LogDirFailureChannel, LogSegment, LogSegmentOffsetOverflowException, OffsetMap, SkimpyOffsetMap, TransactionIndex}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, LastRecord, LogCleaningAbortedException, LogDirFailureChannel, LogSegment, LogSegmentOffsetOverflowException, OffsetMap, SkimpyOffsetMap, ThreadShutdownException, TransactionIndex}
 import org.apache.kafka.storage.internals.utils.Throttler

 import scala.jdk.CollectionConverters._
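
After this commit, both of the cleaner's control-flow exceptions are resolved from the single org.apache.kafka.storage.internals.log import above, so the kafka.common._ import can be dropped. The following is a minimal hypothetical sketch of the pattern those two exceptions support, not code from this commit: ThreadShutdownException signals a normal exit of a cleaner-style worker thread, while LogCleaningAbortedException only ends the current partition's cleaning pass. The names CleanerLoopSketch, cleanNextPartition and shutdownRequested are invented for illustration.

// Hypothetical illustration only -- not code from this commit.
import org.apache.kafka.storage.internals.log.LogCleaningAbortedException;
import org.apache.kafka.storage.internals.log.ThreadShutdownException;

public class CleanerLoopSketch implements Runnable {

    private volatile boolean shutdownRequested = false;

    public void shutdown() {
        shutdownRequested = true;
    }

    @Override
    public void run() {
        try {
            while (!shutdownRequested) {
                cleanNextPartition();
            }
        } catch (ThreadShutdownException e) {
            // Normal shutdown signal raised from inside the cleaning path:
            // leave the loop without treating it as an error.
        }
    }

    private void cleanNextPartition() {
        try {
            // ... pick a partition and run one cleaning pass; cleaning code may
            // throw LogCleaningAbortedException if cleaning of that partition is
            // aborted (for example because the log is being truncated or deleted) ...
        } catch (LogCleaningAbortedException e) {
            // The abort only affects the current partition; continue with the next one.
        }
    }
}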

core/src/main/scala/kafka/log/LogCleanerManager.scala (+1, -2)

@@ -21,14 +21,13 @@ import java.lang.{Long => JLong}
 import java.io.File
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
-import kafka.common.LogCleaningAbortedException
 import kafka.utils.CoreUtils._
 import kafka.utils.{Logging, Pool}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile
-import org.apache.kafka.storage.internals.log.LogDirFailureChannel
+import org.apache.kafka.storage.internals.log.{LogCleaningAbortedException, LogDirFailureChannel}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup

 import java.util.Comparator

core/src/test/scala/unit/kafka/log/LogCleanerTest.scala (+6, -7)

@@ -17,7 +17,6 @@

 package kafka.log

-import kafka.common._
 import kafka.server.KafkaConfig
 import kafka.utils.{CoreUtils, Logging, Pool, TestUtils}
 import org.apache.kafka.common.TopicPartition
@@ -29,7 +28,7 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogCleaningAbortedException, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
 import org.apache.kafka.storage.internals.utils.Throttler
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
@@ -1217,18 +1216,18 @@ class LogCleanerTest extends Logging {

     def distinctValuesBySegment = log.logSegments.asScala.map(s => s.log.records.asScala.map(record => TestUtils.readString(record.value)).toSet.size).toSeq

-    val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment
+    val distinctValuesBySegmentBeforeClean = distinctValuesBySegment
     assertTrue(distinctValuesBySegment.reverse.tail.forall(_ > N),
       "Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.")

     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, firstUncleanableOffset))

     val distinctValuesBySegmentAfterClean = distinctValuesBySegment

-    assertTrue(disctinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean)
+    assertTrue(distinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean)
       .take(numCleanableSegments).forall { case (before, after) => after < before },
       "The cleanable segments should have fewer number of values after cleaning")
-    assertTrue(disctinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean)
+    assertTrue(distinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean)
       .slice(numCleanableSegments, numTotalSegments).forall { x => x._1 == x._2 }, "The uncleanable segments should have the same number of values after cleaning")
   }

@@ -1240,9 +1239,9 @@
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))

     // create 6 segments with only one message in each segment
-    def createRecorcs = TestUtils.singletonRecords(value = Array.fill[Byte](25)(0), key = 1.toString.getBytes)
+    def createRecords = TestUtils.singletonRecords(value = Array.fill[Byte](25)(0), key = 1.toString.getBytes)
     for (_ <- 0 until 6)
-      log.appendAsLeader(createRecorcs, leaderEpoch = 0)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)

     val logToClean = LogToClean(new TopicPartition("test", 0), log, log.activeSegment.baseOffset, log.activeSegment.baseOffset)


LogCleaningAbortedException (kafka.common -> org.apache.kafka.storage.internals.log)

@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -14,11 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package kafka.common
+package org.apache.kafka.storage.internals.log;

 /**
  * Thrown when a log cleaning task is requested to be aborted.
  */
-class LogCleaningAbortedException extends RuntimeException() {
+public class LogCleaningAbortedException extends RuntimeException {
 }
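
The relocation turns the Scala declaration (class LogCleaningAbortedException extends RuntimeException()) into a plain Java unchecked exception with the default constructor. Because it is unchecked, code deep inside a cleaning pass can raise it without declaring it on any method signature. A minimal hypothetical sketch of that producer side, assuming an invented AbortCheck helper (requestAbort and maybeAbort are not Kafka APIs):

// Hypothetical illustration only -- not code from this commit.
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.storage.internals.log.LogCleaningAbortedException;

final class AbortCheck {

    private final AtomicBoolean abortRequested = new AtomicBoolean(false);

    // Called by whoever decides that cleaning of this partition must stop.
    void requestAbort() {
        abortRequested.set(true);
    }

    // Called periodically from the cleaning loop; unwinds the current pass by
    // throwing the (unchecked) abort exception once an abort has been requested.
    void maybeAbort() {
        if (abortRequested.get()) {
            throw new LogCleaningAbortedException();
        }
    }
}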
ThreadShutdownException (kafka.common -> org.apache.kafka.storage.internals.log)

@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -14,11 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package kafka.common
+package org.apache.kafka.storage.internals.log;

 /**
  * An exception that indicates a thread is being shut down normally.
  */
-class ThreadShutdownException extends RuntimeException {
+public class ThreadShutdownException extends RuntimeException {
 }
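
Taken together, the two moved classes are now plain unchecked exceptions with no-arg constructors living in org.apache.kafka.storage.internals.log. A tiny hypothetical JUnit sketch (not part of this commit, assuming JUnit 5.8+ for assertInstanceOf) that exercises exactly what the diffs above show:

// Hypothetical sketch, not part of this commit.
import static org.junit.jupiter.api.Assertions.assertInstanceOf;

import org.apache.kafka.storage.internals.log.LogCleaningAbortedException;
import org.apache.kafka.storage.internals.log.ThreadShutdownException;
import org.junit.jupiter.api.Test;

public class RelocatedCleanerExceptionsTest {

    @Test
    public void relocatedExceptionsAreUncheckedWithDefaultConstructors() {
        // Both classes extend RuntimeException and expose a no-arg constructor,
        // matching the Scala definitions they replace.
        assertInstanceOf(RuntimeException.class, new LogCleaningAbortedException());
        assertInstanceOf(RuntimeException.class, new ThreadShutdownException());
    }
}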
