diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 43193016fd06b..aac22865c98f4 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -21,7 +21,6 @@ import java.io.{File, IOException}
 import java.nio._
 import java.util.Date
 import java.util.concurrent.TimeUnit
-import kafka.common._
 import kafka.log.LogCleaner.{CleanerRecopyPercentMetricName, DeadThreadCountMetricName, MaxBufferUtilizationPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName}
 import kafka.server.{BrokerReconfigurable, KafkaConfig}
 import kafka.utils.{Logging, Pool}
@@ -35,7 +34,7 @@ import org.apache.kafka.common.utils.{BufferSupplier, Time}
 import org.apache.kafka.server.config.ServerConfigs
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.ShutdownableThread
-import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, LastRecord, LogDirFailureChannel, LogSegment, LogSegmentOffsetOverflowException, OffsetMap, SkimpyOffsetMap, TransactionIndex}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, LastRecord, LogCleaningAbortedException, LogDirFailureChannel, LogSegment, LogSegmentOffsetOverflowException, OffsetMap, SkimpyOffsetMap, ThreadShutdownException, TransactionIndex}
 import org.apache.kafka.storage.internals.utils.Throttler
 
 import scala.jdk.CollectionConverters._
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 7238eacad9e56..3e126e45ffeb0 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -21,14 +21,13 @@ import java.lang.{Long => JLong}
 import java.io.File
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
-import kafka.common.LogCleaningAbortedException
 import kafka.utils.CoreUtils._
 import kafka.utils.{Logging, Pool}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile
-import org.apache.kafka.storage.internals.log.LogDirFailureChannel
+import org.apache.kafka.storage.internals.log.{LogCleaningAbortedException, LogDirFailureChannel}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 
 import java.util.Comparator
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 9100cc7af211a..92d80d695400b 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -17,7 +17,6 @@
 
 package kafka.log
 
-import kafka.common._
 import kafka.server.KafkaConfig
 import kafka.utils.{CoreUtils, Logging, Pool, TestUtils}
 import org.apache.kafka.common.TopicPartition
@@ -29,7 +28,7 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogCleaningAbortedException, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
 import org.apache.kafka.storage.internals.utils.Throttler
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
@@ -1218,7 +1217,7 @@ class LogCleanerTest extends Logging {
 
     def distinctValuesBySegment = log.logSegments.asScala.map(s => s.log.records.asScala.map(record => TestUtils.readString(record.value)).toSet.size).toSeq
 
-    val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment
+    val distinctValuesBySegmentBeforeClean = distinctValuesBySegment
     assertTrue(distinctValuesBySegment.reverse.tail.forall(_ > N),
       "Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.")
 
@@ -1226,10 +1225,10 @@
 
     val distinctValuesBySegmentAfterClean = distinctValuesBySegment
 
-    assertTrue(disctinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean)
+    assertTrue(distinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean)
       .take(numCleanableSegments).forall { case (before, after) => after < before },
       "The cleanable segments should have fewer number of values after cleaning")
-    assertTrue(disctinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean)
+    assertTrue(distinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean)
       .slice(numCleanableSegments, numTotalSegments).forall { x => x._1 == x._2 },
       "The uncleanable segments should have the same number of values after cleaning")
   }
@@ -1241,9 +1240,9 @@
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
     // create 6 segments with only one message in each segment
-    def createRecorcs = TestUtils.singletonRecords(value = Array.fill[Byte](25)(0), key = 1.toString.getBytes)
+    def createRecords = TestUtils.singletonRecords(value = Array.fill[Byte](25)(0), key = 1.toString.getBytes)
     for (_ <- 0 until 6)
-      log.appendAsLeader(createRecorcs, leaderEpoch = 0)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     val logToClean = LogToClean(new TopicPartition("test", 0), log, log.activeSegment.baseOffset,
       log.activeSegment.baseOffset)
diff --git a/core/src/main/scala/kafka/common/LogCleaningAbortedException.scala b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaningAbortedException.java
similarity index 75%
rename from core/src/main/scala/kafka/common/LogCleaningAbortedException.scala
rename to storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaningAbortedException.java
index dfded33f009e4..1c1f8d9075291 100644
--- a/core/src/main/scala/kafka/common/LogCleaningAbortedException.scala
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaningAbortedException.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -14,11 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package kafka.common
+package org.apache.kafka.storage.internals.log;
 
 /**
  * Thrown when a log cleaning task is requested to be aborted.
  */
-class LogCleaningAbortedException extends RuntimeException() {
+public class LogCleaningAbortedException extends RuntimeException {
 }
diff --git a/core/src/main/scala/kafka/common/ThreadShutdownException.scala b/storage/src/main/java/org/apache/kafka/storage/internals/log/ThreadShutdownException.java
similarity index 75%
rename from core/src/main/scala/kafka/common/ThreadShutdownException.scala
rename to storage/src/main/java/org/apache/kafka/storage/internals/log/ThreadShutdownException.java
index 8cd6601ce5aa9..02c7167487a3c 100644
--- a/core/src/main/scala/kafka/common/ThreadShutdownException.scala
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ThreadShutdownException.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -14,11 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package kafka.common
+package org.apache.kafka.storage.internals.log;
 
 /**
  * An exception that indicates a thread is being shut down normally.
  */
-class ThreadShutdownException extends RuntimeException {
+public class ThreadShutdownException extends RuntimeException {
 }