From 2629090b1459e91d8b7ce42f8f00a8079a98996d Mon Sep 17 00:00:00 2001
From: Subhash Iyer
Date: Tue, 22 Oct 2024 23:11:24 +0530
Subject: [PATCH 1/5] Add configs to sleep and sleep before commit

---
 .../connect/s3/S3SinkConnectorConfig.java | 46 ++++++++++++++++++-
 .../connect/s3/TopicPartitionWriter.java  | 14 ++++++
 2 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java
index ce8139b1b..37bd8282f 100644
--- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java
+++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java
@@ -49,6 +49,7 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
@@ -200,7 +201,7 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
   public static final Class HEADERS_FORMAT_CLASS_DEFAULT = AvroFormat.class;
 
   /**
-   * Elastic buffer to save memory. {@link io.confluent.connect.s3.storage.S3OutputStream#buffer}
+   * Elastic buffer to save memory. {@link io.confluent.connect.s3.storage.S3OutputStream}
    */
   public static final String ELASTIC_BUFFER_ENABLE = "s3.elastic.buffer.enable";
 
@@ -223,6 +224,10 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
       + "to prefix or suffix in the s3 path after the topic name."
       + " None will not append the schema name in the s3 path.";
 
+
+  private static final String SHOULD_SLEEP_ON_COMMIT = "sleep.on.commit";
+  private static final String SLEEP_INTERVAL = "sleep.interval";
+
   private static final GenericRecommender SCHEMA_PARTITION_AFFIX_TYPE_RECOMMENDER =
       new GenericRecommender();
 
@@ -831,6 +836,8 @@ public static ConfigDef newConfigDef() {
           "Elastic buffer initial capacity"
       );
 
+      addZombieConfigs(configDef);
+
     }
     return configDef;
   }
@@ -869,6 +876,34 @@ private void validateTimezone() {
     }
   }
 
+  private static void addZombieConfigs(ConfigDef config) {
+    String group = "zombie";
+    int orderInGroup = 0;
+    config.define(
+        SLEEP_INTERVAL,
+        Type.LONG,
+        -1,
+        Importance.HIGH,
+        "Sleep interval millis",
+        group,
+        ++orderInGroup,
+        Width.LONG,
+        "Sleep interval millis"
+    );
+
+    config.define(
+        SHOULD_SLEEP_ON_COMMIT,
+        Type.BOOLEAN,
+        false,
+        Importance.HIGH,
+        "Should sleep on commit",
+        group,
+        ++orderInGroup,
+        Width.LONG,
+        "Should sleep on commit"
+    );
+  }
+
   private void addToGlobal(AbstractConfig config) {
     allConfigs.add(config);
     addConfig(config.values(), (ComposableConfig) config);
@@ -1013,6 +1048,15 @@ public boolean getElasticBufferEnable() {
     return getBoolean(ELASTIC_BUFFER_ENABLE);
   }
 
+  private Random random = new Random();
+  public boolean getShouldSleep() {
+    return getBoolean(SHOULD_SLEEP_ON_COMMIT) && random.nextInt() % 2 == 0;
+  }
+
+  public long sleepDuartion() {
+    return getLong(SLEEP_INTERVAL);
+  }
+
   public int getElasticBufferInitCap() {
     return getInt(ELASTIC_BUFFER_INIT_CAPACITY);
   }
diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java
index d632d2b68..2c8e36e80 100644
--- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java
+++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java
@@ -665,6 +665,7 @@ private void commitFile(String encodedPartition) {
     }
     if (writers.containsKey(encodedPartition)) {
+      sleepIfRequired();
       RecordWriter writer = writers.get(encodedPartition);
       // Commits the file and closes the underlying output stream.
       writer.commit();
@@ -673,6 +674,19 @@ private void commitFile(String encodedPartition) {
     }
   }
 
+  private void sleepIfRequired() {
+    boolean shouldSleep = connectorConfig.getShouldSleep();
+    long sleepDuration = connectorConfig.sleepDuartion();
+    if (shouldSleep) {
+      try {
+        log.info("Sleeping for {}ms", sleepDuration);
+        Thread.sleep(sleepDuration);
+      } catch (InterruptedException e) {
+        log.error("Interrupted in sleep", e);
+      }
+    }
+  }
+
   private void tagFile(String encodedPartition, String s3ObjectPath) {
     Long startOffset = startOffsets.get(encodedPartition);
     Long endOffset = endOffsets.get(encodedPartition);
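
The two properties introduced above are plain connector settings, so they ride along with the usual S3 sink configuration. The sketch below is illustrative only and not part of the patch: the "sleep.on.commit" and "sleep.interval" keys come from PATCH 1, while the bucket name, flush size, and other values are made-up examples of ordinary S3 sink settings.

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only, not part of the patch series. The two "sleep.*"
// keys are the configs added in PATCH 1; every other key/value is an ordinary
// S3 sink setting shown with an assumed value.
public class ZombieSleepConfigExample {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("connector.class", "io.confluent.connect.s3.S3SinkConnector");
    props.put("s3.bucket.name", "my-test-bucket");   // assumed bucket name
    props.put("flush.size", "1000");                 // assumed flush size
    props.put("sleep.on.commit", "true");            // new: allow sleeping before a file commit
    props.put("sleep.interval", "60000");            // new: sleep duration in milliseconds
    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}

With only PATCH 1 applied, enabling "sleep.on.commit" makes TopicPartitionWriter.commitFile() pause for "sleep.interval" milliseconds on roughly half of its invocations, since getShouldSleep() additionally requires random.nextInt() % 2 == 0.
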
From de418345663f24f71c122e9600e28376f223a62d Mon Sep 17 00:00:00 2001
From: Subhash Iyer
Date: Wed, 23 Oct 2024 12:31:45 +0530
Subject: [PATCH 2/5] Add sleep probability

---
 .../connect/s3/S3SinkConnectorConfig.java | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java
index 37bd8282f..fa1529a19 100644
--- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java
+++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java
@@ -227,6 +227,7 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
 
   private static final String SHOULD_SLEEP_ON_COMMIT = "sleep.on.commit";
   private static final String SLEEP_INTERVAL = "sleep.interval";
+  private static final String SLEEP_PROBABILITY = "sleep.probability";
 
   private static final GenericRecommender SCHEMA_PARTITION_AFFIX_TYPE_RECOMMENDER =
       new GenericRecommender();
@@ -902,6 +903,18 @@ private static void addZombieConfigs(ConfigDef config) {
         Width.LONG,
         "Should sleep on commit"
     );
+
+    config.define(
+        SLEEP_PROBABILITY,
+        Type.INT,
+        2,
+        Importance.HIGH,
+        "Probability index",
+        group,
+        ++orderInGroup,
+        Width.LONG,
+        "Probability index"
+    );
   }
 
   private void addToGlobal(AbstractConfig config) {
@@ -1049,8 +1062,9 @@ public boolean getElasticBufferEnable() {
   }
 
   private Random random = new Random();
+
   public boolean getShouldSleep() {
-    return getBoolean(SHOULD_SLEEP_ON_COMMIT) && random.nextInt() % 2 == 0;
+    return getBoolean(SHOULD_SLEEP_ON_COMMIT) && random.nextInt() % getInt(SLEEP_PROBABILITY) == 0;
   }
 
   public long sleepDuartion() {

From faf255fa9162726336ba9d8f75f0f6418e6f3c01 Mon Sep 17 00:00:00 2001
From: Subhash Iyer
Date: Wed, 23 Oct 2024 12:57:37 +0530
Subject: [PATCH 3/5] Suppress warnings

---
 checkstyle/suppressions.xml | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 6d1c521d7..f2d13abd9 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -21,4 +21,9 @@
              files="(TopicPartitionWriter).java" />
+
+
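
PATCH 2 turns the hard-coded one-in-two gate into a configurable one: with "sleep.probability" set to N, getShouldSleep() returns true for roughly one commit in N, because Random.nextInt() lands on a multiple of N (positive, negative, or zero) about 1/N of the time. The standalone sketch below is not part of the patches; the class name and the chosen value are assumptions used purely to demonstrate that rate.

import java.util.Random;

// Standalone illustration of the gate behind "sleep.probability" (PATCH 2).
// Not part of the patches; the class name and the chosen value are assumptions.
public class SleepProbabilityDemo {
  public static void main(String[] args) {
    int sleepProbability = 4;      // pretend "sleep.probability" is set to 4
    Random random = new Random();
    int trials = 1_000_000;
    int sleeps = 0;
    for (int i = 0; i < trials; i++) {
      // Same test that getShouldSleep() applies once "sleep.on.commit" is true.
      if (random.nextInt() % sleepProbability == 0) {
        sleeps++;
      }
    }
    System.out.printf("slept on %.2f%% of simulated commits (expected ~%.2f%%)%n",
        100.0 * sleeps / trials, 100.0 / sleepProbability);
  }
}

The default of 2 preserves the original one-in-two behaviour; in effect "sleep.probability" is a divisor, so higher values make the sleep rarer.
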
From 5a1b5ea4a629bd097476c8544a4a0981344bf04d Mon Sep 17 00:00:00 2001
From: Rohit
Date: Fri, 7 Mar 2025 12:21:20 +0530
Subject: [PATCH 4/5] remove conflict

---
 .../confluent/connect/s3/TopicPartitionWriter.java | 13 -------------
 1 file changed, 13 deletions(-)

diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java
index 2c8e36e80..6ac54f5e1 100644
--- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java
+++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java
@@ -665,7 +665,6 @@ private void commitFile(String encodedPartition) {
     }
     if (writers.containsKey(encodedPartition)) {
-      sleepIfRequired();
       RecordWriter writer = writers.get(encodedPartition);
       // Commits the file and closes the underlying output stream.
       writer.commit();
@@ -674,18 +673,6 @@ private void commitFile(String encodedPartition) {
     }
   }
 
-  private void sleepIfRequired() {
-    boolean shouldSleep = connectorConfig.getShouldSleep();
-    long sleepDuration = connectorConfig.sleepDuartion();
-    if (shouldSleep) {
-      try {
-        log.info("Sleeping for {}ms", sleepDuration);
-        Thread.sleep(sleepDuration);
-      } catch (InterruptedException e) {
-        log.error("Interrupted in sleep", e);
-      }
-    }
-  }
-
   private void tagFile(String encodedPartition, String s3ObjectPath) {
     Long startOffset = startOffsets.get(encodedPartition);
     Long endOffset = endOffsets.get(encodedPartition);

From 298292d5b8b8bfaf3e7d3c3a3c63121172308aa6 Mon Sep 17 00:00:00 2001
From: Rohit
Date: Fri, 7 Mar 2025 12:23:42 +0530
Subject: [PATCH 5/5] remove conflict

---
 .../main/java/io/confluent/connect/s3/TopicPartitionWriter.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java
index 6ac54f5e1..d632d2b68 100644
--- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java
+++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java
@@ -673,7 +673,6 @@ private void commitFile(String encodedPartition) {
     }
   }
 
-
   private void tagFile(String encodedPartition, String s3ObjectPath) {
     Long startOffset = startOffsets.get(encodedPartition);
     Long endOffset = endOffsets.get(encodedPartition);