clients/spark/build.sbt (6 changes: 3 additions & 3 deletions)
@@ -1,6 +1,6 @@
-lazy val projectVersion = "0.15.0"
+lazy val projectVersion = "0.15.2"
 version := projectVersion
-lazy val hadoopVersion = "3.2.1"
+lazy val hadoopVersion = sys.props.getOrElse("hadoopVersion", "3.3.6")
 ThisBuild / isSnapshot := false
 ThisBuild / scalaVersion := "2.12.12"

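With this change the Hadoop dependency can be pinned at build time through a JVM system property, falling back to 3.3.6 when none is given. For example (illustrative invocation; any sbt task works the same way):

    sbt -DhadoopVersion=3.2.1 package
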
@@ -63,7 +63,7 @@ libraryDependencies ++= Seq(
   "com.azure" % "azure-storage-blob" % "12.9.0",
   "com.azure" % "azure-storage-blob-batch" % "12.7.0",
   "com.azure" % "azure-identity" % "1.2.0",
-  "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.194" % "provided",
+  "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.569" % "provided",
   "com.google.cloud.bigdataoss" % "gcs-connector" % "hadoop3-2.2.18",
   "com.google.cloud" % "google-cloud-storage" % "2.35.0",
   // Snappy is JNI :-(. However it does claim to work with
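Note that aws-java-sdk-bundle stays in "provided" scope, so the version bump only affects compile-time resolution; at runtime the bundle must still be supplied on the Spark/Hadoop classpath.
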
clients/spark/…/io/treeverse/clients/StorageUtils.scala
@@ -1,6 +1,10 @@
 package io.treeverse.clients
 
-import com.amazonaws.auth.AWSCredentialsProvider
+import com.amazonaws.auth.{
+  AWSCredentialsProvider,
+  DefaultAWSCredentialsProviderChain,
+  STSAssumeRoleSessionCredentialsProvider
+}
 import com.amazonaws.client.builder.AwsClientBuilder
 import com.amazonaws.retry.PredefinedRetryPolicies.SDKDefaultRetryCondition
 import com.amazonaws.retry.RetryUtils
@@ -11,6 +15,7 @@ import org.slf4j.{Logger, LoggerFactory}
 
 import java.net.URI
 import java.util.concurrent.TimeUnit
+import java.util.UUID
 
 object StorageUtils {
   val StorageTypeS3 = "s3"
@@ -26,10 +31,10 @@ object StorageUtils {
    * @return object paths in a storage namespace
    */
   def concatKeysToStorageNamespace(
-      keys: Seq[String],
-      storageNamespace: String,
-      keepNsSchemeAndHost: Boolean = true
-  ): Seq[String] = {
+    keys: Seq[String],
+    storageNamespace: String,
+    keepNsSchemeAndHost: Boolean = true
+  ): Seq[String] = {
     var sanitizedNS = storageNamespace
     if (!keepNsSchemeAndHost) {
       val uri = new URI(storageNamespace)
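For orientation, a sketch of what this signature does with typical arguments (illustrative values; the exact slash handling follows from the body):

    // Illustrative call, values assumed:
    val paths = StorageUtils.concatKeysToStorageNamespace(
      keys = Seq("data/object1", "data/object2"),
      storageNamespace = "s3://example-bucket/repo"
    )
    // Expected shape: Seq("s3://example-bucket/repo/data/object1", ...)
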
@@ -91,17 +96,20 @@ object StorageUtils {
     val logger: Logger = LoggerFactory.getLogger(getClass.toString)
 
     def createAndValidateS3Client(
-        configuration: ClientConfiguration,
-        credentialsProvider: Option[AWSCredentialsProvider],
-        awsS3ClientBuilder: AmazonS3ClientBuilder,
-        endpoint: String,
-        region: String,
-        bucket: String
-    ): AmazonS3 = {
+      configuration: ClientConfiguration,
+      credentialsProvider: Option[AWSCredentialsProvider],
+      awsS3ClientBuilder: AmazonS3ClientBuilder,
+      endpoint: String,
+      region: String,
+      bucket: String
+    ): AmazonS3 = {
+      logger.debug(
+        s"Creating S3 client for bucket: $bucket, region: $region, endpoint: $endpoint"
+      )
       require(awsS3ClientBuilder != null)
       require(bucket.nonEmpty)
       val client =
-        initializeS3Client(configuration, credentialsProvider, awsS3ClientBuilder, endpoint)
+        initializeS3Client(configuration, credentialsProvider, awsS3ClientBuilder, endpoint, region)
       var bucketRegion =
         try {
           getAWSS3Region(client, bucket)
@@ -142,12 +150,43 @@ object StorageUtils {
         )
       else if (region != null)
         builder.withRegion(region)
-      else
-        builder
-      val builderWithCredentials = credentialsProvider match {
-        case Some(cp) => builderWithEndpoint.withCredentials(cp)
-        case None => builderWithEndpoint
-      }
+      else {
+        // Fall back to the default region provider chain
+        val currentRegion = com.amazonaws.regions.Regions.getCurrentRegion
+        if (currentRegion != null) {
+          builder.withRegion(currentRegion.getName)
+        } else {
+          builder.withRegion(com.amazonaws.regions.Regions.US_EAST_1)
+        }
+      }
 
+      // Check for Hadoop's assumed role configuration
+      val roleArn = System.getProperty("spark.hadoop.fs.s3a.assumed.role.arn")
+
+      // Apply credentials based on configuration
+      val builderWithCredentials =
+        if (roleArn != null && !roleArn.isEmpty) {
+          // If we have a role ARN configured, assume that role
+          try {
+            val sessionName = "lakefs-gc-" + UUID.randomUUID().toString
+            val stsProvider =
+              new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, sessionName)
+                .withLongLivedCredentialsProvider(new DefaultAWSCredentialsProviderChain())
+                .build()
+
+            builderWithEndpoint.withCredentials(stsProvider)
+          } catch {
+            case e: Exception =>
+              logger.info(s"Failed to assume role $roleArn (${e.getMessage}); " +
+                "falling back to DefaultAWSCredentialsProviderChain")
+              builderWithEndpoint.withCredentials(new DefaultAWSCredentialsProviderChain())
+          }
+        } else {
+          // Use the supplied AWSCredentialsProvider when present; fall back to
+          // the default chain rather than calling .get on an empty Option
+          credentialsProvider match {
+            case Some(cp) => builderWithEndpoint.withCredentials(cp)
+            case None =>
+              builderWithEndpoint.withCredentials(new DefaultAWSCredentialsProviderChain())
+          }
+        }
       builderWithCredentials.build
     }

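A minimal sketch of exercising the new assume-role path, assuming the enclosing object is exposed as StorageUtils.S3 (its name is not visible in this diff). Because the code reads a JVM system property, the ARN must reach the driver JVM as a -D option (for example via spark.driver.extraJavaOptions), not only as Spark configuration:

    import com.amazonaws.ClientConfiguration
    import com.amazonaws.services.s3.AmazonS3ClientBuilder

    // Hypothetical values throughout; launched with something like:
    //   -Dspark.hadoop.fs.s3a.assumed.role.arn=arn:aws:iam::111122223333:role/lakefs-gc
    val s3 = StorageUtils.S3.createAndValidateS3Client(
      configuration = new ClientConfiguration(),
      credentialsProvider = None, // with the role ARN set, STS credentials are used
      awsS3ClientBuilder = AmazonS3ClientBuilder.standard(),
      endpoint = null, // assumed to mean "use the default AWS endpoint"
      region = "us-east-1",
      bucket = "example-bucket"
    )
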
@@ -172,10 +211,10 @@ class S3RetryDeleteObjectsCondition extends SDKDefaultRetryCondition {
   private val clock = java.time.Clock.systemDefaultZone
 
   override def shouldRetry(
-      originalRequest: AmazonWebServiceRequest,
-      exception: AmazonClientException,
-      retriesAttempted: Int
-  ): Boolean = {
+    originalRequest: AmazonWebServiceRequest,
+    exception: AmazonClientException,
+    retriesAttempted: Int
+  ): Boolean = {
     val now = clock.instant
     exception match {
       case ce: SdkClientException =>
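S3RetryDeleteObjectsCondition plugs into the AWS SDK's standard retry machinery; a minimal sketch of wiring it into a client configuration (assumed usage, not shown in this diff):

    import com.amazonaws.ClientConfiguration
    import com.amazonaws.retry.{PredefinedRetryPolicies, RetryPolicy}

    // Retry with the custom condition plus the SDK's default backoff and budget.
    val retryPolicy = new RetryPolicy(
      new S3RetryDeleteObjectsCondition(),
      PredefinedRetryPolicies.DEFAULT_BACKOFF_STRATEGY,
      PredefinedRetryPolicies.DEFAULT_MAX_ERROR_RETRY,
      true // honor a maxErrorRetry set on the ClientConfiguration
    )
    val clientConfiguration = new ClientConfiguration().withRetryPolicy(retryPolicy)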