Commit c7bf7be

jayteej authored and manoj-mathivanan committed
KAFKA-16368: Add a new constraint for segment.bytes to min 1MB for KIP-1030 (apache#18140)
Reviewers: Divij Vaidya <[email protected]>
1 parent 7cd1511 commit c7bf7be

6 files changed (+14 −8 lines)

core/src/test/java/kafka/admin/DeleteTopicTest.java

+3 −1

@@ -56,6 +56,7 @@
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import scala.Option;
 import scala.jdk.javaapi.OptionConverters;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -238,7 +239,6 @@ public void testDeleteNonExistingTopic(ClusterInstance cluster) throws Exception
     @ClusterTest(serverProperties = {
         @ClusterConfigProperty(key = "log.cleaner.enable", value = "true"),
         @ClusterConfigProperty(key = "log.cleanup.policy", value = "compact"),
-        @ClusterConfigProperty(key = "log.segment.bytes", value = "100"),
         @ClusterConfigProperty(key = "log.cleaner.dedupe.buffer.size", value = "1048577")
     })
     public void testDeleteTopicWithCleaner(ClusterInstance cluster) throws Exception {
@@ -251,6 +251,8 @@ public void testDeleteTopicWithCleaner(ClusterInstance cluster) throws Exception
             "Replicas for topic test not created.");
         UnifiedLog log = server.logManager().getLog(topicPartition, false).get();
         writeDups(100, 3, log);
+        // force roll the segment so that cleaner can work on it
+        server.logManager().getLog(topicPartition, false).get().roll(Option.empty());
         // wait for cleaner to clean
         server.logManager().cleaner().awaitCleaned(topicPartition, 0, 60000);
         admin.deleteTopics(List.of(DEFAULT_TOPIC)).all().get();
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala

+5 −4

@@ -128,7 +128,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     props.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "requested")
     props.put(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, "PLAIN")
     props.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, kafkaServerSaslMechanisms.mkString(","))
-    props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "2000") // low value to test log rolling on config update
+    props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1048576") // low value to test log rolling on config update
     props.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "2") // greater than one to test reducing threads
     props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, 1680000000.toString)
     props.put(ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG, 168.toString)
@@ -587,7 +587,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     val props = new Properties
     val logIndexSizeMaxBytes = "100000"
     val logRetentionMs = TimeUnit.DAYS.toMillis(1)
-    props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "4000")
+    props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1048576")
     props.put(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG, TimeUnit.HOURS.toMillis(2).toString)
     props.put(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG, TimeUnit.HOURS.toMillis(1).toString)
     props.put(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_CONFIG, logIndexSizeMaxBytes)
@@ -609,11 +609,12 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG, TimestampType.LOG_APPEND_TIME.toString)
     props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, "1000")
     props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, "1000")
-    reconfigureServers(props, perBrokerConfig = false, (ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "4000"))
+    reconfigureServers(props, perBrokerConfig = false, (ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1048576"))
 
     // Verify that all broker defaults have been updated
     servers.foreach { server =>
       props.forEach { (k, v) =>
+        TestUtils.waitUntilTrue(() => server.config.originals.get(k) != null, "Configs not present")
         assertEquals(server.config.originals.get(k).toString, v, s"Not reconfigured $k")
       }
     }
@@ -624,7 +625,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       "Config not updated in LogManager")
 
     val log = servers.head.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found"))
-    TestUtils.waitUntilTrue(() => log.config.segmentSize == 4000, "Existing topic config using defaults not updated")
+    TestUtils.waitUntilTrue(() => log.config.segmentSize == 1048576, "Existing topic config using defaults not updated")
     val KafkaConfigToLogConfigName: Map[String, String] =
       ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) => (v, k) }
     props.asScala.foreach { case (k, v) =>
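
The test changes above exercise dynamic reconfiguration of the broker-wide log.segment.bytes default through the harness's reconfigureServers helper. Outside the test harness, roughly the same update can be issued with the Admin client; the sketch below is illustrative only (the bootstrap address is an assumption), and under the new constraint any value below 1048576 would fail validation.

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class UpdateSegmentBytesDefault {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed address

        try (Admin admin = Admin.create(props)) {
            // An empty broker name targets the cluster-wide dynamic default,
            // roughly what perBrokerConfig = false models in the test above.
            ConfigResource clusterDefault = new ConfigResource(ConfigResource.Type.BROKER, "");
            AlterConfigOp setSegmentBytes = new AlterConfigOp(
                new ConfigEntry("log.segment.bytes", "1048576"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(clusterDefault, List.of(setSegmentBytes))).all().get();
        }
    }
}

The test then waits until every broker reports the new value and until existing logs pick it up as their segmentSize, which is why the assertions above were moved from 4000 to 1048576.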

core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

+1 −1

@@ -1118,7 +1118,7 @@ class KafkaConfigTest {
       case TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG =>
         assertDynamic(kafkaConfigProp, "5", () => config.zstdCompressionLevel)
       case TopicConfig.SEGMENT_BYTES_CONFIG =>
-        assertDynamic(kafkaConfigProp, 10000, () => config.logSegmentBytes)
+        assertDynamic(kafkaConfigProp, 1048576, () => config.logSegmentBytes)
       case TopicConfig.SEGMENT_MS_CONFIG =>
         assertDynamic(kafkaConfigProp, 10001L, () => config.logRollTimeMillis)
       case TopicConfig.DELETE_RETENTION_MS_CONFIG =>

core/src/test/scala/unit/kafka/server/LogOffsetTest.scala

−1

@@ -52,7 +52,6 @@ class LogOffsetTest extends BaseRequestTest {
     props.put("num.partitions", "20")
     props.put("log.retention.hours", "10")
     props.put("log.retention.check.interval.ms", (5 * 1000 * 60).toString)
-    props.put("log.segment.bytes", "140")
   }
 
   @ParameterizedTest

docs/upgrade.html

+4

@@ -111,6 +111,10 @@ <h5><a id="upgrade_400_notable" href="#upgrade_400_notable">Notable changes in 4
     The <code>remote.log.manager.thread.pool.size</code> configuration default value was changed to 2 from 10.
     See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a>
 </li>
+<li>
+    The minimum <code>segment.bytes/log.segment.bytes</code> has changed from 14 bytes to 1MB.
+    See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a>
+</li>
 </ul>
 </li>
 <li><b>MirrorMaker</b>
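
For operators, the upgrade note above means that segment sizes below 1 MiB are no longer accepted for segment.bytes / log.segment.bytes. A minimal sketch of the effect at topic-creation time follows; the topic names and bootstrap address are assumptions, and the exact error type surfaced may vary.

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CreateTopicWithSegmentBytes {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed address

        try (Admin admin = Admin.create(props)) {
            // 140 bytes was legal before KIP-1030; with the new minimum it should be rejected.
            NewTopic tooSmall = new NewTopic("demo-too-small", 1, (short) 1)
                .configs(Map.of("segment.bytes", "140"));
            try {
                admin.createTopics(List.of(tooSmall)).all().get();
            } catch (ExecutionException e) {
                System.out.println("Rejected as expected: " + e.getCause().getMessage());
            }

            // 1 MiB (1048576) satisfies the new lower bound.
            NewTopic ok = new NewTopic("demo-ok", 1, (short) 1)
                .configs(Map.of("segment.bytes", "1048576"));
            admin.createTopics(List.of(ok)).all().get();
        }
    }
}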

storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java

+1 −1

@@ -159,7 +159,7 @@ public Optional<String> serverConfigName(String configName) {
         .define(ServerLogConfigs.NUM_PARTITIONS_CONFIG, INT, ServerLogConfigs.NUM_PARTITIONS_DEFAULT, atLeast(1), MEDIUM, ServerLogConfigs.NUM_PARTITIONS_DOC)
         .define(ServerLogConfigs.LOG_DIR_CONFIG, STRING, ServerLogConfigs.LOG_DIR_DEFAULT, HIGH, ServerLogConfigs.LOG_DIR_DOC)
         .define(ServerLogConfigs.LOG_DIRS_CONFIG, STRING, null, HIGH, ServerLogConfigs.LOG_DIRS_DOC)
-        .define(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, INT, DEFAULT_SEGMENT_BYTES, atLeast(LegacyRecord.RECORD_OVERHEAD_V0), HIGH, ServerLogConfigs.LOG_SEGMENT_BYTES_DOC)
+        .define(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, INT, DEFAULT_SEGMENT_BYTES, atLeast(1024 * 1024), HIGH, ServerLogConfigs.LOG_SEGMENT_BYTES_DOC)
 
         .define(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG, LONG, null, HIGH, ServerLogConfigs.LOG_ROLL_TIME_MILLIS_DOC)
         .define(ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG, INT, (int) TimeUnit.MILLISECONDS.toHours(DEFAULT_SEGMENT_MS), atLeast(1), HIGH, ServerLogConfigs.LOG_ROLL_TIME_HOURS_DOC)
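
The constraint change above replaces the old floor of atLeast(LegacyRecord.RECORD_OVERHEAD_V0) (14 bytes, the overhead of a single legacy record) with atLeast(1024 * 1024). A minimal, self-contained sketch of how such a Range validator behaves, using a stand-in config definition rather than Kafka's actual LogConfig:

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

import java.util.Map;

public class SegmentBytesConstraintSketch {
    public static void main(String[] args) {
        // Hypothetical stand-in for the real definition: an INT config with the
        // new lower bound of 1 MiB, mirroring atLeast(1024 * 1024) in LogConfig.
        ConfigDef def = new ConfigDef()
            .define("segment.bytes", ConfigDef.Type.INT, 1024 * 1024 * 1024,
                    ConfigDef.Range.atLeast(1024 * 1024),
                    ConfigDef.Importance.HIGH, "Segment size, lower-bounded at 1 MiB");

        // A value below the minimum is rejected at validation time.
        try {
            def.parse(Map.of("segment.bytes", "140"));
        } catch (ConfigException e) {
            System.out.println("Rejected: " + e.getMessage());
        }

        // 1 MiB (1048576) and above passes validation.
        System.out.println(def.parse(Map.of("segment.bytes", "1048576")));
    }
}

With this bound in place, the 100- and 140-byte segment sizes removed from the tests above would no longer pass validation, which is why those overrides were dropped or raised to 1048576.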

0 commit comments
