Skip to content

Commit f2dcf36

Browse files
jayteejmanoj-mathivanan
authored and committed
KAFKA-16368: Update defaults for LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT and NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG (apache#18106)
Reviewers: Divij Vaidya <[email protected]>
1 parent 98e0713 commit f2dcf36

File tree

8 files changed

+34
-8
lines changed

8 files changed

+34
-8
lines changed

core/src/test/scala/unit/kafka/log/LogCleanerTest.scala

+4-1
Original file line numberDiff line numberDiff line change
@@ -1917,7 +1917,10 @@ class LogCleanerTest extends Logging {
19171917

19181918
@Test
19191919
def testCleanTombstone(): Unit = {
1920-
val logConfig = new LogConfig(new Properties())
1920+
val properties = new Properties()
1921+
// This test uses future timestamps beyond the default of 1 hour.
1922+
properties.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.MaxValue.toString)
1923+
val logConfig = new LogConfig(properties)
19211924

19221925
val log = makeLog(config = logConfig)
19231926
val cleaner = makeCleaner(10)

core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala

+2
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ class DumpLogSegmentsTest {
8888
private def createTestLog = {
8989
val props = new Properties
9090
props.setProperty(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "128")
91+
// This test uses future timestamps beyond the default of 1 hour.
92+
props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, Long.MaxValue.toString)
9193
log = UnifiedLog(
9294
dir = logDir,
9395
config = new LogConfig(props),

docs/upgrade.html

+9
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,15 @@ <h5><a id="upgrade_400_notable" href="#upgrade_400_notable">Notable changes in 4
328328
KIP-714 is now enabled for Kafka Streams via <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1076%3A++Metrics+for+client+applications+KIP-714+extension">KIP-1076</a>.
329329
This allows not only collecting the metrics of the internally used clients of a Kafka Streams application via a broker-side plugin,
330330
but also to collect the <a href="/{{version}}/documentation/#kafka_streams_monitoring">metrics</a> of the Kafka Streams runtime itself.
331+
</li>
<li>
332+
The default value of 'num.recovery.threads.per.data.dir' has been changed from 1 to 2. The impact of this is faster
333+
recovery after an unclean shutdown at the expense of extra IO cycles.
334+
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a>
335+
</li>
336+
<li>
337+
The default value of 'message.timestamp.after.max.ms' has been changed from Long.MAX_VALUE to 1 hour. The impact of this is that messages with a
338+
timestamp of more than 1 hour in the future will be rejected when message.timestamp.type=CreateTime is set.
339+
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a>
331340
</li>
332341
</ul>
333342
</li>

server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -118,15 +118,15 @@ public class ServerLogConfigs {
118118
"If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " +
119119
"this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime.";
120120
public static final String LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG);
121-
public static final long LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT = Long.MAX_VALUE;
121+
public static final long LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT = 3600000; // 1 hour
122122
public static final String LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC = "This configuration sets the allowable timestamp difference between the " +
123123
"message timestamp and the broker's timestamp. The message timestamp can be later than or equal to the broker's " +
124124
"timestamp, with the maximum allowable difference determined by the value set in this configuration. " +
125125
"If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " +
126126
"this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime.";
127127

128128
public static final String NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG = "num.recovery.threads.per.data.dir";
129-
public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT = 1;
129+
public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT = 2;
130130
public static final String NUM_RECOVERY_THREADS_PER_DATA_DIR_DOC = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown";
131131

132132
public static final String AUTO_CREATE_TOPICS_ENABLE_CONFIG = "auto.create.topics.enable";

storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.util.Collections;
2424
import java.util.List;
2525
import java.util.Map;
26-
import java.util.concurrent.TimeUnit;
2726

2827
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
2928

@@ -56,7 +55,7 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
5655
.expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new KeyValueSpec("k2", "v2"))
5756
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
5857
.produceWithTimestamp(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
59-
new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3", System.currentTimeMillis() + TimeUnit.DAYS.toMillis(1)))
58+
new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3", System.currentTimeMillis()))
6059
// update the topic config such that it triggers the deletion of segments
6160
.updateTopicConfig(topicA, configsToBeAdded(), Collections.emptyList())
6261
// expect that the three offloaded remote log segments are deleted

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import java.util.Properties;
5959
import java.util.regex.Pattern;
6060

61+
import static org.apache.kafka.common.utils.Utils.mkProperties;
6162
import static org.apache.kafka.test.TestUtils.waitForCondition;
6263
import static org.hamcrest.CoreMatchers.equalTo;
6364
import static org.hamcrest.CoreMatchers.is;
@@ -76,7 +77,10 @@ public class FineGrainedAutoResetIntegrationTest {
7677
private static final String OUTPUT_TOPIC_4 = "outputTopic_4";
7778
private static final String OUTPUT_TOPIC_5 = "outputTopic_5";
7879

79-
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
80+
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
81+
NUM_BROKERS,
82+
mkProperties(
83+
Collections.singletonMap("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE))));
8084

8185
@BeforeAll
8286
public static void startCluster() throws IOException, InterruptedException {

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
import static java.time.Duration.ofMillis;
9191
import static java.time.Duration.ofMinutes;
9292
import static java.time.Instant.ofEpochMilli;
93+
import static org.apache.kafka.common.utils.Utils.mkProperties;
9394
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
9495
import static org.hamcrest.CoreMatchers.equalTo;
9596
import static org.hamcrest.MatcherAssert.assertThat;
@@ -102,7 +103,10 @@
102103
public class KStreamAggregationIntegrationTest {
103104
private static final int NUM_BROKERS = 1;
104105

105-
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
106+
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
107+
NUM_BROKERS,
108+
mkProperties(
109+
Collections.singletonMap("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE))));
106110

107111
@BeforeAll
108112
public static void startCluster() throws Exception {

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,23 @@
3535
import java.io.IOException;
3636
import java.time.Duration;
3737
import java.util.ArrayList;
38+
import java.util.Collections;
3839
import java.util.Map;
3940
import java.util.Properties;
4041
import java.util.Set;
4142

43+
import static org.apache.kafka.common.utils.Utils.mkProperties;
4244
import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
4345
import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;
4446
import static org.junit.jupiter.api.Assertions.assertTrue;
4547

4648
@Timeout(600)
4749
@Tag("integration")
4850
public class SmokeTestDriverIntegrationTest {
49-
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
51+
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
52+
3,
53+
mkProperties(
54+
Collections.singletonMap("log.message.timestamp.after.max.ms", String.valueOf(Long.MAX_VALUE))));
5055

5156
@BeforeAll
5257
public static void startCluster() throws IOException {

0 commit comments

Comments
 (0)