diff --git a/clients/spark/build.sbt b/clients/spark/build.sbt
index a098f1ec857..00e25e93076 100644
--- a/clients/spark/build.sbt
+++ b/clients/spark/build.sbt
@@ -1,6 +1,6 @@
-lazy val projectVersion = "0.15.0"
+lazy val projectVersion = "0.15.2"
 version := projectVersion
-lazy val hadoopVersion = "3.2.1"
+lazy val hadoopVersion = sys.props.getOrElse("hadoopVersion", "3.3.6")
 
 ThisBuild / isSnapshot := false
 ThisBuild / scalaVersion := "2.12.12"
@@ -63,7 +63,7 @@ libraryDependencies ++= Seq(
   "com.azure" % "azure-storage-blob" % "12.9.0",
   "com.azure" % "azure-storage-blob-batch" % "12.7.0",
   "com.azure" % "azure-identity" % "1.2.0",
-  "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.194" % "provided",
+  "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.569" % "provided",
   "com.google.cloud.bigdataoss" % "gcs-connector" % "hadoop3-2.2.18",
   "com.google.cloud" % "google-cloud-storage" % "2.35.0",
   // Snappy is JNI :-(. However it does claim to work with
diff --git a/clients/spark/src/main/scala/io/treeverse/clients/StorageUtils.scala b/clients/spark/src/main/scala/io/treeverse/clients/StorageUtils.scala
index 209d520e9e0..9b8fa2afc50 100644
--- a/clients/spark/src/main/scala/io/treeverse/clients/StorageUtils.scala
+++ b/clients/spark/src/main/scala/io/treeverse/clients/StorageUtils.scala
@@ -1,6 +1,10 @@
 package io.treeverse.clients
 
-import com.amazonaws.auth.AWSCredentialsProvider
+import com.amazonaws.auth.{
+  AWSCredentialsProvider,
+  DefaultAWSCredentialsProviderChain,
+  STSAssumeRoleSessionCredentialsProvider
+}
 import com.amazonaws.client.builder.AwsClientBuilder
 import com.amazonaws.retry.PredefinedRetryPolicies.SDKDefaultRetryCondition
 import com.amazonaws.retry.RetryUtils
@@ -11,6 +15,7 @@ import org.slf4j.{Logger, LoggerFactory}
 
 import java.net.URI
 import java.util.concurrent.TimeUnit
+import java.util.UUID
 
 object StorageUtils {
   val StorageTypeS3 = "s3"
@@ -26,10 +31,10 @@
    * @return object paths in a storage namespace
    */
   def concatKeysToStorageNamespace(
-    keys: Seq[String],
-    storageNamespace: String,
-    keepNsSchemeAndHost: Boolean = true
-  ): Seq[String] = {
+      keys: Seq[String],
+      storageNamespace: String,
+      keepNsSchemeAndHost: Boolean = true
+  ): Seq[String] = {
     var sanitizedNS = storageNamespace
     if (!keepNsSchemeAndHost) {
       val uri = new URI(storageNamespace)
@@ -91,17 +96,20 @@ object StorageUtils {
     val logger: Logger = LoggerFactory.getLogger(getClass.toString)
 
     def createAndValidateS3Client(
-      configuration: ClientConfiguration,
-      credentialsProvider: Option[AWSCredentialsProvider],
-      awsS3ClientBuilder: AmazonS3ClientBuilder,
-      endpoint: String,
-      region: String,
-      bucket: String
-    ): AmazonS3 = {
+        configuration: ClientConfiguration,
+        credentialsProvider: Option[AWSCredentialsProvider],
+        awsS3ClientBuilder: AmazonS3ClientBuilder,
+        endpoint: String,
+        region: String,
+        bucket: String
+    ): AmazonS3 = {
+      logger.debug(
+        s"Creating S3 client for bucket: $bucket, region: $region, endpoint: $endpoint"
+      )
       require(awsS3ClientBuilder != null)
       require(bucket.nonEmpty)
       val client =
-        initializeS3Client(configuration, credentialsProvider, awsS3ClientBuilder, endpoint)
+        initializeS3Client(configuration, credentialsProvider, awsS3ClientBuilder, endpoint, region)
       var bucketRegion =
         try {
           getAWSS3Region(client, bucket)
@@ -142,12 +150,43 @@
         )
       else if (region != null)
         builder.withRegion(region)
-      else
-        builder
-      val builderWithCredentials = credentialsProvider match {
-        case Some(cp) => builderWithEndpoint.withCredentials(cp)
-        case None     => builderWithEndpoint
-      }
+      else {
+        // Fall back to the default region provider chain
+        val currentRegion = com.amazonaws.regions.Regions.getCurrentRegion
+        if (currentRegion != null) {
+          builder.withRegion(currentRegion.getName)
+        } else {
+          builder.withRegion(com.amazonaws.regions.Regions.US_EAST_1)
+        }
+      }
+
+      // Check for Hadoop's assumed role configuration
+      val roleArn = System.getProperty("spark.hadoop.fs.s3a.assumed.role.arn")
+
+      // Apply credentials based on configuration
+      val builderWithCredentials =
+        if (roleArn != null && !roleArn.isEmpty) {
+          // If we have a role ARN configured, assume that role
+          try {
+            val sessionName = "lakefs-gc-" + UUID.randomUUID().toString
+            val stsProvider =
+              new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, sessionName)
+                .withLongLivedCredentialsProvider(new DefaultAWSCredentialsProviderChain())
+                .build()
+
+            builderWithEndpoint.withCredentials(stsProvider)
+          } catch {
+            case e: Exception =>
+              logger.warn(s"Failed to assume role $roleArn, falling back to DefaultAWSCredentialsProviderChain", e)
+              builderWithEndpoint.withCredentials(new DefaultAWSCredentialsProviderChain())
+          }
+        } else
+          // Use the provided credentials provider when configured; otherwise
+          // keep the builder's default credential resolution
+          credentialsProvider match {
+            case Some(cp) => builderWithEndpoint.withCredentials(cp)
+            case None     => builderWithEndpoint
+          }
 
       builderWithCredentials.build
     }
@@ -172,10 +211,10 @@ class S3RetryDeleteObjectsCondition extends SDKDefaultRetryCondition {
   private val clock = java.time.Clock.systemDefaultZone
 
   override def shouldRetry(
-    originalRequest: AmazonWebServiceRequest,
-    exception: AmazonClientException,
-    retriesAttempted: Int
-  ): Boolean = {
+      originalRequest: AmazonWebServiceRequest,
+      exception: AmazonClientException,
+      retriesAttempted: Int
+  ): Boolean = {
     val now = clock.instant
     exception match {
       case ce: SdkClientException =>
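
A note on the build.sbt change above: hadoopVersion is now read from a JVM system property instead of being hard-coded, so the Hadoop release can be chosen per invocation. A minimal sketch of the lookup and how it is driven (the 3.2.1 value below is only an illustration):

    // Resolution used in build.sbt: prefer -DhadoopVersion=<version>, else 3.3.6.
    lazy val hadoopVersion = sys.props.getOrElse("hadoopVersion", "3.3.6")

    // Example invocations:
    //   sbt -DhadoopVersion=3.2.1 clean package   // build against the older Hadoop line
    //   sbt clean package                         // build against the 3.3.6 default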
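
The credentials change in initializeS3Client reads the role ARN with System.getProperty, i.e. from a JVM system property, not from the Hadoop configuration key of the same name. A sketch, assuming a placeholder role ARN, of how a caller might surface that property to this code path:

    // Sketch only, not part of the patch; the ARN is a placeholder.
    object AssumeRoleSetup extends App {
      val roleArn = "arn:aws:iam::123456789012:role/example-gc-role"

      // Make the ARN visible to StorageUtils, which reads it via System.getProperty.
      System.setProperty("spark.hadoop.fs.s3a.assumed.role.arn", roleArn)
    }

    // From spark-submit, one way to set it as a driver JVM property:
    //   --conf "spark.driver.extraJavaOptions=-Dspark.hadoop.fs.s3a.assumed.role.arn=<role-arn>"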