diff --git a/kafka-connect-s3/pom.xml b/kafka-connect-s3/pom.xml index 6ee83a341..517e29838 100644 --- a/kafka-connect-s3/pom.xml +++ b/kafka-connect-s3/pom.xml @@ -33,6 +33,7 @@ + 2.20.133 1.12.524 0.2.5 0.11.1 @@ -83,6 +84,11 @@ httpclient ${httpclient.version} + + software.amazon.awssdk + sts + ${aws.sdk.version} + org.xerial.snappy @@ -180,7 +186,6 @@ test - diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java index a537e73f4..c8f63e9af 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java @@ -17,14 +17,13 @@ import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.regions.RegionUtils; import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.model.CannedAccessControlList; import com.amazonaws.services.s3.model.SSEAlgorithm; -import io.confluent.connect.storage.common.util.StringUtils; +import io.confluent.connect.s3.auth.iamAssume.AwsIAMAssumeRoleChaining; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; @@ -62,6 +61,7 @@ import io.confluent.connect.s3.format.parquet.ParquetFormat; import io.confluent.connect.s3.storage.CompressionType; import io.confluent.connect.s3.storage.S3Storage; +import io.confluent.connect.storage.common.util.StringUtils; import io.confluent.connect.storage.StorageSinkConnectorConfig; import io.confluent.connect.storage.common.ComposableConfig; import io.confluent.connect.storage.common.GenericRecommender; @@ -118,12 +118,21 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig { CREDENTIALS_PROVIDER_CLASS_CONFIG.lastIndexOf(".") + 1 ); + public static final String AUTH_METHOD = "authentication.method"; + public static final String AWS_AUTH_DEFAULT = "Access key and secret"; public static final String AWS_ACCESS_KEY_ID_CONFIG = "aws.access.key.id"; public static final String AWS_ACCESS_KEY_ID_DEFAULT = ""; public static final String AWS_SECRET_ACCESS_KEY_CONFIG = "aws.secret.access.key"; public static final Password AWS_SECRET_ACCESS_KEY_DEFAULT = new Password(null); + public static final String CUSTOMER_ROLE_ARN_CONFIG = "aws.iam.assume.role"; + public static final String CUSTOMER_ROLE_ARN_DEFAULT = ""; + public static final String CUSTOMER_ROLE_EXTERNAL_ID_CONFIG = "aws.iam.external.id"; + public static final Password CUSTOMER_ROLE_EXTERNAL_ID_DEFAULT = new Password(null); + public static final String MIDDLEWARE_ROLE_ARN_CONFIG = "middleware.aws.iam.assume.role"; + public static final String MIDDLEWARE_ROLE_ARN_DEFAULT = ""; + public static final String REGION_CONFIG = "s3.region"; public static final String REGION_DEFAULT = Regions.DEFAULT_REGION.getName(); @@ -375,6 +384,54 @@ public static ConfigDef newConfigDef() { "AWS Credentials Provider Class" ); + configDef.define( + AUTH_METHOD, + Type.STRING, + AWS_AUTH_DEFAULT, + Importance.HIGH, + "Authentication method used for S3 Sink connector", + group, + ++orderInGroup, + Width.LONG, + "Authentication method" + ); + + configDef.define( + CUSTOMER_ROLE_ARN_CONFIG, + Type.STRING, + CUSTOMER_ROLE_ARN_DEFAULT, + Importance.HIGH, + "Role ARN", + group, + ++orderInGroup, + Width.LONG, + "AWS Role ARN" + ); + + configDef.define( + CUSTOMER_ROLE_EXTERNAL_ID_CONFIG, + Type.PASSWORD, + CUSTOMER_ROLE_EXTERNAL_ID_DEFAULT, + Importance.HIGH, + "External ID", + group, + ++orderInGroup, + Width.LONG, + "AWS External ID" + ); + + configDef.define( + MIDDLEWARE_ROLE_ARN_CONFIG, + Type.STRING, + MIDDLEWARE_ROLE_ARN_DEFAULT, + Importance.HIGH, + "Role ARN", + group, + ++orderInGroup, + Width.LONG, + "AWS Role ARN" + ); + configDef.define( AWS_ACCESS_KEY_ID_CONFIG, Type.STRING, @@ -878,26 +935,52 @@ public Password awsSecretKeyId() { return getPassword(AWS_SECRET_ACCESS_KEY_CONFIG); } + public String awsCustomerRoleARN() { + return getString(CUSTOMER_ROLE_ARN_CONFIG); + } + + public String awsMiddlewareRoleARN() { + return getString(MIDDLEWARE_ROLE_ARN_CONFIG); + } + + public String awsExternalId() { + return getString(CUSTOMER_ROLE_EXTERNAL_ID_CONFIG); + } + public int getPartSize() { return getInt(PART_SIZE_CONFIG); } + public String getAuthenticationMethod() { + return getString(AUTH_METHOD); + } + @SuppressWarnings("unchecked") public AWSCredentialsProvider getCredentialsProvider() { try { AWSCredentialsProvider provider = ((Class) - getClass(S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG)).newInstance(); + getClass(S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG)).newInstance(); if (provider instanceof Configurable) { Map configs = originalsWithPrefix(CREDENTIALS_PROVIDER_CONFIG_PREFIX); configs.remove(CREDENTIALS_PROVIDER_CLASS_CONFIG.substring( - CREDENTIALS_PROVIDER_CONFIG_PREFIX.length() + CREDENTIALS_PROVIDER_CONFIG_PREFIX.length() )); - configs.put(AWS_ACCESS_KEY_ID_CONFIG, awsAccessKeyId()); - configs.put(AWS_SECRET_ACCESS_KEY_CONFIG, awsSecretKeyId().value()); + String authMethod = getAuthenticationMethod(); + if (authMethod == "IAM Assume Role") { + configs.put(CUSTOMER_ROLE_ARN_CONFIG, awsCustomerRoleARN()); + configs.put(CUSTOMER_ROLE_EXTERNAL_ID_CONFIG, awsExternalId()); + configs.put(MIDDLEWARE_ROLE_ARN_CONFIG, awsMiddlewareRoleARN()); - ((Configurable) provider).configure(configs); + provider = new AwsIAMAssumeRoleChaining(); + ((AwsIAMAssumeRoleChaining) provider).configure(configs); + } + else { + configs.put(AWS_ACCESS_KEY_ID_CONFIG, awsAccessKeyId()); + configs.put(AWS_SECRET_ACCESS_KEY_CONFIG, awsSecretKeyId().value()); + ((Configurable) provider).configure(configs); + } } else { final String accessKeyId = awsAccessKeyId(); final String secretKey = awsSecretKeyId().value(); @@ -910,8 +993,8 @@ public AWSCredentialsProvider getCredentialsProvider() { return provider; } catch (IllegalAccessException | InstantiationException e) { throw new ConnectException( - "Invalid class for: " + S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, - e + "Invalid class for: " + S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, + e ); } } diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamAssume/AwsIAMAssumeRoleChaining.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamAssume/AwsIAMAssumeRoleChaining.java new file mode 100644 index 000000000..ecc17f05a --- /dev/null +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamAssume/AwsIAMAssumeRoleChaining.java @@ -0,0 +1,99 @@ +package io.confluent.connect.s3.auth.iamAssume; + +import com.amazonaws.auth.*; +import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StsAssumeRoleCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; +import software.amazon.awssdk.services.sts.model.Tag; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; + +import static io.confluent.connect.s3.S3SinkConnectorConfig.CUSTOMER_ROLE_ARN_CONFIG; +import static io.confluent.connect.s3.S3SinkConnectorConfig.CUSTOMER_ROLE_EXTERNAL_ID_CONFIG; +import static io.confluent.connect.s3.S3SinkConnectorConfig.MIDDLEWARE_ROLE_ARN_CONFIG; +import java.util.Map; + +public class AwsIAMAssumeRoleChaining implements AWSCredentialsProvider { + + private static final ConfigDef STS_CONFIG_DEF = new ConfigDef() + .define( + CUSTOMER_ROLE_EXTERNAL_ID_CONFIG, + ConfigDef.Type.STRING, + ConfigDef.Importance.MEDIUM, + "The role external ID used when retrieving session credentials under an assumed role." + ).define( + CUSTOMER_ROLE_ARN_CONFIG, + ConfigDef.Type.STRING, + ConfigDef.Importance.HIGH, + "Role ARN to use when starting a session." + ).define( + MIDDLEWARE_ROLE_ARN_CONFIG, + ConfigDef.Type.STRING, + ConfigDef.Importance.HIGH, + "Role ARN to use when starting a session." + ); + + private String customerRoleArn; + private String customerRoleExternalId; + private String middlewareRoleArn; + private STSAssumeRoleSessionCredentialsProvider stsCredentialProvider; + + // Method to initiate role chaining + public void configure(Map configs) { + // Assume the initial role + AbstractConfig config = new AbstractConfig(STS_CONFIG_DEF, configs); + customerRoleArn = config.getString(CUSTOMER_ROLE_ARN_CONFIG); + customerRoleExternalId = config.getString(CUSTOMER_ROLE_EXTERNAL_ID_CONFIG); + middlewareRoleArn = config.getString(MIDDLEWARE_ROLE_ARN_CONFIG); + + STSAssumeRoleSessionCredentialsProvider initialProvider = buildProvider(middlewareRoleArn, "middlewareSession", "", null); + + // Use the credentials from the initial role to assume the subsequent role + stsCredentialProvider = buildProvider(customerRoleArn, "customerSession", customerRoleExternalId, initialProvider); + } + + // Updated buildProvider to optionally accept an existing AwsCredentialsProvider + private STSAssumeRoleSessionCredentialsProvider buildProvider(final String roleArn, final String roleSessionName, final String roleExternalId, STSAssumeRoleSessionCredentialsProvider existingProvider) { + STSAssumeRoleSessionCredentialsProvider credentialsProvider; + // If an existing credentials provider is provided, use it for creating the STS client + if (existingProvider != null) { + AWSCredentials basicCredentials = existingProvider.getCredentials(); + credentialsProvider = new STSAssumeRoleSessionCredentialsProvider + .Builder(roleArn, roleSessionName) + .withStsClient(AWSSecurityTokenServiceClientBuilder + .standard() + .withCredentials(new AWSStaticCredentialsProvider(basicCredentials)).build() + ) + .withExternalId(roleExternalId) + .build(); + } else { + credentialsProvider = new STSAssumeRoleSessionCredentialsProvider + .Builder(roleArn, roleSessionName) + .withStsClient(AWSSecurityTokenServiceClientBuilder.defaultClient()) + .withExternalId(roleExternalId) + .build(); + } + return credentialsProvider; + } + + // Helper method to build the AssumeRoleRequest + private AssumeRoleRequest buildRequest(String roleExternalId, String roleArn) { + return AssumeRoleRequest.builder() + .roleArn(roleArn) + .externalId(roleExternalId) + .build(); + } + + @Override + public AWSCredentials getCredentials() { + return stsCredentialProvider.getCredentials(); + } + + @Override + public void refresh() { + stsCredentialProvider.refresh(); + } +}