diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 7ed9f32cb..50721ea09 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,3 +1,5 @@ +# Contributing + In order for us to consider merging a contribution, you will need to sign our **C**ontributor **L**icense **A**greement. @@ -5,3 +7,182 @@ In order for us to consider merging a contribution, you will need to sign our > [Wikipedia](http://en.wikipedia.org/wiki/Contributor_License_Agreement) You can read and sign our full Contributor License Agreement [here](http://clabot.confluent.io/cla). + +## Reporting Bugs and Issues + +Report bugs and issues by creating a new GitHub issue. Prior to creating an issue, please search +through existing issues so that you do not create a duplicate. If a pull request exists that +corresponds to the issue, mention this pull request on the GitHub issue. + +## Guidelines for Contributing Code, Examples, Documentation + +Code changes are submitted via a pull request (PR). When submitting a PR, use the following +guidelines: + +* Follow the style guide below. +* Add/update documentation appropriately for the change you are making. +* Non-trivial changes should include unit tests covering the new functionality and potentially integration tests. +* Bug fixes should include unit tests and/or integration tests proving the issue is fixed. +* Try to keep pull requests short and submit separate ones for unrelated features. +* Keep formatting changes in separate commits to make code reviews easier and to distinguish them from actual code changes. + +### Code Style +This connector uses a coding style that generally follows the [Google Java coding standard guide](https://google.github.io/styleguide/javaguide.html). + +Some conventions worth mentioning are: + +* Indentation (a single indentation level) is 2 spaces. +* All import statements are listed explicitly. The wildcard (*) is not used in imports. +* Imports are grouped as follows: +``` + import all packages not listed below (all other imports) + + import all javax.* packages + import all java.* packages + + import all io.confluent.* packages + + import static packages +``` +* Javadoc is highly recommended and often required during reviews in interfaces and public or protected classes and methods. + +### Titles and changelogs + +The title of a pull request is used as an entry on the release notes (aka changelogs) of the +connector in every release. + +For this reason, please use a brief but descriptive title for your pull request. If GitHub shortens +your pull request title when you issue the pull request, moving the excess text into the pull +request description, make sure that you correct the title before or after you issue the pull +request. + +If the fix is a minor one, you are encouraged to use the tag `MINOR:` followed by your pull request +title. You may link the corresponding issue in the description of your pull request, but adding it to +the title will not be useful during changelog generation. + +When reverting a previous commit, use the prefix `Revert ` on the pull request title (automatically +added by GitHub when a pull request is created to revert an existing commit). + +### Tests +Every pull request should contain sufficient tests that assess your suggested code +changes. It’s highly recommended that you also check the code coverage of the production code you +are adding to make sure that your changes are covered sufficiently by the test code.
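For running tests locally, the following is a minimal sketch of a typical Maven invocation. It assumes the project's standard multi-module Maven build with the Surefire plugin; the test class name shown is only a placeholder.

```bash
# Run the unit tests for the S3 connector module (building its dependencies as needed)
mvn -pl kafka-connect-s3 -am test

# Run a single test class while iterating (placeholder class name)
mvn -pl kafka-connect-s3 -am test -Dtest=S3SinkConnectorConfigTest
```

Note that integration tests run during `mvn install` and additionally require the AWS credentials file described in the README.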
+ +### Description +Including a good description when you issue your pull requests helps significantly with reviews. +Feel free to follow the template that is provided when issuing a pull request, and mention how your changes +are tested. + +### Backporting Commits +If your code changes are essentially bug fixes that make sense to backport to existing releases, make sure to target the earliest release branch (e.g. 2.0.x) that should contain your changes. When selecting the release branch, you should also consider how easy it will be to resolve any conflicts in newer release branches, including the `master` branch. + +## GitHub Workflow + +1. Fork the connector repository into your GitHub account: https://github.com/confluentinc/kafka-connect-storage-cloud/fork + +2. Clone your fork of the GitHub repository, replacing `<username>` with your GitHub username. + + Use SSH (recommended): + + ```bash + git clone git@github.com:<username>/kafka-connect-storage-cloud.git + ``` + + Or HTTPS: + + ```bash + git clone https://github.com/<username>/kafka-connect-storage-cloud.git + ``` + +3. Add a remote to keep up with upstream changes. + + ```bash + git remote add upstream https://github.com/confluentinc/kafka-connect-storage-cloud.git + ``` + + If you already have a copy, fetch upstream changes. + + ```bash + git fetch upstream + ``` + + or + + ```bash + git remote update + ``` + +4. Create a feature branch to work in. + + ```bash + git checkout -b feature-xyz upstream/master + ``` + +5. Work in your feature branch. + + ```bash + git commit -a --verbose + ``` + +6. Periodically rebase your changes. + + ```bash + git pull --rebase + ``` + +7. When done, combine ("squash") related commits into a single one. + + ```bash + git rebase -i upstream/master + ``` + + This will open your editor and allow you to re-order commits and merge them: + - Re-order the lines to change commit order (to the extent possible without creating conflicts). + - Prefix commits using `s` (squash) or `f` (fixup) to merge extraneous commits. + +8. Submit a pull request. + + ```bash + git push origin feature-xyz + ``` + + Go to your fork's main page: + + ```bash + https://github.com/<username>/kafka-connect-storage-cloud.git + ``` + + If you recently pushed your changes, GitHub will automatically show a `Compare & pull request` + button for any branches you recently pushed to. Clicking that button will offer to open your + pull request against the `confluentinc` connector repository. + + - Give your pull request a meaningful title as described [above](#titles-and-changelogs). + - In the description, explain your changes and the problem they are solving. + +9. Address code review comments. + + Repeat steps 5 through 7 to address any code review comments and rebase your changes if necessary. + + Push your updated changes to update the pull request: + + ```bash + git push origin [--force] feature-xyz + ``` + + `--force` may be necessary to overwrite your existing pull request in case your + commit history was changed when performing the rebase. + + Note: Be careful when using `--force`, since you can lose work if it is used incorrectly. + + ```bash + git push origin --force feature-xyz + ``` + +## Useful Resources for Developers + +1. Connector Developer Guide: https://docs.confluent.io/platform/current/connect/devguide.html +2. A Guide to the Confluent Verified Integrations Program: https://www.confluent.io/blog/guide-to-confluent-verified-integrations-program/ +3. 
Verification Guide for Confluent Platform Integrations: https://cdn.confluent.io/wp-content/uploads/Verification-Guide-Confluent-Platform-Connectors-Integrations.pdf +4. From Zero to Hero with Kafka Connect: https://www.confluent.io/kafka-summit-lon19/from-zero-to-hero-with-kafka-connect/ +5. 4 Steps to Creating Apache Kafka Connectors with the Kafka Connect API: https://www.confluent.io/blog/create-dynamic-kafka-connect-source-connectors/ +6. How to Write a Connector for Kafka Connect – Deep Dive into Configuration Handling: https://www.confluent.io/blog/write-a-kafka-connect-connector-with-configuration-handling/
diff --git a/README.md b/README.md index 2f1a89de8..ae6c2a413 100644 --- a/README.md +++ b/README.md @@ -20,11 +20,20 @@ for guidance on this process. You can build *kafka-connect-storage-cloud* with Maven using the standard lifecycle phases. +# Running Integration Tests +Integration tests are run as part of `mvn install`; however, one first needs to set the environment variable `AWS_CREDENTIALS_PATH` to point to a JSON file with the following structure: +``` +{ + "aws_access_key_id": "", + "aws_secret_access_key": "" +} +``` # Contribute - Source Code: https://github.com/confluentinc/kafka-connect-storage-cloud - Issue Tracker: https://github.com/confluentinc/kafka-connect-storage-cloud/issues +- Learn how to work with the connector's source code by reading our [Development and Contribution guidelines](CONTRIBUTING.md). # License
diff --git a/kafka-connect-s3/pom.xml b/kafka-connect-s3/pom.xml index 587576ce4..0e541227c 100644 --- a/kafka-connect-s3/pom.xml +++ b/kafka-connect-s3/pom.xml @@ -21,7 +21,7 @@ io.confluent kafka-connect-storage-cloud - 10.0.28-SNAPSHOT + 10.5.8-SNAPSHOT kafka-connect-s3
diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java index 14c35cc63..a537e73f4 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java @@ -17,16 +17,20 @@ import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.regions.RegionUtils; import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.model.CannedAccessControlList; import com.amazonaws.services.s3.model.SSEAlgorithm; +import io.confluent.connect.storage.common.util.StringUtils; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigDef.Validator; import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.types.Password; @@ -81,6 +85,11 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig { public static final String S3_OBJECT_TAGGING_CONFIG = "s3.object.tagging"; public static final boolean S3_OBJECT_TAGGING_DEFAULT = false; + public static final String S3_OBJECT_BEHAVIOR_ON_TAGGING_ERROR_CONFIG = + "s3.object.behavior.on.tagging.error"; + public static final String 
S3_OBJECT_BEHAVIOR_ON_TAGGING_ERROR_DEFAULT = + IgnoreOrFailBehavior.IGNORE.toString(); + public static final String SSEA_CONFIG = "s3.ssea.name"; public static final String SSEA_DEFAULT = ""; @@ -153,7 +162,7 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig { ClientConfiguration.DEFAULT_USE_EXPECT_CONTINUE; public static final String BEHAVIOR_ON_NULL_VALUES_CONFIG = "behavior.on.null.values"; - public static final String BEHAVIOR_ON_NULL_VALUES_DEFAULT = BehaviorOnNullValues.FAIL.toString(); + public static final String BEHAVIOR_ON_NULL_VALUES_DEFAULT = OutputWriteBehavior.FAIL.toString(); /** * Maximum back-off time when retrying failed requests. @@ -180,6 +189,33 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig { public static final String HEADERS_FORMAT_CLASS_CONFIG = "headers.format.class"; public static final Class HEADERS_FORMAT_CLASS_DEFAULT = AvroFormat.class; + /** + * Elastic buffer to save memory. {@link io.confluent.connect.s3.storage.S3OutputStream#buffer} + */ + + public static final String ELASTIC_BUFFER_ENABLE = "s3.elastic.buffer.enable"; + public static final boolean ELASTIC_BUFFER_ENABLE_DEFAULT = false; + + public static final String ELASTIC_BUFFER_INIT_CAPACITY = "s3.elastic.buffer.init.capacity"; + public static final int ELASTIC_BUFFER_INIT_CAPACITY_DEFAULT = 128 * 1024; // 128KB + + public static final String TOMBSTONE_ENCODED_PARTITION = "tombstone.encoded.partition"; + public static final String TOMBSTONE_ENCODED_PARTITION_DEFAULT = "tombstone"; + + /** + * Append schema name in s3-path + */ + + public static final String SCHEMA_PARTITION_AFFIX_TYPE_CONFIG = + "s3.schema.partition.affix.type"; + public static final String SCHEMA_PARTITION_AFFIX_TYPE_DEFAULT = AffixType.NONE.name(); + public static final String SCHEMA_PARTITION_AFFIX_TYPE_DOC = "Append the record schema name " + + "to prefix or suffix in the s3 path after the topic name." + + " None will not append the schema name in the s3 path."; + + private static final GenericRecommender SCHEMA_PARTITION_AFFIX_TYPE_RECOMMENDER = + new GenericRecommender(); + private final String name; private final Map propertyToConfig = new HashMap<>(); @@ -231,6 +267,9 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig { FieldPartitioner.class ) ); + + SCHEMA_PARTITION_AFFIX_TYPE_RECOMMENDER.addValidValues( + Arrays.stream(AffixType.names()).collect(Collectors.toList())); } public static ConfigDef newConfigDef() { @@ -279,6 +318,19 @@ public static ConfigDef newConfigDef() { "S3 Object Tagging" ); + configDef.define( + S3_OBJECT_BEHAVIOR_ON_TAGGING_ERROR_CONFIG, + Type.STRING, + S3_OBJECT_BEHAVIOR_ON_TAGGING_ERROR_DEFAULT, + IgnoreOrFailBehavior.VALIDATOR, + Importance.LOW, + "How to handle S3 object tagging error. Valid options are 'ignore' and 'fail'.", + group, + ++orderInGroup, + Width.SHORT, + "Behavior for S3 object tagging error" + ); + configDef.define( REGION_CONFIG, Type.STRING, @@ -603,15 +655,51 @@ public static ConfigDef newConfigDef() { BEHAVIOR_ON_NULL_VALUES_CONFIG, Type.STRING, BEHAVIOR_ON_NULL_VALUES_DEFAULT, - BehaviorOnNullValues.VALIDATOR, + OutputWriteBehavior.VALIDATOR, Importance.LOW, "How to handle records with a null value (i.e. Kafka tombstone records)." - + " Valid options are 'ignore' and 'fail'.", + + " Valid options are 'ignore', 'fail' and 'write'." + + " Ignore would skip the tombstone record and fail would cause the connector task to" + + " throw an exception." 
+ + " In case of the write tombstone option, the connector redirects tombstone records" + + " to a separate directory mentioned in the config tombstone.encoded.partition." + + " The storage of Kafka record keys is mandatory when this option is selected and" + + " the file for values is not generated for tombstone records.", group, ++orderInGroup, Width.SHORT, "Behavior for null-valued records" ); + + // This is done to avoid aggressive schema based rotations resulting out of interleaving + // of tombstones with regular records. + configDef.define( + TOMBSTONE_ENCODED_PARTITION, + Type.STRING, + TOMBSTONE_ENCODED_PARTITION_DEFAULT, + Importance.LOW, + "Output s3 folder to write the tombstone records to. The configured" + + " partitioner would map tombstone records to this output folder.", + group, + ++orderInGroup, + Width.SHORT, + "Tombstone Encoded Partition" + ); + + + configDef.define( + SCHEMA_PARTITION_AFFIX_TYPE_CONFIG, + Type.STRING, + SCHEMA_PARTITION_AFFIX_TYPE_DEFAULT, + ConfigDef.ValidString.in(AffixType.names()), + Importance.LOW, + SCHEMA_PARTITION_AFFIX_TYPE_DOC, + group, + ++orderInGroup, + Width.LONG, + "Schema Partition Affix Type", + SCHEMA_PARTITION_AFFIX_TYPE_RECOMMENDER + ); } { @@ -623,7 +711,8 @@ public static ConfigDef newConfigDef() { Type.BOOLEAN, false, Importance.LOW, - "Enable or disable writing keys to storage.", + "Enable or disable writing keys to storage. " + + "This config is mandatory when the writing of tombstone records is enabled.", group, ++orderInGroup, Width.SHORT, @@ -682,6 +771,32 @@ public static ConfigDef newConfigDef() { Width.SHORT, "Enable Path Style Access to S3" ); + configDef.define( + ELASTIC_BUFFER_ENABLE, + Type.BOOLEAN, + ELASTIC_BUFFER_ENABLE_DEFAULT, + Importance.LOW, + "Specifies whether or not to allocate elastic buffer for staging s3-part to save memory." 
+ + " Note that this may cause decreased performance or increased CPU usage", + group, + ++orderInGroup, + Width.LONG, + "Enable elastic buffer to staging s3-part" + ); + + configDef.define( + ELASTIC_BUFFER_INIT_CAPACITY, + Type.INT, + ELASTIC_BUFFER_INIT_CAPACITY_DEFAULT, + atLeast(4096), + Importance.LOW, + "Elastic buffer initial capacity.", + group, + ++orderInGroup, + Width.LONG, + "Elastic buffer initial capacity" + ); + } return configDef; } @@ -755,6 +870,14 @@ public CannedAccessControlList getCannedAcl() { return CannedAclValidator.ACLS_BY_HEADER_VALUE.get(getString(ACL_CANNED_CONFIG)); } + public String awsAccessKeyId() { + return getString(AWS_ACCESS_KEY_ID_CONFIG); + } + + public Password awsSecretKeyId() { + return getPassword(AWS_SECRET_ACCESS_KEY_CONFIG); + } + public int getPartSize() { return getInt(PART_SIZE_CONFIG); } @@ -770,7 +893,18 @@ public AWSCredentialsProvider getCredentialsProvider() { configs.remove(CREDENTIALS_PROVIDER_CLASS_CONFIG.substring( CREDENTIALS_PROVIDER_CONFIG_PREFIX.length() )); + + configs.put(AWS_ACCESS_KEY_ID_CONFIG, awsAccessKeyId()); + configs.put(AWS_SECRET_ACCESS_KEY_CONFIG, awsSecretKeyId().value()); + ((Configurable) provider).configure(configs); + } else { + final String accessKeyId = awsAccessKeyId(); + final String secretKey = awsSecretKeyId().value(); + if (StringUtils.isNotBlank(accessKeyId) && StringUtils.isNotBlank(secretKey)) { + BasicAWSCredentials basicCredentials = new BasicAWSCredentials(accessKeyId, secretKey); + provider = new AWSStaticCredentialsProvider(basicCredentials); + } } return provider; @@ -837,6 +971,22 @@ public String getFormatByteArrayLineSeparator() { return FORMAT_BYTEARRAY_LINE_SEPARATOR_DEFAULT; } + public boolean getElasticBufferEnable() { + return getBoolean(ELASTIC_BUFFER_ENABLE); + } + + public int getElasticBufferInitCap() { + return getInt(ELASTIC_BUFFER_INIT_CAPACITY); + } + + public boolean isTombstoneWriteEnabled() { + return OutputWriteBehavior.WRITE.toString().equalsIgnoreCase(nullValueBehavior()); + } + + public String getTombstoneEncodedPartition() { + return getString(TOMBSTONE_ENCODED_PARTITION); + } + protected static String parseName(Map props) { String nameProp = props.get("name"); return nameProp != null ? 
nameProp : "S3-sink"; @@ -863,6 +1013,10 @@ public Object get(String key) { return map; } + public AffixType getSchemaPartitionAffixType() { + return AffixType.valueOf(getString(SCHEMA_PARTITION_AFFIX_TYPE_CONFIG)); + } + private static class PartRange implements ConfigDef.Validator { // S3 specific limit final int min = 5 * 1024 * 1024; @@ -1122,31 +1276,38 @@ public String nullValueBehavior() { return getString(BEHAVIOR_ON_NULL_VALUES_CONFIG); } - public enum BehaviorOnNullValues { + public enum IgnoreOrFailBehavior { IGNORE, FAIL; - public static final ConfigDef.Validator VALIDATOR = new ConfigDef.Validator() { - private final ConfigDef.ValidString validator = ConfigDef.ValidString.in(names()); + public static final ConfigDef.Validator VALIDATOR = new EnumValidator(names()); - @Override - public void ensureValid(String name, Object value) { - if (value instanceof String) { - value = ((String) value).toLowerCase(Locale.ROOT); - } - validator.ensureValid(name, value); - } + public static String[] names() { + IgnoreOrFailBehavior[] behaviors = values(); + String[] result = new String[behaviors.length]; - // Overridden here so that ConfigDef.toEnrichedRst shows possible values correctly - @Override - public String toString() { - return validator.toString(); + for (int i = 0; i < behaviors.length; i++) { + result[i] = behaviors[i].toString(); } - }; + return result; + } + + @Override + public String toString() { + return name().toLowerCase(Locale.ROOT); + } + } + + public enum OutputWriteBehavior { + IGNORE, + FAIL, + WRITE; + + public static final ConfigDef.Validator VALIDATOR = new EnumValidator(names()); public static String[] names() { - BehaviorOnNullValues[] behaviors = values(); + OutputWriteBehavior[] behaviors = values(); String[] result = new String[behaviors.length]; for (int i = 0; i < behaviors.length; i++) { @@ -1162,6 +1323,39 @@ public String toString() { } } + private static class EnumValidator implements Validator { + + private final ConfigDef.ValidString validator; + + private EnumValidator(String[] validValues) { + this.validator = ConfigDef.ValidString.in(validValues); + } + + @Override + public void ensureValid(String name, Object value) { + if (value instanceof String) { + value = ((String) value).toLowerCase(Locale.ROOT); + } + validator.ensureValid(name, value); + } + + // Overridden here so that ConfigDef.toEnrichedRst shows possible values correctly + @Override + public String toString() { + return validator.toString(); + } + } + + public enum AffixType { + SUFFIX, + PREFIX, + NONE; + + public static String[] names() { + return Arrays.stream(values()).map(AffixType::name).toArray(String[]::new); + } + } + public static void main(String[] args) { System.out.println(getConfig().toEnrichedRst()); } diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorValidator.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorValidator.java index 52a23859d..084598ccf 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorValidator.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorValidator.java @@ -36,6 +36,7 @@ import java.util.Objects; import java.util.Set; +import static io.confluent.connect.s3.S3SinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.COMPRESSION_TYPE_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.HEADERS_FORMAT_CLASS_CONFIG; import static 
io.confluent.connect.s3.S3SinkConnectorConfig.KEYS_FORMAT_CLASS_CONFIG; @@ -82,7 +83,7 @@ public Config validate() { try { s3SinkConnectorConfig = new S3SinkConnectorConfig(config, connectorConfigs); } catch (ConfigException exception) { - log.trace("Configuration not ready for cross validation."); + log.error("Configuration not ready for cross validation.", exception); } if (s3SinkConnectorConfig != null) { validateCompression( @@ -90,6 +91,8 @@ public Config validate() { s3SinkConnectorConfig.storeKafkaKeys(), s3SinkConnectorConfig.keysFormatClass(), s3SinkConnectorConfig.storeKafkaHeaders(), s3SinkConnectorConfig.headersFormatClass() ); + validateTombstoneWriter(s3SinkConnectorConfig.isTombstoneWriteEnabled(), + s3SinkConnectorConfig.storeKafkaKeys()); } return new Config(new ArrayList<>(this.valuesByKey.values())); @@ -128,6 +131,15 @@ public void validateCompression(CompressionType compressionType, Class formatCla } } + public void validateTombstoneWriter(boolean isTombstoneWriteEnabled, boolean isStoreKeysEnabled) { + if (isTombstoneWriteEnabled && !isStoreKeysEnabled) { + recordErrors( + "Writing Kafka record keys to storage is mandatory when tombstone writing is" + + " enabled.", + STORE_KAFKA_KEYS_CONFIG, BEHAVIOR_ON_NULL_VALUES_CONFIG); + } + } + private void recordErrors(String message, String... keys) { log.error("Validation Failed with error: " + message); for (String key: keys) { diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkTask.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkTask.java index 9cfd8be9b..8255f955d 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkTask.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkTask.java @@ -16,7 +16,9 @@ package io.confluent.connect.s3; import com.amazonaws.AmazonClientException; -import io.confluent.connect.s3.S3SinkConnectorConfig.BehaviorOnNullValues; +import io.confluent.connect.s3.S3SinkConnectorConfig.OutputWriteBehavior; +import io.confluent.connect.s3.util.TombstoneSupportedPartitioner; +import io.confluent.connect.s3.util.SchemaPartitioner; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.errors.ConnectException; @@ -114,7 +116,9 @@ public void start(Map props) { } writerProvider = newRecordWriterProvider(connectorConfig); + log.info("Created S3 sink record writer provider."); partitioner = newPartitioner(connectorConfig); + log.info("Created S3 sink partitioner."); open(context.assignment()); try { @@ -125,7 +129,7 @@ public void start(Map props) { } catch (NoSuchMethodError | NoClassDefFoundError | UnsupportedOperationException e) { // Will occur in Connect runtimes earlier than 2.6 log.warn("Connect versions prior to Apache Kafka 2.6 do not support " - + "the errant record reporter"); + + "the errant record reporter", e); } log.info("Started S3 connector task with assigned partitions: {}", @@ -149,6 +153,9 @@ public void open(Collection partitions) { for (TopicPartition tp : partitions) { topicPartitionWriters.put(tp, newTopicPartitionWriter(tp)); } + log.info("Assigned topic partitions: {}", + topicPartitionWriters.keySet() + ); } @SuppressWarnings("unchecked") @@ -204,8 +211,14 @@ private Partitioner newPartitioner(S3SinkConnectorConfig config) plainValues.put(originalKey, originals.get(originalKey)); } } + if (config.getSchemaPartitionAffixType() != S3SinkConnectorConfig.AffixType.NONE) { + partitioner = new SchemaPartitioner<>(partitioner); 
+ } + if (config.isTombstoneWriteEnabled()) { + String tomebstonePartition = config.getTombstoneEncodedPartition(); + partitioner = new TombstoneSupportedPartitioner<>(partitioner, tomebstonePartition); + } partitioner.configure(plainValues); - return partitioner; } @@ -254,7 +267,7 @@ public void put(Collection records) throws ConnectException { private boolean maybeSkipOnNullValue(SinkRecord record) { if (record.value() == null) { if (connectorConfig.nullValueBehavior() - .equalsIgnoreCase(BehaviorOnNullValues.IGNORE.toString())) { + .equalsIgnoreCase(OutputWriteBehavior.IGNORE.toString())) { log.debug( "Null valued record from topic '{}', partition {} and offset {} was skipped.", record.topic(), @@ -262,7 +275,19 @@ private boolean maybeSkipOnNullValue(SinkRecord record) { record.kafkaOffset() ); return true; + } else if (connectorConfig.nullValueBehavior() + .equalsIgnoreCase(OutputWriteBehavior.WRITE.toString())) { + log.debug( + "Null valued record from topic '{}', partition {} and offset {} was written in the" + + "partition {}.", + record.topic(), + record.kafkaPartition(), + record.kafkaOffset(), + connectorConfig.getTombstoneEncodedPartition() + ); + return false; } else { + // Fail throw new ConnectException("Null valued records are not writeable with current " + S3SinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG + " 'settings."); } diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java index ee28b94de..cee677f4d 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java @@ -18,6 +18,8 @@ import com.amazonaws.SdkClientException; import io.confluent.connect.s3.storage.S3Storage; import io.confluent.connect.s3.util.FileRotationTracker; +import io.confluent.connect.s3.util.RetryUtil; +import io.confluent.connect.s3.util.TombstoneTimestampExtractor; import io.confluent.connect.storage.errors.PartitionException; import io.confluent.connect.storage.schema.SchemaCompatibilityResult; import org.apache.kafka.common.TopicPartition; @@ -55,6 +57,9 @@ import io.confluent.connect.storage.schema.StorageSchemaCompatibility; import io.confluent.connect.storage.util.DateTimeUtils; +import static io.confluent.connect.s3.S3SinkConnectorConfig.S3_PART_RETRIES_CONFIG; +import static io.confluent.connect.s3.S3SinkConnectorConfig.S3_RETRY_BACKOFF_CONFIG; + public class TopicPartitionWriter { private static final Logger log = LoggerFactory.getLogger(TopicPartitionWriter.class); @@ -65,12 +70,13 @@ public class TopicPartitionWriter { private final TopicPartition tp; private final S3Storage storage; private final Partitioner partitioner; - private final TimestampExtractor timestampExtractor; + private TimestampExtractor timestampExtractor; private String topicsDir; private State state; private final Queue buffer; private final SinkTaskContext context; private final boolean isTaggingEnabled; + private final boolean ignoreTaggingErrors; private int recordCount; private final int flushSize; private final long rotateIntervalMs; @@ -128,10 +134,19 @@ public TopicPartitionWriter(TopicPartition tp, this.writerProvider = writerProvider; this.partitioner = partitioner; this.reporter = reporter; - this.timestampExtractor = partitioner instanceof TimeBasedPartitioner - ? 
((TimeBasedPartitioner) partitioner).getTimestampExtractor() - : null; + this.timestampExtractor = null; + + if (partitioner instanceof TimeBasedPartitioner) { + this.timestampExtractor = ((TimeBasedPartitioner) partitioner).getTimestampExtractor(); + if (connectorConfig.isTombstoneWriteEnabled()) { + this.timestampExtractor = new TombstoneTimestampExtractor(timestampExtractor); + } + } + isTaggingEnabled = connectorConfig.getBoolean(S3SinkConnectorConfig.S3_OBJECT_TAGGING_CONFIG); + ignoreTaggingErrors = connectorConfig.getString( + S3SinkConnectorConfig.S3_OBJECT_BEHAVIOR_ON_TAGGING_ERROR_CONFIG) + .equalsIgnoreCase(S3SinkConnectorConfig.IgnoreOrFailBehavior.IGNORE.toString()); flushSize = connectorConfig.getInt(S3SinkConnectorConfig.FLUSH_SIZE_CONFIG); topicsDir = connectorConfig.getString(StorageCommonConfig.TOPICS_DIR_CONFIG); rotateIntervalMs = connectorConfig.getLong(S3SinkConnectorConfig.ROTATE_INTERVAL_MS_CONFIG); @@ -622,7 +637,11 @@ private void commitFiles() { String encodedPartition = entry.getKey(); commitFile(encodedPartition); if (isTaggingEnabled) { - tagFile(encodedPartition, entry.getValue()); + RetryUtil.exponentialBackoffRetry(() -> tagFile(encodedPartition, entry.getValue()), + ConnectException.class, + connectorConfig.getInt(S3_PART_RETRIES_CONFIG), + connectorConfig.getLong(S3_RETRY_BACKOFF_CONFIG) + ); } startOffsets.remove(encodedPartition); endOffsets.remove(encodedPartition); @@ -682,10 +701,18 @@ private void tagFile(String encodedPartition, String s3ObjectPath) { log.info("Tagged S3 object {} with starting offset {}, ending offset {}, record count {}", s3ObjectPath, startOffset, endOffset, recordCount); } catch (SdkClientException e) { - log.warn("Unable to tag S3 object {}. Ignoring.", s3ObjectPath, e); + if (ignoreTaggingErrors) { + log.warn("Unable to tag S3 object {}. Ignoring.", s3ObjectPath, e); + } else { + throw new ConnectException(String.format("Unable to tag S3 object %s", s3ObjectPath), e); + } } catch (Exception e) { - log.warn("Unrecoverable exception while attempting to tag S3 object {}. Ignoring.", - s3ObjectPath, e); + if (ignoreTaggingErrors) { + log.warn("Unrecoverable exception while attempting to tag S3 object {}. 
Ignoring.", + s3ObjectPath, e); + } else { + throw new ConnectException(String.format("Unable to tag S3 object %s", s3ObjectPath), e); + } } } } diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java index 9547098c7..880a1045b 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java @@ -17,15 +17,21 @@ package io.confluent.connect.s3.auth; import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; +import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; +import io.confluent.connect.storage.common.util.StringUtils; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import java.util.Map; +import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_ACCESS_KEY_ID_CONFIG; +import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_SECRET_ACCESS_KEY_CONFIG; + /** * AWS credentials provider that uses the AWS Security Token Service to assume a Role and create a * temporary, short-lived session to use for authentication. This credentials provider does not @@ -59,26 +65,53 @@ public class AwsAssumeRoleCredentialsProvider implements AWSCredentialsProvider, private String roleExternalId; private String roleSessionName; + private BasicAWSCredentials basicCredentials; + + // STSAssumeRoleSessionCredentialsProvider takes care of refreshing short-lived + // credentials 60 seconds before it's expiry + private STSAssumeRoleSessionCredentialsProvider stsCredentialProvider; + @Override public void configure(Map configs) { AbstractConfig config = new AbstractConfig(STS_CONFIG_DEF, configs); roleArn = config.getString(ROLE_ARN_CONFIG); roleExternalId = config.getString(ROLE_EXTERNAL_ID_CONFIG); roleSessionName = config.getString(ROLE_SESSION_NAME_CONFIG); + final String accessKeyId = (String) configs.get(AWS_ACCESS_KEY_ID_CONFIG); + final String secretKey = (String) configs.get(AWS_SECRET_ACCESS_KEY_CONFIG); + if (StringUtils.isNotBlank(accessKeyId) && StringUtils.isNotBlank(secretKey)) { + basicCredentials = new BasicAWSCredentials(accessKeyId, secretKey); + stsCredentialProvider = new STSAssumeRoleSessionCredentialsProvider + .Builder(roleArn, roleSessionName) + .withStsClient(AWSSecurityTokenServiceClientBuilder + .standard() + .withCredentials(new AWSStaticCredentialsProvider(basicCredentials)).build() + ) + .withExternalId(roleExternalId) + .build(); + } else { + basicCredentials = null; + stsCredentialProvider = new STSAssumeRoleSessionCredentialsProvider + .Builder(roleArn, roleSessionName) + // default sts client will internally use default credentials chain provider + // https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default + .withStsClient(AWSSecurityTokenServiceClientBuilder.defaultClient()) + .withExternalId(roleExternalId) + .build(); + } } @Override public AWSCredentials getCredentials() { - return new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, 
roleSessionName) - .withStsClient(AWSSecurityTokenServiceClientBuilder.defaultClient()) - .withExternalId(roleExternalId) - .build() - .getCredentials(); + return stsCredentialProvider.getCredentials(); } @Override public void refresh() { - // Nothing to do really, since we acquire a new session every getCredentials() call. + // performs a force refresh of credentials + if (stsCredentialProvider != null) { + stsCredentialProvider.refresh(); + } } } diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/KeyValueHeaderRecordWriterProvider.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/KeyValueHeaderRecordWriterProvider.java index d5312f449..d6a30cca4 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/KeyValueHeaderRecordWriterProvider.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/KeyValueHeaderRecordWriterProvider.java @@ -17,20 +17,20 @@ package io.confluent.connect.s3.format; -import javax.annotation.Nullable; -import javax.validation.constraints.NotNull; -import org.apache.kafka.connect.errors.DataException; -import org.apache.kafka.connect.sink.SinkRecord; +import static io.confluent.connect.s3.util.Utils.sinkRecordToLoggableString; +import static java.util.Objects.requireNonNull; import io.confluent.connect.s3.S3SinkConnectorConfig; import io.confluent.connect.storage.format.RecordWriter; import io.confluent.connect.storage.format.RecordWriterProvider; +import java.util.Optional; +import javax.annotation.Nullable; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static io.confluent.connect.s3.util.Utils.sinkRecordToLoggableString; -import static java.util.Objects.requireNonNull; - /** * A class that adds a record writer layer to manage writing values, keys and headers * with a single call. It provides an abstraction for writing, committing and @@ -42,7 +42,7 @@ public class KeyValueHeaderRecordWriterProvider private static final Logger log = LoggerFactory.getLogger(KeyValueHeaderRecordWriterProvider.class); - @NotNull + @Nullable private final RecordWriterProvider valueProvider; @Nullable @@ -75,20 +75,26 @@ public RecordWriter getRecordWriter(S3SinkConnectorConfig conf, String filename) ? filename.substring(0, filename.length() - valueProvider.getExtension().length()) : filename; - RecordWriter valueWriter = valueProvider.getRecordWriter(conf, strippedFilename); - RecordWriter keyWriter = - keyProvider == null ? null : keyProvider.getRecordWriter(conf, strippedFilename); - RecordWriter headerWriter = - headerProvider == null ? null : headerProvider.getRecordWriter(conf, strippedFilename); + Optional valueWriter = filename.contains(conf.getTombstoneEncodedPartition()) + ? 
Optional.empty() : Optional.of(valueProvider.getRecordWriter(conf, strippedFilename)); + Optional keyWriter = Optional.ofNullable(keyProvider) + .map(keyProvider -> keyProvider.getRecordWriter(conf, strippedFilename)); + Optional headerWriter = Optional.ofNullable(headerProvider) + .map(headerProvider -> headerProvider.getRecordWriter(conf, strippedFilename)); return new RecordWriter() { @Override public void write(SinkRecord sinkRecord) { + if (conf.isTombstoneWriteEnabled() && !keyWriter.isPresent()) { + throw new ConnectException( + "Key Writer must be configured when writing tombstone records is enabled."); + } + // The two data exceptions below must be caught before writing the value // to avoid misaligned K/V/H files. // keyWriter != null means writing keys is turned on - if (keyWriter != null && sinkRecord.key() == null) { + if (keyWriter.isPresent() && sinkRecord.key() == null) { throw new DataException( String.format("Key cannot be null for SinkRecord: %s", sinkRecordToLoggableString(sinkRecord)) @@ -96,7 +102,7 @@ public void write(SinkRecord sinkRecord) { } // headerWriter != null means writing headers is turned on - if (headerWriter != null + if (headerWriter.isPresent() && (sinkRecord.headers() == null || sinkRecord.headers().isEmpty())) { throw new DataException( String.format("Headers cannot be null for SinkRecord: %s", @@ -104,35 +110,35 @@ public void write(SinkRecord sinkRecord) { ); } - valueWriter.write(sinkRecord); // null check happens in sink task - if (keyWriter != null) { - keyWriter.write(sinkRecord); - } - if (headerWriter != null) { - headerWriter.write(sinkRecord); + if (valueWriter.isPresent()) { + valueWriter.get().write(sinkRecord); + } else { + // Should only encounter tombstones here. + if (sinkRecord.value() != null) { + throw new ConnectException( + String.format("Value writer not configured for SinkRecord: %s." 
+ + " fileName: %s, tombstonePartition: %s", + sinkRecordToLoggableString(sinkRecord), filename, + conf.getTombstoneEncodedPartition()) + ); + } } + keyWriter.ifPresent(writer -> writer.write(sinkRecord)); + headerWriter.ifPresent(writer -> writer.write(sinkRecord)); } @Override public void close() { - valueWriter.close(); - if (keyWriter != null) { - keyWriter.close(); - } - if (headerWriter != null) { - headerWriter.close(); - } + valueWriter.ifPresent(RecordWriter::close); + keyWriter.ifPresent(RecordWriter::close); + headerWriter.ifPresent(RecordWriter::close); } @Override public void commit() { - valueWriter.commit(); - if (keyWriter != null) { - keyWriter.commit(); - } - if (headerWriter != null) { - headerWriter.commit(); - } + valueWriter.ifPresent(RecordWriter::commit); + keyWriter.ifPresent(RecordWriter::commit); + headerWriter.ifPresent(RecordWriter::commit); } }; } diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/RecordViews.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/RecordViews.java index c8968db3a..9168d2b3d 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/RecordViews.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/RecordViews.java @@ -22,11 +22,16 @@ import org.apache.kafka.connect.data.Values; import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.stream.Collectors; import java.util.stream.StreamSupport; public final class RecordViews { + private static final Logger log = LoggerFactory.getLogger(RecordViews.class); + public static final class ValueRecordView extends BaseRecordView { @Override public Schema getViewSchema(SinkRecord record, boolean enveloped) { @@ -55,6 +60,7 @@ public Schema getViewSchema(SinkRecord record, boolean enveloped) { keySchema = SchemaBuilder.struct().name(KEY_STRUCT_NAME) .field(KEY_FIELD_NAME, keySchema).build(); } + log.debug("Created key record view schema."); return keySchema; } @@ -90,6 +96,7 @@ public Schema getViewSchema(SinkRecord record, boolean enveloped) { headerSchema = SchemaBuilder.struct().name(HEADER_STRUCT_NAME) .field(HEADER_FIELD_NAME, headerSchema).build(); } + log.debug("Created header record view schema."); return headerSchema; } diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/S3RetriableRecordWriter.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/S3RetriableRecordWriter.java index 6c61ac965..5d0ef4695 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/S3RetriableRecordWriter.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/S3RetriableRecordWriter.java @@ -20,6 +20,8 @@ import org.apache.kafka.connect.sink.SinkRecord; import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static io.confluent.connect.s3.util.S3ErrorUtils.throwConnectException; @@ -29,10 +31,13 @@ * as determined within `throwConnectException()`. 
*/ public class S3RetriableRecordWriter implements RecordWriter { + + private static final Logger log = LoggerFactory.getLogger(S3RetriableRecordWriter.class); private final IORecordWriter writer; public S3RetriableRecordWriter(IORecordWriter writer) { if (writer == null) { + log.debug("S3 Retriable record writer was passed a null writer (IORecordWriter)"); throw new NullPointerException( "S3 Retriable record writer was passed a null writer (IORecordWriter)" ); diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/avro/AvroFormat.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/avro/AvroFormat.java index 26a150dfa..4797fa6b8 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/avro/AvroFormat.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/avro/AvroFormat.java @@ -21,8 +21,11 @@ import io.confluent.connect.storage.format.Format; import io.confluent.connect.storage.format.RecordWriterProvider; import io.confluent.connect.storage.format.SchemaFileReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AvroFormat implements Format { + private static final Logger log = LoggerFactory.getLogger(AvroFormat.class); private final S3Storage storage; private final AvroData avroData; @@ -38,12 +41,14 @@ public RecordWriterProvider getRecordWriterProvider() { @Override public SchemaFileReader getSchemaFileReader() { + log.debug("Reading schemas from S3 is not currently supported"); throw new UnsupportedOperationException("Reading schemas from S3 is not currently supported"); } @Override @Deprecated public Object getHiveFactory() { + log.debug("Hive integration is not currently supported in S3 Connector"); throw new UnsupportedOperationException( "Hive integration is not currently supported in S3 Connector" ); diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/bytearray/ByteArrayFormat.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/bytearray/ByteArrayFormat.java index 24d20e189..6249f7568 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/bytearray/ByteArrayFormat.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/bytearray/ByteArrayFormat.java @@ -24,8 +24,11 @@ import java.util.HashMap; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ByteArrayFormat implements Format { + private static final Logger log = LoggerFactory.getLogger(ByteArrayFormat.class); private final S3Storage storage; private final ByteArrayConverter converter; @@ -43,12 +46,14 @@ public RecordWriterProvider getRecordWriterProvider() { @Override public SchemaFileReader getSchemaFileReader() { + log.debug("Reading schemas from S3 is not currently supported"); throw new UnsupportedOperationException("Reading schemas from S3 is not currently supported"); } @Override @Deprecated public Object getHiveFactory() { + log.debug("Hive integration is not currently supported in S3 Connector"); throw new UnsupportedOperationException( "Hive integration is not currently supported in S3 Connector"); } diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/json/JsonFormat.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/json/JsonFormat.java index cfcc878ff..e4175c634 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/json/JsonFormat.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/json/JsonFormat.java @@ -19,6 +19,8 @@ 
import java.util.HashMap; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import io.confluent.connect.s3.S3SinkConnectorConfig; import io.confluent.connect.s3.storage.S3Storage; @@ -27,6 +29,7 @@ import io.confluent.connect.storage.format.SchemaFileReader; public class JsonFormat implements Format { + private static final Logger log = LoggerFactory.getLogger(JsonFormat.class); private final S3Storage storage; private final JsonConverter converter; @@ -53,12 +56,14 @@ public RecordWriterProvider getRecordWriterProvider() { @Override public SchemaFileReader getSchemaFileReader() { + log.debug("Reading schemas from S3 is not currently supported"); throw new UnsupportedOperationException("Reading schemas from S3 is not currently supported"); } @Override @Deprecated public Object getHiveFactory() { + log.debug("Hive integration is not currently supported in S3 Connector"); throw new UnsupportedOperationException( "Hive integration is not currently supported in S3 Connector" ); diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/parquet/ParquetFormat.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/parquet/ParquetFormat.java index 7b18731b7..6cc6ded41 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/parquet/ParquetFormat.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/parquet/ParquetFormat.java @@ -23,8 +23,11 @@ import io.confluent.connect.storage.format.Format; import io.confluent.connect.storage.format.RecordWriterProvider; import io.confluent.connect.storage.format.SchemaFileReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ParquetFormat implements Format { + private static final Logger log = LoggerFactory.getLogger(ParquetFormat.class); private final S3Storage storage; private final AvroData avroData; @@ -41,12 +44,14 @@ public RecordWriterProvider getRecordWriterProvider() { @Override public SchemaFileReader getSchemaFileReader() { + log.debug("Reading schemas from S3 is not currently supported"); throw new UnsupportedOperationException("Reading schemas from S3 is not currently supported"); } @Override @Deprecated public Object getHiveFactory() { + log.debug("Hive integration is not currently supported in S3 Connector"); throw new UnsupportedOperationException( "Hive integration is not currently supported in S3 Connector" ); diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/parquet/ParquetRecordWriterProvider.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/parquet/ParquetRecordWriterProvider.java index e87f4942e..cd7b882de 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/parquet/ParquetRecordWriterProvider.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/parquet/ParquetRecordWriterProvider.java @@ -151,7 +151,7 @@ public static boolean schemaHasArrayOfOptionalItems(Schema schema, Set s return schemaHasArrayOfOptionalItems(schema.valueSchema(), seenSchemas); case ARRAY: return schema.valueSchema().isOptional() - || schemaHasArrayOfOptionalItems(schema.valueSchema(), seenSchemas); + || schemaHasArrayOfOptionalItems(schema.valueSchema(), seenSchemas); default: return false; } diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/ByteBuf.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/ByteBuf.java new file mode 100644 index 000000000..9dede235b --- /dev/null +++ 
b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/ByteBuf.java @@ -0,0 +1,36 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.connect.s3.storage; + +/** + * An interface for S3OutputStream to write an s3-part. + */ +public interface ByteBuf { + + void put(byte b); + + void put(byte[] src, int offset, int length); + + boolean hasRemaining(); + + int remaining(); + + int position(); + + void clear(); + + byte[] array(); +} diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/ElasticByteBuffer.java new file mode 100644 index 000000000..b62d39886 --- /dev/null +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/ElasticByteBuffer.java @@ -0,0 +1,163 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.connect.s3.storage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.BufferOverflowException; + +/** + * An elastic byte buffer with a logical capacity as its maximum size. 
+ * The formula to expand: initCapacity * 2 ^ (incrementFactor * N) + */ +public class ElasticByteBuffer implements ByteBuf { + + private static final Logger log = LoggerFactory.getLogger(ElasticByteBuffer.class); + + public static final int INCREMENT_FACTOR = 1; + + /* logical capacity */ + private int capacity; + + /* initial physical capacity */ + private int initPhysicalCap; + + /* the next position to write */ + private int position; + + /* physical buf */ + private byte[] buf; + + public ElasticByteBuffer(int capacity, int initPhysicalCap) { + if (capacity <= 0) { + throw new IllegalArgumentException("capacity must greater than zero"); + } + + if (initPhysicalCap <= 0) { + log.debug("initial physical capacity must be greater than zero"); + throw new IllegalArgumentException("initial physical capacity must be greater than zero"); + } + + this.capacity = capacity; + this.initPhysicalCap = initPhysicalCap; + + initialize(); + } + + private void initialize() { + this.position = 0; + int initCapacity = Math.min(this.capacity, this.initPhysicalCap); + this.buf = new byte[initCapacity]; + } + + private void expand() { + int currSize = this.buf.length; + int calNewSize = currSize << INCREMENT_FACTOR; + + int newSize = 0; + if (calNewSize < currSize) { + // down overflow + newSize = this.capacity; + } else { + newSize = Math.min(this.capacity, calNewSize); + } + + byte[] currBuf = this.buf; + this.buf = new byte[newSize]; + System.arraycopy(currBuf, 0, this.buf, 0, currSize); + } + + public void put(byte b) { + if (!hasRemaining()) { + throw new BufferOverflowException(); + } + + if (physicalRemaining() <= 0) { + // expand physical buf + expand(); + } + + this.buf[this.position] = b; + this.position++; + } + + public void put(byte[] src, int offset, int length) { + + checkBounds(offset, length, src.length); + + if (remaining() < length) { + throw new BufferOverflowException(); + } + + int remainingOffset = offset; + int remainingLen = length; + while (true) { + if (physicalRemaining() <= 0) { + // expand physical buf + expand(); + } + + if (physicalRemaining() >= remainingLen) { + System.arraycopy(src, remainingOffset, this.buf, this.position, remainingLen); + this.position += remainingLen; + break; + } else { + int physicalRemaining = physicalRemaining(); + System.arraycopy(src, remainingOffset, this.buf, this.position, physicalRemaining); + this.position += physicalRemaining; + remainingOffset += physicalRemaining; + remainingLen -= physicalRemaining; + } + } + } + + static void checkBounds(int off, int len, int size) { // package-private + if ((off | len | (off + len) | (size - (off + len))) < 0) { + throw new IndexOutOfBoundsException(); + } + } + + public int physicalRemaining() { + return this.buf.length - this.position; + } + + public boolean hasRemaining() { + return capacity > position; + } + + public int remaining() { + return capacity - position; + } + + public int position() { + return this.position; + } + + public void clear() { + if (this.buf.length <= this.initPhysicalCap) { + // has not ever expanded, just reset position + this.position = 0; + } else { + initialize(); + } + } + + public final byte[] array() { + return this.buf; + } + +} diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3OutputStream.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3OutputStream.java index c6c409394..b7461c4a5 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3OutputStream.java +++ 
b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3OutputStream.java @@ -37,7 +37,6 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.OutputStream; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.function.Supplier; @@ -59,7 +58,7 @@ public class S3OutputStream extends PositionOutputStream { private final int partSize; private final CannedAccessControlList cannedAcl; private boolean closed; - private final ByteBuffer buffer; + private final ByteBuf buffer; private MultipartUpload multiPartUpload; private final CompressionType compressionType; private final int compressionLevel; @@ -79,13 +78,21 @@ public S3OutputStream(String key, S3SinkConnectorConfig conf, AmazonS3 s3) { this.partSize = conf.getPartSize(); this.cannedAcl = conf.getCannedAcl(); this.closed = false; - this.buffer = ByteBuffer.allocate(this.partSize); + + final boolean elasticBufEnable = conf.getElasticBufferEnable(); + if (elasticBufEnable) { + final int elasticBufInitialCap = conf.getElasticBufferInitCap(); + this.buffer = new ElasticByteBuffer(this.partSize, elasticBufInitialCap); + } else { + this.buffer = new SimpleByteBuffer(this.partSize); + } + this.progressListener = new ConnectProgressListener(); this.multiPartUpload = null; this.compressionType = conf.getCompressionType(); this.compressionLevel = conf.getCompressionLevel(); this.position = 0L; - log.debug("Create S3OutputStream for bucket '{}' key '{}'", bucket, key); + log.info("Create S3OutputStream for bucket '{}' key '{}'", bucket, key); } @Override diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3Storage.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3Storage.java index 206741324..16d759dc7 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3Storage.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3Storage.java @@ -19,8 +19,6 @@ import com.amazonaws.PredefinedClientConfigurations; import com.amazonaws.SdkClientException; import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.regions.Regions; import com.amazonaws.retry.PredefinedBackoffStrategies; @@ -47,8 +45,6 @@ import io.confluent.connect.storage.Storage; import io.confluent.connect.storage.common.util.StringUtils; -import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_ACCESS_KEY_ID_CONFIG; -import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_SECRET_ACCESS_KEY_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.REGION_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.S3_PATH_STYLE_ACCESS_ENABLED_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.S3_PROXY_URL_CONFIG; @@ -90,6 +86,7 @@ public S3Storage(S3SinkConnectorConfig conf, String url) { * @return S3 client */ public AmazonS3 newS3Client(S3SinkConnectorConfig config) { + log.info("Creating S3 client."); ClientConfiguration clientConfiguration = newClientConfiguration(config); AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard() .withAccelerateModeEnabled(config.getBoolean(WAN_MODE_CONFIG)) @@ -107,7 +104,7 @@ public AmazonS3 newS3Client(S3SinkConnectorConfig config) { new AwsClientBuilder.EndpointConfiguration(url, region) ); } - + log.info("S3 client created"); return builder.build(); } @@ -170,21 
+167,13 @@ protected RetryPolicy newFullJitterRetryPolicy(S3SinkConnectorConfig config) { conf.getS3PartRetries(), false ); + log.info("Created a retry policy for the connector"); return retryPolicy; } protected AWSCredentialsProvider newCredentialsProvider(S3SinkConnectorConfig config) { - final String accessKeyId = config.getString(AWS_ACCESS_KEY_ID_CONFIG); - final String secretKey = config.getPassword(AWS_SECRET_ACCESS_KEY_CONFIG).value(); - if (StringUtils.isNotBlank(accessKeyId) && StringUtils.isNotBlank(secretKey)) { - log.info("Returning new credentials provider using the access key id and " - + "the secret access key that were directly supplied through the connector's " - + "configuration"); - BasicAWSCredentials basicCredentials = new BasicAWSCredentials(accessKeyId, secretKey); - return new AWSStaticCredentialsProvider(basicCredentials); - } - log.info( - "Returning new credentials provider based on the configured credentials provider class"); + log.info("Returning new credentials provider based on the configured " + + "credentials provider class"); return config.getCredentialsProvider(); } @@ -208,17 +197,22 @@ public OutputStream create(String path, S3SinkConnectorConfig conf, boolean over } public S3OutputStream create(String path, boolean overwrite, Class formatClass) { + log.info("Creating S3 output stream."); if (!overwrite) { + log.debug("Creating a file without overwriting is not currently supported in S3 Connector"); throw new UnsupportedOperationException( "Creating a file without overwriting is not currently supported in S3 Connector" ); } if (StringUtils.isBlank(path)) { + log.debug("Path can not be empty!"); throw new IllegalArgumentException("Path can not be empty!"); } if (ParquetFormat.class.isAssignableFrom(formatClass)) { + log.info("Create S3ParquetOutputStream for bucket '{}' key '{}'", + this.conf.getBucketName(), path); return new S3ParquetOutputStream(path, this.conf, s3); } else { // currently ignore what is passed as method argument. @@ -269,6 +263,7 @@ public String url() { @Override public SeekableInput open(String path, S3SinkConnectorConfig conf) { + log.debug("File reading is not currently supported in S3 Connector"); throw new UnsupportedOperationException( "File reading is not currently supported in S3 Connector" ); diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/SimpleByteBuffer.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/SimpleByteBuffer.java new file mode 100644 index 000000000..e997d1baf --- /dev/null +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/SimpleByteBuffer.java @@ -0,0 +1,58 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.connect.s3.storage; + +import java.nio.ByteBuffer; + +/** + * A simple byte buf + */ +public class SimpleByteBuffer implements ByteBuf { + + private ByteBuffer buffer; + + public SimpleByteBuffer(int capacity) { + this.buffer = ByteBuffer.allocate(capacity); + } + + public void put(byte b) { + this.buffer.put(b); + } + + public void put(byte[] src, int offset, int length) { + this.buffer.put(src, offset, length); + } + + public boolean hasRemaining() { + return this.buffer.hasRemaining(); + } + + public int remaining() { + return this.buffer.remaining(); + } + + public int position() { + return this.buffer.position(); + } + + public void clear() { + this.buffer.clear(); + } + + public byte[] array() { + return this.buffer.array(); + } +} diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/util/RetryUtil.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/util/RetryUtil.java new file mode 100644 index 000000000..fc2a32afd --- /dev/null +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/util/RetryUtil.java @@ -0,0 +1,78 @@ +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.connect.s3.util; + +import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +public class RetryUtil { + private static final Logger log = LoggerFactory.getLogger(RetryUtil.class); + + /** + * @param runnable + * An executable piece of code which will be retried in case an expected exception occurred. + * @param exceptionClass + * The function will be retried only if it throws an instance of exceptionClass. + * @param totalNumIterations + * This is the overall execution count e.g. if 3 then it'd be retried a max of twice. + * @param delayInMillis + * Delay between 2 retries, each time it'd be doubled. 
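The Javadoc above spells out the retry contract: `totalNumIterations` is the overall attempt count, only the named exception type is retried, and the sleep doubles after each failed attempt. As a hedged illustration (the wrapped action and the numbers below are made up, not taken from this patch), a caller might use it like this:

```java
import com.amazonaws.SdkClientException;
import org.apache.kafka.connect.errors.ConnectException;

import io.confluent.connect.s3.util.RetryUtil;

public class RetryUtilUsageSketch {
  public static void main(String[] args) {
    try {
      // Three iterations total: one initial attempt plus at most two retries,
      // sleeping 100 ms and then 200 ms between failed attempts.
      RetryUtil.exponentialBackoffRetry(
          () -> System.out.println("pretend this is a flaky S3 call"), // illustrative Runnable
          SdkClientException.class, // only this exception type triggers a retry
          3,
          100L);
    } catch (ConnectException e) {
      // Thrown once every attempt has failed, or immediately for a non-matching exception.
      System.err.println("giving up: " + e.getMessage());
    }
  }
}
```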
+ */ + public static void exponentialBackoffRetry(final Runnable runnable, + final Class exceptionClass, + final int totalNumIterations, + final long delayInMillis) throws ConnectException { + long expDelayInMillis = delayInMillis; + for (int i = 1; i <= totalNumIterations; i++) { + try { + runnable.run(); + break; + } catch (Exception e) { + if (e.getClass().equals(exceptionClass)) { + log.warn("Attempt {} of {} failed.", i, totalNumIterations, e); + if (i == totalNumIterations) { + wrapAndThrowAsConnectException(e); + } else { + log.warn("Awaiting {} milliseconds before retrying.", expDelayInMillis); + await(expDelayInMillis); + expDelayInMillis <<= 1; + } + } else { + wrapAndThrowAsConnectException(e); + } + } + } + } + + private static void wrapAndThrowAsConnectException(Exception e) throws ConnectException { + if (e instanceof ConnectException) { + throw (ConnectException) e; + } + throw new ConnectException(e); + } + + private static void await(long millis) { + try { + TimeUnit.MILLISECONDS.sleep(millis); + } catch (InterruptedException e) { + // Restore the interrupted status + Thread.currentThread().interrupt(); + } + } +} diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/util/SchemaPartitioner.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/util/SchemaPartitioner.java new file mode 100644 index 000000000..0139121d1 --- /dev/null +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/util/SchemaPartitioner.java @@ -0,0 +1,80 @@ +/* +* Copyright 2018 Confluent Inc. +* +* Licensed under the Confluent Community License (the "License"); you may not use +* this file except in compliance with the License. You may obtain a copy of the +* License at +* +* http://www.confluent.io/confluent-community-license +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OF ANY KIND, either express or implied. See the License for the +* specific language governing permissions and limitations under the License. +*/ + +package io.confluent.connect.s3.util; + +import io.confluent.connect.s3.S3SinkConnectorConfig; +import io.confluent.connect.storage.common.StorageCommonConfig; +import io.confluent.connect.storage.partitioner.Partitioner; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.sink.SinkRecord; + +import static io.confluent.connect.s3.S3SinkConnectorConfig.SCHEMA_PARTITION_AFFIX_TYPE_CONFIG; + +public class SchemaPartitioner implements Partitioner { + + private final Partitioner delegatePartitioner; + private S3SinkConnectorConfig.AffixType schemaAffixType; + private String delim; + + public SchemaPartitioner(Partitioner delegatePartitioner) { + this.delegatePartitioner = delegatePartitioner; + } + + @Override + public void configure(Map config) { + this.delim = (String) config.get(StorageCommonConfig.DIRECTORY_DELIM_CONFIG); + this.schemaAffixType = S3SinkConnectorConfig.AffixType.valueOf( + (String) config.get(SCHEMA_PARTITION_AFFIX_TYPE_CONFIG)); + delegatePartitioner.configure(config); + } + + @Override + public String encodePartition(SinkRecord sinkRecord) { + String encodePartition = this.delegatePartitioner.encodePartition(sinkRecord); + Schema valueSchema = sinkRecord.valueSchema(); + String valueSchemaName = valueSchema != null ? 
valueSchema.name() : null; + return generateSchemaBasedPath(encodePartition, valueSchemaName); + } + + @Override + public String encodePartition(SinkRecord sinkRecord, long nowInMillis) { + String encodePartition = this.delegatePartitioner.encodePartition(sinkRecord, nowInMillis); + Schema valueSchema = sinkRecord.valueSchema(); + String valueSchemaName = valueSchema != null ? valueSchema.name() : null; + return generateSchemaBasedPath(encodePartition, valueSchemaName); + } + + private String generateSchemaBasedPath(String encodedPartition, String schemaName) { + if (schemaAffixType == S3SinkConnectorConfig.AffixType.PREFIX) { + return "schema_name=" + schemaName + this.delim + encodedPartition; + } else { + return encodedPartition + this.delim + "schema_name=" + schemaName; + } + } + + @Override + public String generatePartitionedPath(String topic, String encodedPartition) { + return delegatePartitioner.generatePartitionedPath(topic, encodedPartition); + } + + @Override + public List partitionFields() { + return delegatePartitioner.partitionFields(); + } +} \ No newline at end of file diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/util/TombstoneSupportedPartitioner.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/util/TombstoneSupportedPartitioner.java new file mode 100644 index 000000000..5db33fee0 --- /dev/null +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/util/TombstoneSupportedPartitioner.java @@ -0,0 +1,55 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.connect.s3.util; + +import io.confluent.connect.storage.partitioner.DefaultPartitioner; +import io.confluent.connect.storage.partitioner.Partitioner; +import java.util.List; +import java.util.Map; +import org.apache.kafka.connect.sink.SinkRecord; + +public class TombstoneSupportedPartitioner extends DefaultPartitioner { + + private final Partitioner delegatePartitioner; + private final String tombstonePartition; + + public TombstoneSupportedPartitioner(Partitioner delegatePartitioner, + String tombstonePartition) { + this.delegatePartitioner = delegatePartitioner; + this.tombstonePartition = tombstonePartition; + } + + @Override + public void configure(Map map) { + delegatePartitioner.configure(map); + } + + @Override + public String encodePartition(SinkRecord sinkRecord) { + return sinkRecord.value() == null ? 
this.tombstonePartition + : delegatePartitioner.encodePartition(sinkRecord); + } + + @Override + public String generatePartitionedPath(String s, String s1) { + return delegatePartitioner.generatePartitionedPath(s, s1); + } + + @Override + public List partitionFields() { + return delegatePartitioner.partitionFields(); + } +} diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/util/TombstoneTimestampExtractor.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/util/TombstoneTimestampExtractor.java new file mode 100644 index 000000000..f855adc9b --- /dev/null +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/util/TombstoneTimestampExtractor.java @@ -0,0 +1,46 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.connect.s3.util; + +import io.confluent.connect.storage.partitioner.TimeBasedPartitioner.RecordTimestampExtractor; +import io.confluent.connect.storage.partitioner.TimestampExtractor; +import java.util.Map; +import org.apache.kafka.connect.connector.ConnectRecord; + +public class TombstoneTimestampExtractor implements TimestampExtractor { + + private final TimestampExtractor delegateTimestampExtractor; + private static final TimestampExtractor recordTimestampExtractor + = new RecordTimestampExtractor(); + + public TombstoneTimestampExtractor(TimestampExtractor delegateTimestampExtractor) { + this.delegateTimestampExtractor = delegateTimestampExtractor; + } + + @Override + public void configure(Map map) { + delegateTimestampExtractor.configure(map); + recordTimestampExtractor.configure(map); + } + + @Override + public Long extract(ConnectRecord connectRecord) { + if (connectRecord.value() == null) { + return recordTimestampExtractor.extract(connectRecord); + } + return delegateTimestampExtractor.extract(connectRecord); + } +} diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/DataWriterAvroTest.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/DataWriterAvroTest.java index 17d4f0295..75b917a9c 100644 --- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/DataWriterAvroTest.java +++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/DataWriterAvroTest.java @@ -46,6 +46,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java index 219b13c3d..acf1b8dbe 100644 --- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java +++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java @@ -34,6 +34,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import java.util.stream.IntStream; import 
io.confluent.connect.s3.auth.AwsAssumeRoleCredentialsProvider; @@ -50,10 +51,13 @@ import io.confluent.connect.storage.partitioner.TimeBasedPartitioner; import io.confluent.connect.avro.AvroDataConfig; +import static io.confluent.connect.s3.S3SinkConnectorConfig.AffixType; import static io.confluent.connect.s3.S3SinkConnectorConfig.DECIMAL_FORMAT_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.DECIMAL_FORMAT_DEFAULT; import static io.confluent.connect.s3.S3SinkConnectorConfig.HEADERS_FORMAT_CLASS_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.KEYS_FORMAT_CLASS_CONFIG; +import static io.confluent.connect.s3.S3SinkConnectorConfig.SCHEMA_PARTITION_AFFIX_TYPE_CONFIG; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; @@ -117,21 +121,24 @@ public void testRecommendedValues() { TimeBasedPartitioner.class, FieldPartitioner.class ); + List expectedSchemaPartitionerAffixTypes = Arrays.stream( + S3SinkConnectorConfig.AffixType.names()).collect(Collectors.toList()); List values = S3SinkConnectorConfig.getConfig().validate(properties); for (ConfigValue val : values) { - if (val.value() instanceof Class) { - switch (val.name()) { - case StorageCommonConfig.STORAGE_CLASS_CONFIG: - assertEquals(expectedStorageClasses, val.recommendedValues()); - break; - case S3SinkConnectorConfig.FORMAT_CLASS_CONFIG: - assertEquals(expectedFormatClasses, val.recommendedValues()); - break; - case PartitionerConfig.PARTITIONER_CLASS_CONFIG: - assertEquals(expectedPartitionerClasses, val.recommendedValues()); - break; - } + switch (val.name()) { + case StorageCommonConfig.STORAGE_CLASS_CONFIG: + assertEquals(expectedStorageClasses, val.recommendedValues()); + break; + case S3SinkConnectorConfig.FORMAT_CLASS_CONFIG: + assertEquals(expectedFormatClasses, val.recommendedValues()); + break; + case PartitionerConfig.PARTITIONER_CLASS_CONFIG: + assertEquals(expectedPartitionerClasses, val.recommendedValues()); + break; + case SCHEMA_PARTITION_AFFIX_TYPE_CONFIG: + assertEquals(expectedSchemaPartitionerAffixTypes, val.recommendedValues()); + break; } } } @@ -219,7 +226,7 @@ public void testConfigurableCredentialProvider() { ); properties.put( configPrefix.concat(DummyAssertiveCredentialsProvider.CONFIGS_NUM_KEY_NAME), - "3" + "5" ); connectorConfig = new S3SinkConnectorConfig(properties); @@ -334,6 +341,16 @@ public void testConfigurableS3ObjectTaggingConfigs() { properties.put(S3SinkConnectorConfig.S3_OBJECT_TAGGING_CONFIG, "false"); connectorConfig = new S3SinkConnectorConfig(properties); assertEquals(false, connectorConfig.get(S3SinkConnectorConfig.S3_OBJECT_TAGGING_CONFIG)); + + properties.put(S3SinkConnectorConfig.S3_OBJECT_BEHAVIOR_ON_TAGGING_ERROR_CONFIG, "ignore"); + connectorConfig = new S3SinkConnectorConfig(properties); + assertEquals("ignore", + connectorConfig.get(S3SinkConnectorConfig.S3_OBJECT_BEHAVIOR_ON_TAGGING_ERROR_CONFIG)); + + properties.put(S3SinkConnectorConfig.S3_OBJECT_BEHAVIOR_ON_TAGGING_ERROR_CONFIG, "fail"); + connectorConfig = new S3SinkConnectorConfig(properties); + assertEquals("fail", + connectorConfig.get(S3SinkConnectorConfig.S3_OBJECT_BEHAVIOR_ON_TAGGING_ERROR_CONFIG)); } private void assertDefaultPartitionerVisibility(List values) { @@ -584,5 +601,33 @@ public void testHeaderFormatClassSupported() { connectorConfig = new S3SinkConnectorConfig(properties); assertEquals(ParquetFormat.class, connectorConfig.getClass(HEADERS_FORMAT_CLASS_CONFIG)); } + + @Test + public void 
testSchemaPartitionerAffixTypDefault() { + properties.remove(SCHEMA_PARTITION_AFFIX_TYPE_CONFIG); + connectorConfig = new S3SinkConnectorConfig(properties); + assertEquals(AffixType.NONE, connectorConfig.getSchemaPartitionAffixType()); + } + + @Test + public void testSchemaPartitionerAffixType() { + properties.put(SCHEMA_PARTITION_AFFIX_TYPE_CONFIG, AffixType.NONE.name()); + connectorConfig = new S3SinkConnectorConfig(properties); + assertEquals(AffixType.NONE, connectorConfig.getSchemaPartitionAffixType()); + + properties.put(SCHEMA_PARTITION_AFFIX_TYPE_CONFIG, AffixType.PREFIX.name()); + connectorConfig = new S3SinkConnectorConfig(properties); + assertEquals(AffixType.PREFIX, connectorConfig.getSchemaPartitionAffixType()); + + properties.put(SCHEMA_PARTITION_AFFIX_TYPE_CONFIG, AffixType.SUFFIX.name()); + connectorConfig = new S3SinkConnectorConfig(properties); + assertEquals(S3SinkConnectorConfig.AffixType.SUFFIX, connectorConfig.getSchemaPartitionAffixType()); + } + + @Test(expected = ConfigException.class) + public void testSchemaPartitionerAffixTypeExceptionOnWrongValue() { + properties.put(SCHEMA_PARTITION_AFFIX_TYPE_CONFIG, "Random"); + new S3SinkConnectorConfig(properties); + } } diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorFaultyS3Test.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorFaultyS3Test.java index 343122123..8228804c5 100644 --- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorFaultyS3Test.java +++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorFaultyS3Test.java @@ -32,6 +32,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -134,8 +135,10 @@ public void tearDown() throws Exception { @BeforeClass public static void startConnect() { + Map workerProps = new HashMap<>(); + workerProps.put("plugin.discovery","hybrid_warn"); connect = new EmbeddedConnectCluster.Builder() - .name("s3-connect-cluster") + .name("s3-connect-cluster").workerProps(workerProps) .build(); connect.start(); kafkaAdmin = connect.kafka().createAdminClient(); diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkTaskTest.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkTaskTest.java index 38b5589cd..2270cc3f0 100644 --- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkTaskTest.java +++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkTaskTest.java @@ -118,7 +118,7 @@ public void testWriteNullRecords() throws Exception { task.initialize(mockContext); properties.put(S3SinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, - S3SinkConnectorConfig.BehaviorOnNullValues.IGNORE.toString()); + S3SinkConnectorConfig.IgnoreOrFailBehavior.IGNORE.toString()); task.start(properties); verifyAll(); diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/TopicPartitionWriterTest.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/TopicPartitionWriterTest.java index f2cc0569a..c95ad0284 100644 --- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/TopicPartitionWriterTest.java +++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/TopicPartitionWriterTest.java @@ -15,6 +15,7 @@ package io.confluent.connect.s3; +import com.amazonaws.SdkClientException; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.S3ObjectSummary; import 
com.amazonaws.services.s3.model.Tag; @@ -27,10 +28,10 @@ import io.confluent.connect.s3.format.RecordViews.KeyRecordView; import io.confluent.connect.s3.format.json.JsonFormat; import io.confluent.connect.s3.storage.CompressionType; +import io.confluent.connect.s3.util.SchemaPartitioner; import io.confluent.connect.storage.errors.PartitionException; import io.confluent.kafka.serializers.NonRecordContainer; -import java.util.HashSet; -import java.util.Set; + import org.apache.avro.util.Utf8; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.connector.ConnectRecord; @@ -54,13 +55,15 @@ import org.junit.Test; import java.io.IOException; -import java.util.Arrays; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -82,21 +85,29 @@ import io.confluent.connect.storage.partitioner.PartitionerConfig; import io.confluent.connect.storage.partitioner.TimeBasedPartitioner; import io.confluent.connect.storage.partitioner.TimestampExtractor; +import org.junit.experimental.theories.DataPoints; +import org.junit.experimental.theories.FromDataPoints; +import org.junit.experimental.theories.Theories; +import org.junit.experimental.theories.Theory; +import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import static io.confluent.connect.s3.S3SinkConnectorConfig.SCHEMA_PARTITION_AFFIX_TYPE_CONFIG; import static io.confluent.connect.s3.util.Utils.sinkRecordToLoggableString; import static io.confluent.connect.storage.StorageSinkConnectorConfig.FLUSH_SIZE_CONFIG; +import static io.confluent.connect.storage.StorageSinkConnectorConfig.FORMAT_CLASS_CONFIG; +import static io.confluent.connect.storage.common.StorageCommonConfig.DIRECTORY_DELIM_CONFIG; import static io.confluent.connect.storage.partitioner.PartitionerConfig.PARTITION_FIELD_NAME_CONFIG; import static org.apache.kafka.common.utils.Time.SYSTEM; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; +@RunWith(Theories.class) public class TopicPartitionWriterTest extends TestWithMockedS3 { // The default private static final String ZERO_PAD_FMT = "%010d"; @@ -160,6 +171,30 @@ public S3OutputStream create(String path, boolean overwrite, Class formatClas extension = writerProvider.getExtension(); } + public void setUpWithTaggingException(boolean mockSdkClientException) throws Exception { + super.setUp(); + + s3 = newS3Client(connectorConfig); + storage = new S3Storage(connectorConfig, url, S3_TEST_BUCKET_NAME, s3) { + @Override + public void addTags(String fileName, Map tags) throws SdkClientException { + if (mockSdkClientException) { + throw new SdkClientException("Mock SdkClientException while tagging"); + } + throw new RuntimeException("Mock RuntimeException while tagging"); + } + }; + + format = new AvroFormat(storage); + + s3.createBucket(S3_TEST_BUCKET_NAME); + assertTrue(s3.doesBucketExistV2(S3_TEST_BUCKET_NAME)); + + Format format = new AvroFormat(storage); + writerProvider = format.getRecordWriterProvider(); + 
extension = writerProvider.getExtension(); + } + @After @Override public void tearDown() throws Exception { @@ -599,6 +634,26 @@ public void testWallclockUsesBatchTimePartitionBoundary() throws Exception { topicsDir, dirPrefix, TOPIC_PARTITION, 0, extension, ZERO_PAD_FMT)); verify(expectedFiles, 6, schema, records); } + @DataPoints("affixType") + public static S3SinkConnectorConfig.AffixType[] affixTypeValues(){ + return new S3SinkConnectorConfig.AffixType[]{ + S3SinkConnectorConfig.AffixType.PREFIX, S3SinkConnectorConfig.AffixType.SUFFIX + }; + } + + @DataPoints("testWithSchemaData") + public static boolean[] testWithSchemaDataValues(){ + return new boolean[]{true, false}; + } + + @Theory + public void testWriteSchemaPartitionerWithAffix( + @FromDataPoints("affixType")S3SinkConnectorConfig.AffixType affixType, + @FromDataPoints("testWithSchemaData") boolean testWithSchemaData + ) throws Exception { + testWriteSchemaPartitionerWithAffix(testWithSchemaData, affixType); + } + @Test public void testWriteRecordsAfterScheduleRotationExpiryButNoResetShouldGoToSameFile() @@ -1098,6 +1153,34 @@ public void testAddingS3ObjectTags() throws Exception{ verifyTags(expectedTaggedFiles); } + @Test + public void testIgnoreS3ObjectTaggingSdkClientException() throws Exception { + // Tagging error occurred (SdkClientException) but getting ignored. + testS3ObjectTaggingErrorHelper(true, true); + } + + @Test + public void testIgnoreS3ObjectTaggingRuntimeException() throws Exception { + // Tagging error occurred (RuntimeException) but getting ignored. + testS3ObjectTaggingErrorHelper(false, true); + } + + @Test + public void testFailS3ObjectTaggingSdkClientException() throws Exception { + ConnectException exception = assertThrows(ConnectException.class, + () -> testS3ObjectTaggingErrorHelper(true, false)); + assertEquals("Unable to tag S3 object topics_test-topic_partition=12_test-topic#12#0000000000.avro", exception.getMessage()); + assertEquals("Mock SdkClientException while tagging", exception.getCause().getMessage()); + } + + @Test + public void testFailS3ObjectTaggingRuntimeException() throws Exception { + ConnectException exception = assertThrows(ConnectException.class, () -> + testS3ObjectTaggingErrorHelper(false, false)); + assertEquals("Unable to tag S3 object topics_test-topic_partition=12_test-topic#12#0000000000.avro", exception.getMessage()); + assertEquals("Mock RuntimeException while tagging", exception.getCause().getMessage()); + } + @Test public void testExceptionOnNullKeysReported() throws Exception { String recordValue = "1"; @@ -1215,7 +1298,6 @@ public void testRecordKeysAndHeadersWritten() throws Exception { // Define the partitioner Partitioner partitioner = new DefaultPartitioner<>(); partitioner.configure(parsedConfig); - TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter( TOPIC_PARTITION, storage, getKeyHeaderValueProvider(), partitioner, connectorConfig, context, null); @@ -1402,6 +1484,246 @@ private RecordWriterProvider getKeyHeaderValueProviderJso ); } + public void testWriteSchemaPartitionerWithAffix( + boolean testWithSchemaData, S3SinkConnectorConfig.AffixType affixType + ) throws Exception { + localProps.put(FLUSH_SIZE_CONFIG, "9"); + localProps.put(FORMAT_CLASS_CONFIG, JsonFormat.class.getName()); + setUp(); + + Format myFormat = new JsonFormat(storage); + writerProvider = myFormat.getRecordWriterProvider(); + extension = writerProvider.getExtension(); + + parsedConfig.put(SCHEMA_PARTITION_AFFIX_TYPE_CONFIG, affixType.name()); + // Define the partitioner + 
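The theory above drives `SchemaPartitioner` with both affix types. The following plain-Java sketch only mirrors the `schema_name=` prefix/suffix shape produced by `generateSchemaBasedPath` earlier in this patch, with `/` standing in for the configured directory delimiter; the topic, partition and schema names are illustrative:

```java
public class SchemaAffixPathSketch {

  // Mirrors the PREFIX/SUFFIX branch of SchemaPartitioner#generateSchemaBasedPath,
  // using "/" in place of the configurable directory delimiter.
  static String schemaBasedPath(String encodedPartition, String schemaName, boolean prefix) {
    return prefix
        ? "schema_name=" + schemaName + "/" + encodedPartition
        : encodedPartition + "/" + "schema_name=" + schemaName;
  }

  public static void main(String[] args) {
    String encodedPartition = "partition=12"; // what a DefaultPartitioner would produce
    String schemaName = "record1";            // the record's value schema name; "null" when absent
    System.out.println(schemaBasedPath(encodedPartition, schemaName, true));
    // -> schema_name=record1/partition=12
    System.out.println(schemaBasedPath(encodedPartition, schemaName, false));
    // -> partition=12/schema_name=record1
  }
}
```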
Partitioner basePartitioner = new DefaultPartitioner<>(); + Partitioner partitioner = new SchemaPartitioner<>(basePartitioner); + partitioner.configure(parsedConfig); + + TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter( + TOPIC_PARTITION, storage, writerProvider, partitioner, connectorConfig, context, null + ); + + List testData; + if (testWithSchemaData) { + testData = generateTestDataWithSchema(partitioner, affixType); + } else { + testData = generateTestDataWithoutSchema(partitioner, affixType); + } + + String key = (String) testData.get(0); + List actualRecords = (List) testData.get(1); + List expectedRecords = (List) testData.get(2); + List expectedFiles = (List) testData.get(3); + + + for (SinkRecord actualRecord : actualRecords) { + topicPartitionWriter.buffer(actualRecord); + } + // Test actual write + topicPartitionWriter.write(); + topicPartitionWriter.close(); + + verifyWithJsonOutput( + expectedFiles, expectedRecords.size() / expectedFiles.size(), expectedRecords, CompressionType.NONE + ); + } + + private List generateTestDataWithSchema( + Partitioner partitioner, S3SinkConnectorConfig.AffixType affixType + ) { + String key = "key"; + + Schema schema1 = SchemaBuilder.struct() + .name(null) + .version(null) + .field("boolean", Schema.BOOLEAN_SCHEMA) + .field("int", Schema.INT32_SCHEMA) + .field("long", Schema.INT64_SCHEMA) + .field("float", Schema.FLOAT32_SCHEMA) + .field("double", Schema.FLOAT64_SCHEMA) + .build(); + List records1 = createRecordBatches(schema1, 3, 6); + + Schema schema2 = SchemaBuilder.struct() + .name("record1") + .version(1) + .field("boolean", Schema.BOOLEAN_SCHEMA) + .field("int", Schema.INT32_SCHEMA) + .field("long", Schema.INT64_SCHEMA) + .field("float", Schema.FLOAT32_SCHEMA) + .field("double", Schema.FLOAT64_SCHEMA) + .build(); + + List records2 = createRecordBatches(schema2, 3, 6); + + Schema schema3 = SchemaBuilder.struct() + .name("record2") + .version(1) + .field("boolean", Schema.BOOLEAN_SCHEMA) + .field("int", Schema.INT32_SCHEMA) + .field("long", Schema.INT64_SCHEMA) + .field("float", Schema.FLOAT32_SCHEMA) + .field("double", Schema.FLOAT64_SCHEMA) + .build(); + + List records3 = createRecordBatches(schema3, 3, 6); + + ArrayList actualData = new ArrayList<>(); + int offset = 0; + for (int i = 0; i < records1.size(); i++) { + actualData.add( + new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema1, records1.get(i), + offset++ + ) + ); + actualData.add( + new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema2, records2.get(i), + offset++ + ) + ); + actualData.add( + new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema3, records3.get(i), + offset++ + ) + ); + } + List expectedRecords = new ArrayList<>(); + int ibase = 16; + float fbase = 12.2f; + offset = 0; + // The expected sequence of records is constructed taking into account that sorting of files occurs in verify + + for (int i = 0; i < 6; ++i) { + for (int j = 0; j < 3; ++j) { + expectedRecords.add( + new SinkRecord( + TOPIC, PARTITION, Schema.STRING_SCHEMA, + key, schema1, createRecord(schema1, ibase + j, fbase + j), + offset++ + ) + ); + } + } + for (int i = 0; i < 6; ++i) { + for (int j = 0; j < 3; ++j) { + expectedRecords.add( + new SinkRecord( + TOPIC, PARTITION, Schema.STRING_SCHEMA, + key, schema2, createRecord(schema2, ibase + j, fbase + j), + offset++ + ) + ); + } + } + for (int i = 0; i < 6; ++i) { + for (int j = 0; j < 3; ++j) { + expectedRecords.add( + new SinkRecord( + TOPIC, PARTITION, Schema.STRING_SCHEMA, + key, 
schema3, createRecord(schema3, ibase + j, fbase + j), + offset++ + ) + ); + } + } + + String dirPrefix1 = generateS3DirectoryPathWithDefaultPartitioner( + partitioner, affixType, PARTITION, TOPIC, "null" + ); + String dirPrefix2 = generateS3DirectoryPathWithDefaultPartitioner( + partitioner, affixType, PARTITION, TOPIC, "record1" + ); + String dirPrefix3 = generateS3DirectoryPathWithDefaultPartitioner( + partitioner, affixType, PARTITION, TOPIC, "record2" + ); + List expectedFiles = new ArrayList<>(); + for (int i = 0; i < 54; i += 9) { + expectedFiles.add(FileUtils.fileKeyToCommit( + topicsDir, dirPrefix1, TOPIC_PARTITION, i, extension, ZERO_PAD_FMT + )); + expectedFiles.add(FileUtils.fileKeyToCommit( + topicsDir, dirPrefix2, TOPIC_PARTITION, i + 1, extension, ZERO_PAD_FMT + )); + expectedFiles.add(FileUtils.fileKeyToCommit( + topicsDir, dirPrefix3, TOPIC_PARTITION, i + 2, extension, ZERO_PAD_FMT + )); + } + return Arrays.asList(key, actualData, expectedRecords, expectedFiles); + } + + private List generateTestDataWithoutSchema( + Partitioner partitioner, S3SinkConnectorConfig.AffixType affixType + ) { + String key = "key"; + List records = createJsonRecordsWithoutSchema(18); + + ArrayList actualData = new ArrayList<>(); + int offset = 0; + for (String record : records) { + actualData.add( + new SinkRecord( + TOPIC, PARTITION, Schema.STRING_SCHEMA, key, null, record, offset++ + ) + ); + } + List expectedRecords = new ArrayList<>(actualData); + + String dirPrefix = generateS3DirectoryPathWithDefaultPartitioner( + partitioner, affixType, PARTITION, TOPIC, "null" + ); + List expectedFiles = new ArrayList<>(); + for (int i = 0; i < 18; i += 9) { + expectedFiles.add(FileUtils.fileKeyToCommit( + topicsDir, dirPrefix, TOPIC_PARTITION, i, extension, ZERO_PAD_FMT + )); + } + return Arrays.asList(key, actualData, expectedRecords, expectedFiles); + } + + private String generateS3DirectoryPathWithDefaultPartitioner( + Partitioner basePartitioner, + S3SinkConnectorConfig.AffixType affixType, int partition, + String topic, String schema_name + ) { + if (affixType == S3SinkConnectorConfig.AffixType.SUFFIX) { + return basePartitioner.generatePartitionedPath(topic, + "partition=" + partition + parsedConfig.get(DIRECTORY_DELIM_CONFIG) + + "schema_name" + "=" + schema_name); + } else if (affixType == S3SinkConnectorConfig.AffixType.PREFIX) { + return basePartitioner.generatePartitionedPath(topic, + "schema_name" + "=" + schema_name + + parsedConfig.get(DIRECTORY_DELIM_CONFIG) + "partition=" + partition); + } else { + return basePartitioner.generatePartitionedPath(topic, + "partition=" + partition); + } + } + + protected List createJsonRecordsWithoutSchema(int size) { + ArrayList records = new ArrayList<>(); + int ibase = 16; + float fbase = 12.2f; + for (int i = 0; i < size; ++i) { + String record = "{\"schema\":{\"type\":\"struct\",\"fields\":[ " + + "{\"type\":\"boolean\",\"optional\":true,\"field\":\"booleanField\"}," + + "{\"type\":\"int\",\"optional\":true,\"field\":\"intField\"}," + + "{\"type\":\"long\",\"optional\":true,\"field\":\"longField\"}," + + "{\"type\":\"float\",\"optional\":true,\"field\":\"floatField\"}," + + "{\"type\":\"double\",\"optional\":true,\"field\":\"doubleField\"}]," + + "\"payload\":" + + "{\"booleanField\":\"true\"," + + "\"intField\":" + String.valueOf(ibase + i) + "," + + "\"longField\":" + String.valueOf((long) ibase + i) + "," + + "\"floatField\":" + String.valueOf(fbase + i) + "," + + "\"doubleField\":" + String.valueOf((double) (fbase + i)) + + "}}"; + 
records.add(record); + } + return records; + } + private Struct createRecord(Schema schema, int ibase, float fbase) { return new Struct(schema) .put("boolean", true) @@ -1498,12 +1820,49 @@ private void verify(List expectedFileKeys, int expectedSize, Schema sche Collection actualRecords = readRecordsAvro(S3_TEST_BUCKET_NAME, fileKey, s3); assertEquals(expectedSize, actualRecords.size()); for (Object avroRecord : actualRecords) { - Object expectedRecord = format.getAvroData().fromConnectData(schema, records.get(index++)); + Object expectedRecord = format.getAvroData().fromConnectData(records.get(index).schema(), records.get(index++)); assertEquals(expectedRecord, avroRecord); } } } + private void verifyWithJsonOutput( + List expectedFileKeys, int expectedSize, + List expectedRecords, CompressionType compressionType + ) throws IOException { + List summaries = listObjects(S3_TEST_BUCKET_NAME, null, s3); + List actualFiles = new ArrayList<>(); + for (S3ObjectSummary summary : summaries) { + String fileKey = summary.getKey(); + actualFiles.add(fileKey); + } + + Collections.sort(actualFiles); + Collections.sort(expectedFileKeys); + assertThat(actualFiles, is(expectedFileKeys)); + + int index = 0; + for (String fileKey : actualFiles) { + Collection actualRecords = readRecordsJson( + S3_TEST_BUCKET_NAME, fileKey, + s3, compressionType + ); + assertEquals(expectedSize, actualRecords.size()); + for (Object currentRecord : actualRecords) { + SinkRecord expectedRecord = expectedRecords.get(index++); + Object expectedValue = expectedRecord.value(); + JsonConverter converter = new JsonConverter(); + converter.configure(Collections.singletonMap("schemas.enable", "false"), false); + ObjectMapper mapper = new ObjectMapper(); + if (expectedValue instanceof Struct) { + byte[] expectedBytes = converter.fromConnectData(TOPIC, expectedRecord.valueSchema(), expectedRecord.value()); + expectedValue = mapper.readValue(expectedBytes, Object.class); + } + assertEquals(expectedValue, currentRecord); + } + } + } + // based on verify() private void verifyRecordElement(List expectedFileKeys, int expectedSize, List records, RecordElement fileType) throws IOException { @@ -1637,6 +1996,35 @@ private void verifyTags(Map> expectedTaggedFiles) } } + private void testS3ObjectTaggingErrorHelper(boolean mockSdkClientException, boolean ignoreTaggingError) throws Exception { + // Enabling tagging and setting behavior for tagging error. + localProps.put(S3SinkConnectorConfig.S3_OBJECT_TAGGING_CONFIG, "true"); + localProps.put(S3SinkConnectorConfig.S3_OBJECT_BEHAVIOR_ON_TAGGING_ERROR_CONFIG, ignoreTaggingError ? "ignore" : "fail"); + + // Setup mock exception while tagging + setUpWithTaggingException(mockSdkClientException); + + // Define the partitioner + Partitioner partitioner = new DefaultPartitioner<>(); + partitioner.configure(parsedConfig); + TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter( + TOPIC_PARTITION, storage, writerProvider, partitioner, connectorConfig, context, null); + + String key = "key"; + Schema schema = createSchema(); + List records = createRecordBatches(schema, 3, 3); + + Collection sinkRecords = createSinkRecords(records, key, schema); + + for (SinkRecord record : sinkRecords) { + topicPartitionWriter.buffer(record); + } + + // Invoke write so as to simulate tagging error. 
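For reference, the two settings this helper toggles are ordinary connector properties. A minimal sketch using the constants from `S3SinkConnectorConfig` (the literal key strings are not spelled out in this diff, so the constants are used instead; the values mirror the tests above):

```java
import java.util.HashMap;
import java.util.Map;

import io.confluent.connect.s3.S3SinkConnectorConfig;

public class TaggingConfigSketch {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    // Turn on S3 object tagging for committed files.
    props.put(S3SinkConnectorConfig.S3_OBJECT_TAGGING_CONFIG, "true");
    // "ignore" logs a tagging failure and keeps going; "fail" surfaces a ConnectException,
    // matching the four tagging-error tests in this file.
    props.put(S3SinkConnectorConfig.S3_OBJECT_BEHAVIOR_ON_TAGGING_ERROR_CONFIG, "ignore");
    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}
```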
+ topicPartitionWriter.write(); + topicPartitionWriter.close(); + } + public static class MockedWallclockTimestampExtractor implements TimestampExtractor { public final MockTime time; diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/BaseConnectorIT.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/BaseConnectorIT.java index 232114197..4351ddff3 100644 --- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/BaseConnectorIT.java +++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/BaseConnectorIT.java @@ -189,7 +189,7 @@ protected long waitForFilesInBucket(String bucketName, int numFiles) throws Inte *
* Format: topics/s3_topic/partition=97/s3_topic+97+0000000001.avro * - * @param topic the test kafileContentsMatchExpectedfka topic + * @param topic the test kafka topic * @param partition the expected partition for the tests * @param flushSize the flush size connector config * @param numRecords the number of records produced in the test @@ -220,6 +220,44 @@ protected List getExpectedFilenames( return expectedFiles; } + /** + * Get a list of the expected filenames containing keys for the tombstone records for the bucket. + *
+ * Format: topics/s3_topic/tombstone/s3_topic+97+0000000001.keys.avro + * + * @param topic the test kafka topic + * @param partition the expected partition for the tests + * @param flushSize the flush size connector config + * @param numRecords the number of records produced in the test + * @param extension the expected extensions of the files including compression (snappy.parquet) + * @param tombstonePartition the expected directory for tombstone records + * @return the list of expected filenames + */ + protected List getExpectedTombstoneFilenames( + String topic, + int partition, + int flushSize, + long numRecords, + String extension, + String tombstonePartition + ) { + int expectedFileCount = (int) numRecords / flushSize; + List expectedFiles = new ArrayList<>(); + for (int offset = 0; offset < expectedFileCount * flushSize; offset += flushSize) { + String filepath = String.format( + "topics/%s/%s/%s+%d+%010d.keys.%s", + topic, + tombstonePartition, + topic, + partition, + offset, + extension + ); + expectedFiles.add(filepath); + } + return expectedFiles; + } + /** * Check if the file names in the bucket have the expected namings. * @@ -397,6 +435,46 @@ protected boolean fileContentsAsExpected( return true; } + protected boolean keyfileContentsAsExpected( + String bucketName, + int expectedRowsPerFile, + String expectedKey + ) { + log.info("expectedKey: {}", expectedKey); + for (String fileName : + getS3KeyFileList(S3Client.listObjectsV2(bucketName).getObjectSummaries())) { + String destinationPath = TEST_DOWNLOAD_PATH + fileName; + File downloadedFile = new File(destinationPath); + log.info("Saving file to : {}", destinationPath); + S3Client.getObject(new GetObjectRequest(bucketName, fileName), downloadedFile); + List keyContent = new ArrayList<>(); + try (FileReader fileReader = new FileReader(destinationPath); + BufferedReader bufferedReader = new BufferedReader(fileReader)) { + String line; + while ((line = bufferedReader.readLine()) != null) { + keyContent.add(line); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + if (keyContent.size() != expectedRowsPerFile) { + log.error("Actual number of records in the key file {}, Expected number of records {}", + keyContent.size(), expectedRowsPerFile); + return false; + } + for (String actualKey: keyContent) { + if (!expectedKey.equals(actualKey)) { + log.error("Key {} did not match the contents in the key file {}", expectedKey, actualKey); + return false; + } else { + log.info("Key {} matched the contents in the key file {}", expectedKey, actualKey); + } + } + downloadedFile.delete(); + } + return true; + } + /** * Check if the contents of a downloaded file match the expected row. * @@ -423,6 +501,14 @@ protected boolean fileContentsMatchExpected( return true; } + private List getS3KeyFileList(List summaries) { + final String includeExtensions = ".keys."; + return summaries.stream() + .filter(summary -> summary.getKey().contains(includeExtensions)) + .map(S3ObjectSummary::getKey) + .collect(Collectors.toList()); + } + /** * Compare the row in the file and its values to the expected row's values. 
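As a quick worked example of the tombstone key-file layout encoded by `getExpectedTombstoneFilenames` above (topic, partition, offset and extension values are illustrative):

```java
public class TombstoneFileNameSketch {
  public static void main(String[] args) {
    String topic = "s3_topic";
    String tombstonePartition = "TOMBSTONE_PARTITION"; // matches the constant used in S3SinkConnectorIT below
    int partition = 97;
    long offset = 0L;
    String extension = "avro";

    // Same format string used by getExpectedTombstoneFilenames above.
    String path = String.format("topics/%s/%s/%s+%d+%010d.keys.%s",
        topic, tombstonePartition, topic, partition, offset, extension);
    System.out.println(path);
    // -> topics/s3_topic/TOMBSTONE_PARTITION/s3_topic+97+0000000000.keys.avro
  }
}
```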
* diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkConnectorIT.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkConnectorIT.java index e2112fbd9..24b6296b4 100644 --- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkConnectorIT.java +++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkConnectorIT.java @@ -16,9 +16,13 @@ package io.confluent.connect.s3.integration; import static io.confluent.connect.s3.S3SinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG; +import static io.confluent.connect.s3.S3SinkConnectorConfig.KEYS_FORMAT_CLASS_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.S3_BUCKET_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.STORE_KAFKA_HEADERS_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.STORE_KAFKA_KEYS_CONFIG; +import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_ACCESS_KEY_ID_CONFIG; +import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_SECRET_ACCESS_KEY_CONFIG; +import static io.confluent.connect.s3.S3SinkConnectorConfig.TOMBSTONE_ENCODED_PARTITION; import static io.confluent.connect.storage.StorageSinkConnectorConfig.FLUSH_SIZE_CONFIG; import static io.confluent.connect.storage.StorageSinkConnectorConfig.FORMAT_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; @@ -30,7 +34,8 @@ import static org.junit.Assert.assertTrue; import io.confluent.connect.s3.S3SinkConnector; -import io.confluent.connect.s3.S3SinkConnectorConfig.BehaviorOnNullValues; +import io.confluent.connect.s3.S3SinkConnectorConfig.IgnoreOrFailBehavior; +import io.confluent.connect.s3.S3SinkConnectorConfig.OutputWriteBehavior; import io.confluent.connect.s3.format.avro.AvroFormat; import io.confluent.connect.s3.format.json.JsonFormat; import io.confluent.connect.s3.format.parquet.ParquetFormat; @@ -83,6 +88,8 @@ public class S3SinkConnectorIT extends BaseConnectorIT { private static final String DLQ_TOPIC_CONFIG = "errors.deadletterqueue.topic.name"; private static final String DLQ_TOPIC_NAME = "DLQ-topic"; + private static final String TOMBSTONE_PARTITION = "TOMBSTONE_PARTITION"; + private static final List KAFKA_TOPICS = Collections.singletonList(DEFAULT_TEST_TOPIC_NAME); private static final long CONSUME_MAX_DURATION_MS = TimeUnit.SECONDS.toMillis(10); @@ -138,6 +145,17 @@ public void testBasicRecordsWrittenJson() throws Throwable { } @Test + public void testTombstoneRecordsWrittenJson() throws Throwable { + //add test specific props + props.put(FORMAT_CLASS_CONFIG, JsonFormat.class.getName()); + props.put(BEHAVIOR_ON_NULL_VALUES_CONFIG, OutputWriteBehavior.WRITE.toString()); + props.put(STORE_KAFKA_KEYS_CONFIG, "true"); + props.put(KEYS_FORMAT_CLASS_CONFIG, "io.confluent.connect.s3.format.json.JsonFormat"); + props.put(TOMBSTONE_ENCODED_PARTITION, TOMBSTONE_PARTITION); + testTombstoneRecordsWritten(JSON_EXTENSION, false); + } + + public void testFilesWrittenToBucketAvroWithExtInTopic() throws Throwable { //add test specific props props.put(FORMAT_CLASS_CONFIG, AvroFormat.class.getName()); @@ -226,10 +244,64 @@ private void testBasicRecordsWritten( assertTrue(fileContentsAsExpected(TEST_BUCKET_NAME, FLUSH_SIZE_STANDARD, recordValueStruct)); } + private void testTombstoneRecordsWritten( + String expectedFileExtension, + boolean addExtensionInTopic + ) throws Throwable { + final String topicNameWithExt = "other." + expectedFileExtension + ".topic." 
+ expectedFileExtension; + + // Add an extra topic with this extension inside of the name + // Use a TreeSet for test determinism + Set topicNames = new TreeSet<>(KAFKA_TOPICS); + + if (addExtensionInTopic) { + topicNames.add(topicNameWithExt); + connect.kafka().createTopic(topicNameWithExt, 1); + props.replace( + "topics", + props.get("topics") + "," + topicNameWithExt + ); + } + + // start sink connector + connect.configureConnector(CONNECTOR_NAME, props); + // wait for tasks to spin up + EmbeddedConnectUtils.waitForConnectorToStart(connect, CONNECTOR_NAME, Math.min(topicNames.size(), MAX_TASKS)); + + for (String thisTopicName : topicNames) { + // Create and send records to Kafka using the topic name in the current 'thisTopicName' + SinkRecord sampleRecord = getSampleTopicRecord(thisTopicName, null, null); + produceRecordsWithHeadersNoValue(thisTopicName, NUM_RECORDS_INSERT, sampleRecord); + } + + log.info("Waiting for files in S3..."); + int countPerTopic = NUM_RECORDS_INSERT / FLUSH_SIZE_STANDARD; + int expectedTotalFileCount = countPerTopic * topicNames.size(); + waitForFilesInBucket(TEST_BUCKET_NAME, expectedTotalFileCount); + + Set expectedTopicFilenames = new TreeSet<>(); + for (String thisTopicName : topicNames) { + List theseFiles = getExpectedTombstoneFilenames( + thisTopicName, + TOPIC_PARTITION, + FLUSH_SIZE_STANDARD, + NUM_RECORDS_INSERT, + expectedFileExtension, + TOMBSTONE_PARTITION + ); + assertEquals(theseFiles.size(), countPerTopic); + expectedTopicFilenames.addAll(theseFiles); + } + // This check will catch any duplications + assertEquals(expectedTopicFilenames.size(), expectedTotalFileCount); + assertFileNamesValid(TEST_BUCKET_NAME, new ArrayList<>(expectedTopicFilenames)); + assertTrue(keyfileContentsAsExpected(TEST_BUCKET_NAME, FLUSH_SIZE_STANDARD, "\"key\"")); + } + @Test public void testFaultyRecordsReportedToDLQ() throws Throwable { props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); - props.put(BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.IGNORE.toString()); + props.put(BEHAVIOR_ON_NULL_VALUES_CONFIG, IgnoreOrFailBehavior.IGNORE.toString()); props.put(STORE_KAFKA_KEYS_CONFIG, "true"); props.put(STORE_KAFKA_HEADERS_CONFIG, "true"); props.put(DLQ_TOPIC_CONFIG, DLQ_TOPIC_NAME); diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/storage/ElasticByteBufferTest.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/storage/ElasticByteBufferTest.java new file mode 100644 index 000000000..5bcd9422f --- /dev/null +++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/storage/ElasticByteBufferTest.java @@ -0,0 +1,332 @@ +package io.confluent.connect.s3.storage; + +import org.apache.commons.lang.RandomStringUtils; +import org.apache.commons.lang.math.RandomUtils; +import org.junit.Test; +import java.nio.BufferOverflowException; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +public class ElasticByteBufferTest { + + public static final int INIT_CAP = 128 * 1024; + + @Test(expected = IllegalArgumentException.class) + public void testIllegalCapacity1() { + ElasticByteBuffer buf = new ElasticByteBuffer(-1, INIT_CAP); + } + + @Test(expected = IllegalArgumentException.class) + public void testIllegalCapacity2() { + ElasticByteBuffer buf = new ElasticByteBuffer(0, INIT_CAP); + } + + @Test + public void 
testLessThanInitCapacityPut1() { + ElasticByteBuffer buf = new ElasticByteBuffer(1024, INIT_CAP); + + assertEquals(1024, buf.physicalRemaining()); + assertEquals(1024, buf.remaining()); + assertEquals(0, buf.position()); + assertTrue(buf.hasRemaining()); + assertEquals(1024, buf.array().length); + + buf.put((byte) 0x01); + assertEquals(1023, buf.physicalRemaining()); + assertEquals(1023, buf.remaining()); + assertEquals(1, buf.position()); + assertTrue(buf.hasRemaining()); + assertEquals(1024, buf.array().length); + + byte[] randomBytes = RandomStringUtils.randomAlphanumeric(1023).getBytes(); + for (byte randomByte : randomBytes) { + buf.put(randomByte); + } + + assertEquals(0, buf.physicalRemaining()); + assertEquals(0, buf.remaining()); + assertEquals(1024, buf.position()); + assertFalse(buf.hasRemaining()); + assertEquals(1024, buf.array().length); + + assertThrows(BufferOverflowException.class, () -> { + buf.put((byte) 0x01); + }); + + } + + @Test + public void testLessThanInitCapacityPut2() { + + int cap = 1024; + ElasticByteBuffer buf = new ElasticByteBuffer(cap, INIT_CAP); + + byte[] randomBytes1 = RandomStringUtils.randomAlphanumeric(4).getBytes(); + buf.put(randomBytes1, 0, randomBytes1.length); + + assertEquals(1020, buf.physicalRemaining()); + assertEquals(1020, buf.remaining()); + assertEquals(4, buf.position()); + assertTrue(buf.hasRemaining()); + assertEquals(cap, buf.array().length); + + byte[] randomBytes2 = RandomStringUtils.randomAlphanumeric(1019).getBytes(); + buf.put(randomBytes2, 0, randomBytes2.length); + + assertEquals(1, buf.physicalRemaining()); + assertEquals(1, buf.remaining()); + assertEquals(1023, buf.position()); + assertTrue(buf.hasRemaining()); + assertEquals(cap, buf.array().length); + + byte[] randomBytes3 = RandomStringUtils.randomAlphanumeric(2).getBytes(); + assertThrows(BufferOverflowException.class, () -> { + buf.put(randomBytes3, 0, randomBytes3.length); + }); + + buf.put(new byte[] {0x01}, 0, 1); + assertEquals(0, buf.physicalRemaining()); + assertEquals(0, buf.remaining()); + assertEquals(cap, buf.position()); + assertFalse(buf.hasRemaining()); + assertEquals(cap, buf.array().length); + } + + @Test + public void testLessThanInitCapacityClear() { + ElasticByteBuffer buf = new ElasticByteBuffer(1024, INIT_CAP); + + byte[] randomBytes1 = RandomStringUtils.randomAlphanumeric(4).getBytes(); + buf.put(randomBytes1, 0, randomBytes1.length); + + byte[] arrayBeforeClear = buf.array(); + buf.clear(); + byte[] arrayAfterClear = buf.array(); + assertEquals(arrayAfterClear.length, arrayBeforeClear.length); + assertSame(arrayAfterClear, arrayBeforeClear); + } + + + @Test + public void testGreaterThanInitCapacityPut1() { + + int cap = 10 * 1024 * 1024; + ElasticByteBuffer buf = new ElasticByteBuffer(cap, INIT_CAP); + + assertEquals(INIT_CAP, buf.physicalRemaining()); + assertEquals(cap, buf.remaining()); + assertEquals(0, buf.position()); + assertTrue(buf.hasRemaining()); + assertEquals(INIT_CAP, buf.array().length); + + byte[] randomBytes1 = RandomStringUtils.randomAlphanumeric(INIT_CAP).getBytes(); + for (byte randomByte : randomBytes1) { + buf.put(randomByte); + } + + assertEquals(0, buf.physicalRemaining()); + assertEquals(cap - INIT_CAP, buf.remaining()); + assertEquals(INIT_CAP, buf.position()); + assertTrue(buf.hasRemaining()); + assertEquals(INIT_CAP, buf.array().length); + + int testBytesLen1 = 5; + byte[] randomBytes2 = RandomStringUtils.randomAlphanumeric(testBytesLen1).getBytes(); + for (byte randomByte : randomBytes2) { + buf.put(randomByte); + } + 
+ int exceptNewPhysicalSize = INIT_CAP * 2; + + assertEquals(exceptNewPhysicalSize - (INIT_CAP + testBytesLen1), buf.physicalRemaining()); + assertEquals(cap - (INIT_CAP + testBytesLen1), buf.remaining()); + assertEquals(INIT_CAP + testBytesLen1, buf.position()); + assertTrue(buf.hasRemaining()); + assertEquals(exceptNewPhysicalSize, buf.array().length); + + int remaining = cap - (INIT_CAP + testBytesLen1); + byte[] randomBytes3 = RandomStringUtils.randomAlphanumeric(remaining).getBytes(); + for (byte randomByte : randomBytes3) { + buf.put(randomByte); + } + + assertEquals(0, buf.physicalRemaining()); + assertEquals(0, buf.remaining()); + assertEquals(cap, buf.position()); + assertFalse(buf.hasRemaining()); + assertEquals(cap, buf.array().length); + + assertThrows(BufferOverflowException.class, ()->{ + buf.put((byte) 0x01); + }); + } + + @Test + public void testGreaterThanInitCapacityPut2() { + int cap = 10 * 1024 * 1024; + ElasticByteBuffer buf = new ElasticByteBuffer(cap, INIT_CAP); + + assertEquals(INIT_CAP, buf.physicalRemaining()); + assertEquals(cap, buf.remaining()); + assertEquals(0, buf.position()); + assertTrue(buf.hasRemaining()); + assertEquals(INIT_CAP, buf.array().length); + + byte[] randomBytes1 = RandomStringUtils.randomAlphanumeric(INIT_CAP).getBytes(); + buf.put(randomBytes1, 0, randomBytes1.length); + + assertEquals(0, buf.physicalRemaining()); + assertEquals(cap - INIT_CAP, buf.remaining()); + assertEquals(INIT_CAP, buf.position()); + assertTrue(buf.hasRemaining()); + assertEquals(INIT_CAP, buf.array().length); + + int testBytesLen1 = 5; + byte[] randomBytes2 = RandomStringUtils.randomAlphanumeric(testBytesLen1).getBytes(); + buf.put(randomBytes2, 0, randomBytes2.length); + + int expectNewPhysicalSize = INIT_CAP * 2; + + assertEquals(expectNewPhysicalSize - (INIT_CAP + testBytesLen1), buf.physicalRemaining()); + assertEquals(cap - (INIT_CAP + testBytesLen1), buf.remaining()); + assertEquals(INIT_CAP + testBytesLen1, buf.position()); + assertTrue(buf.hasRemaining()); + assertEquals(expectNewPhysicalSize, buf.array().length); + + int remainingLessOne = cap - (INIT_CAP + testBytesLen1) - 1; + byte[] randomBytes3 = RandomStringUtils.randomAlphanumeric(remainingLessOne).getBytes(); + buf.put(randomBytes3, 0, randomBytes3.length); + + assertEquals(1, buf.physicalRemaining()); + assertEquals(1, buf.remaining()); + assertEquals(cap - 1, buf.position()); + assertTrue(buf.hasRemaining()); + assertEquals(cap, buf.array().length); + + assertThrows(BufferOverflowException.class, ()->{ + buf.put(new byte[] {0x01, 0x02}, 0, 2); + }); + + buf.put(new byte[] {0x01}, 0, 1); + assertEquals(0, buf.physicalRemaining()); + assertEquals(0, buf.remaining()); + assertEquals(cap, buf.position()); + assertFalse(buf.hasRemaining()); + assertEquals(cap, buf.array().length); + + } + + @Test + public void testGreaterThanInitCapacityClear() { + int cap = 10 * 1024 * 1024; + ElasticByteBuffer buf = new ElasticByteBuffer(cap, INIT_CAP); + + byte[] randomBytes1 = RandomStringUtils.randomAlphanumeric(5 * 1024 * 1024).getBytes(); + buf.put(randomBytes1, 0, randomBytes1.length); + + byte[] arrayBeforeClear = buf.array(); + buf.clear(); + byte[] arrayAfterClear = buf.array(); + + assertEquals(0, buf.position()); + assertTrue(buf.hasRemaining()); + assertEquals(INIT_CAP, buf.physicalRemaining()); + assertEquals(cap, buf.remaining()); + + assertEquals(INIT_CAP, arrayAfterClear.length); + assertTrue(arrayAfterClear.length < arrayBeforeClear.length); + assertNotSame(arrayAfterClear, arrayBeforeClear); + } + + 
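Taken together, these tests pin down the elastic buffer's contract: it allocates only the initial capacity up front, doubles its physical array when a write would overflow it (never beyond the logical capacity), and `clear()` drops back to the initial allocation. A small demo of that contract, assuming only the `ElasticByteBuffer` API exercised above:

```java
import io.confluent.connect.s3.storage.ElasticByteBuffer;

public class ElasticByteBufferDemo {
  public static void main(String[] args) {
    int logicalCapacity = 1024 * 1024; // stands in for the configured S3 part size
    int initialCapacity = 128 * 1024;  // same value as INIT_CAP in the tests above

    ElasticByteBuffer buf = new ElasticByteBuffer(logicalCapacity, initialCapacity);
    System.out.println(buf.array().length);      // 131072 - only the initial allocation exists
    System.out.println(buf.remaining());         // 1048576 - logical room left
    System.out.println(buf.physicalRemaining()); // 131072 - room left before the next grow

    // Fill the initial allocation, then write one more byte to force a grow.
    byte[] chunk = new byte[initialCapacity];
    buf.put(chunk, 0, chunk.length);
    buf.put((byte) 0x01);
    System.out.println(buf.array().length);      // doubled physical array, as the tests assert

    buf.clear();
    System.out.println(buf.array().length);      // 131072 - back to the initial allocation
    System.out.println(buf.position());          // 0
  }
}
```

This is what allows `S3OutputStream` (earlier in this patch) to avoid eagerly allocating a full part-size buffer whenever the elastic buffer is enabled.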
@Test + public void testLessThanInitSizeDataPut1() { + int cap = 1024; + ElasticByteBuffer buf = new ElasticByteBuffer(cap, INIT_CAP); + + int testBytesLen1 = 4; + String data1 = RandomStringUtils.randomAlphanumeric(testBytesLen1); + byte[] randomBytes1 = data1.getBytes(); + for (byte randomByte : randomBytes1) { + buf.put(randomByte); + } + + assertEquals(data1, new String(buf.array(), 0, buf.position())); + + int testBytesLen2 = 1020; + String data2 = RandomStringUtils.randomAlphanumeric(testBytesLen2); + byte[] randomBytes2 = data2.getBytes(); + for (byte randomByte : randomBytes2) { + buf.put(randomByte); + } + + assertEquals(data1 + data2, new String(buf.array(), 0, buf.position())); + } + + @Test + public void testLessThanInitSizeDataPut2() { + int cap = 1024; + ElasticByteBuffer buf = new ElasticByteBuffer(cap, INIT_CAP); + + int testBytesLen1 = 4; + String data1 = RandomStringUtils.randomAlphanumeric(testBytesLen1); + byte[] randomBytes1 = data1.getBytes(); + buf.put(randomBytes1, 0, randomBytes1.length); + + assertEquals(data1, new String(buf.array(), 0, buf.position())); + + int testBytesLen2 = 1020; + String data2 = RandomStringUtils.randomAlphanumeric(testBytesLen2); + byte[] randomBytes2 = data2.getBytes(); + buf.put(randomBytes2, 0, randomBytes2.length); + + assertEquals(data1 + data2, new String(buf.array(), 0, buf.position())); + } + + @Test + public void testGreaterThanInitSizeDataPut1() { + int cap = 5 * 1024 * 1024; + ElasticByteBuffer buf = new ElasticByteBuffer(cap, INIT_CAP); + + int testBytesLen1 = RandomUtils.nextInt(cap); + String data1 = RandomStringUtils.randomAlphanumeric(testBytesLen1); + byte[] randomBytes1 = data1.getBytes(); + for (byte randomByte : randomBytes1) { + buf.put(randomByte); + } + + assertEquals(data1, new String(buf.array(), 0, buf.position())); + + int testBytesLen2 = cap - testBytesLen1; + String data2 = RandomStringUtils.randomAlphanumeric(testBytesLen2); + byte[] randomBytes2 = data2.getBytes(); + for (byte randomByte : randomBytes2) { + buf.put(randomByte); + } + + assertEquals(data1 + data2, new String(buf.array(), 0, buf.position())); + } + + @Test + public void testGreaterThanInitSizeDataPut2() { + int cap = 5 * 1024 * 1024; + ElasticByteBuffer buf = new ElasticByteBuffer(cap, INIT_CAP); + + int testBytesLen1 = RandomUtils.nextInt(cap); + String data1 = RandomStringUtils.randomAlphanumeric(testBytesLen1); + byte[] randomBytes1 = data1.getBytes(); + buf.put(randomBytes1, 0, randomBytes1.length); + + assertEquals(data1, new String(buf.array(), 0, buf.position())); + + int testBytesLen2 = cap - testBytesLen1; + String data2 = RandomStringUtils.randomAlphanumeric(testBytesLen2); + byte[] randomBytes2 = data2.getBytes(); + buf.put(randomBytes2, 0, randomBytes2.length); + + assertEquals(data1 + data2, new String(buf.array(), 0, buf.position())); + } +} \ No newline at end of file diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/storage/S3StorageTest.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/storage/S3StorageTest.java index c19e7cec0..31f947967 100644 --- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/storage/S3StorageTest.java +++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/storage/S3StorageTest.java @@ -142,7 +142,7 @@ public void testUserDefinedCredentialsProvider() throws Exception { String configPrefix = S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CONFIG_PREFIX; localProps.put(configPrefix.concat(DummyAssertiveCredentialsProvider.ACCESS_KEY_NAME), "foo_key"); 
     localProps.put(configPrefix.concat(DummyAssertiveCredentialsProvider.SECRET_KEY_NAME), "bar_secret");
-    localProps.put(configPrefix.concat(DummyAssertiveCredentialsProvider.CONFIGS_NUM_KEY_NAME), "3");
+    localProps.put(configPrefix.concat(DummyAssertiveCredentialsProvider.CONFIGS_NUM_KEY_NAME), "5");
     localProps.put(
         S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG,
         DummyAssertiveCredentialsProvider.class.getName()
diff --git a/pom.xml b/pom.xml
index 0f037e4ef..17fd96cd0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,13 +21,13 @@
     <parent>
         <groupId>io.confluent</groupId>
         <artifactId>kafka-connect-storage-common-parent</artifactId>
-        <version>11.2.4</version>
+        <version>11.2.5</version>
     </parent>
 
     <groupId>io.confluent</groupId>
     <artifactId>kafka-connect-storage-cloud</artifactId>
     <packaging>pom</packaging>
-    <version>10.0.28-SNAPSHOT</version>
+    <version>10.5.8-SNAPSHOT</version>
     <name>kafka-connect-storage-cloud</name>
     <organization>
         <name>Confluent, Inc.</name>
@@ -50,7 +50,7 @@
         <connection>scm:git:git://github.com/confluentinc/kafka-connect-storage-cloud.git</connection>
         <developerConnection>scm:git:git@github.com:confluentinc/kafka-connect-storage-cloud.git</developerConnection>
         <url>https://github.com/confluentinc/kafka-connect-storage-cloud</url>
-        <tag>10.0.x</tag>
+        <tag>10.5.x</tag>
     </scm>
@@ -169,6 +169,12 @@
            <artifactId>jettison</artifactId>
            <version>${jettison.version}</version>
        </dependency>
+
+       <dependency>
+           <groupId>commons-lang</groupId>
+           <artifactId>commons-lang</artifactId>
+           <version>2.6</version>
+       </dependency>
        <dependency>
            <groupId>com.fasterxml.woodstox</groupId>
            <artifactId>woodstox-core</artifactId>
@@ -340,9 +346,14 @@
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
+       <dependency>
+           <groupId>org.junit.vintage</groupId>
+           <artifactId>junit-vintage-engine</artifactId>
+           <scope>test</scope>
+       </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
-           <artifactId>junit-jupiter-engine</artifactId>
+           <artifactId>junit-jupiter-api</artifactId>
            <scope>test</scope>
        </dependency>
@@ -443,4 +454,4 @@
-</project>
+</project>
\ No newline at end of file