diff --git a/clients/spark/build.sbt b/clients/spark/build.sbt
index 66b41079a9a..ab8116fd12f 100644
--- a/clients/spark/build.sbt
+++ b/clients/spark/build.sbt
@@ -1,6 +1,6 @@
-lazy val projectVersion = "0.14.2"
+lazy val projectVersion = "0.15.1-support-emr-7.0.0"
 version := projectVersion
-lazy val hadoopVersion = "3.2.1"
+lazy val hadoopVersion = "3.3.6"
 ThisBuild / isSnapshot := false
 ThisBuild / scalaVersion := "2.12.12"
@@ -55,7 +55,7 @@ libraryDependencies ++= Seq(
   "com.azure" % "azure-storage-blob" % "12.9.0",
   "com.azure" % "azure-storage-blob-batch" % "12.7.0",
   "com.azure" % "azure-identity" % "1.2.0",
-  "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.194" % "provided",
+  "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.367" % "provided",
   // Snappy is JNI :-(. However it does claim to work with
   // ClassLoaders, and (even more importantly!) using a preloaded JNI
   // version will probably continue to work because the C language API
diff --git a/clients/spark/src/main/scala/io/treeverse/clients/LakeFSInputFormat.scala b/clients/spark/src/main/scala/io/treeverse/clients/LakeFSInputFormat.scala
index 5a94ee2b298..5abbd10c792 100644
--- a/clients/spark/src/main/scala/io/treeverse/clients/LakeFSInputFormat.scala
+++ b/clients/spark/src/main/scala/io/treeverse/clients/LakeFSInputFormat.scala
@@ -99,16 +99,28 @@ class EntryRecordReader[Proto <: GeneratedMessage with scalapb.Message[Proto]](
     val gravelerSplit = split.asInstanceOf[GravelerSplit]
 
+    // Log the path before processing
+    logger.info(s"Processing file: ${gravelerSplit.path}")
+
     val fs = gravelerSplit.path.getFileSystem(context.getConfiguration)
     fs.copyToLocalFile(false, gravelerSplit.path, new Path(localFile.getAbsolutePath), true)
     // TODO(johnnyaug) should we cache this?
     sstableReader = new SSTableReader(localFile.getAbsolutePath, companion, true)
     if (!gravelerSplit.isValidated) {
       // this file may not be a valid range file, validate it
-      val props = sstableReader.getProperties
-      logger.debug(s"Props: $props")
-      if (new String(props("type")) != "ranges" || props.contains("entity")) {
-        return
+      try {
+        val props = sstableReader.getProperties
+        logger.debug(s"Props: $props")
+        if (new String(props("type")) != "ranges" || props.contains("entity")) {
+          return
+        }
+      } catch {
+        case e: io.treeverse.jpebble.BadFileFormatException =>
+          logger.error(s"Failed to read sstable, bad file format: ${gravelerSplit.path}", e)
+          throw new io.treeverse.jpebble.BadFileFormatException(
+            s"Bad file format in ${gravelerSplit.path}: ${e.getMessage}",
+            e
+          )
       }
     }
     rangeID = gravelerSplit.rangeID
@@ -259,8 +271,8 @@ class LakeFSAllRangesInputFormat extends LakeFSBaseInputFormat {
     while (it.hasNext) {
       val file = it.next()
       breakable {
-        if (file.getPath.getName == DummyFileName) {
-          logger.debug(s"Skipping dummy file ${file.getPath}")
+        if (file.getPath.getName == DummyFileName || file.getPath.getName.endsWith(".json")) {
+          logger.debug(s"Skipping file ${file.getPath}")
           break
         }
         splits += new GravelerSplit(
diff --git a/clients/spark/src/main/scala/io/treeverse/clients/StorageUtils.scala b/clients/spark/src/main/scala/io/treeverse/clients/StorageUtils.scala
index c360c6a53b4..392c8bf87a5 100644
--- a/clients/spark/src/main/scala/io/treeverse/clients/StorageUtils.scala
+++ b/clients/spark/src/main/scala/io/treeverse/clients/StorageUtils.scala
@@ -1,6 +1,10 @@
 package io.treeverse.clients
 
-import com.amazonaws.auth.AWSCredentialsProvider
+import com.amazonaws.auth.{
+  AWSCredentialsProvider,
+  DefaultAWSCredentialsProviderChain,
+  STSAssumeRoleSessionCredentialsProvider
+}
 import com.amazonaws.client.builder.AwsClientBuilder
 import com.amazonaws.retry.PredefinedRetryPolicies.SDKDefaultRetryCondition
 import com.amazonaws.retry.RetryUtils
@@ -11,6 +15,7 @@ import org.slf4j.{Logger, LoggerFactory}
 
 import java.net.URI
 import java.util.concurrent.TimeUnit
+import java.util.UUID
 
 object StorageUtils {
   val StorageTypeS3 = "s3"
@@ -143,10 +148,34 @@
           builder.withRegion(region)
         else
           builder
-      val builderWithCredentials = credentialsProvider match {
-        case Some(cp) => builderWithEndpoint.withCredentials(cp)
-        case None     => builderWithEndpoint
-      }
+
+      // Check for Hadoop's assumed role configuration
+      val roleArn = System.getProperty("spark.hadoop.fs.s3a.assumed.role.arn")
+
+      // Apply credentials based on configuration
+      val builderWithCredentials =
+        if (roleArn != null && !roleArn.isEmpty) {
+          // If we have a role ARN configured, assume that role
+          try {
+            val sessionName = "lakefs-gc-" + UUID.randomUUID().toString
+            val stsProvider =
+              new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, sessionName)
+                .withLongLivedCredentialsProvider(new DefaultAWSCredentialsProviderChain())
+                .build()
+
+            builderWithEndpoint.withCredentials(stsProvider)
+          } catch {
+            case e: Exception =>
+              logger.warn(s"Failed to assume role $roleArn, falling back to DefaultAWSCredentialsProviderChain", e)
+              builderWithEndpoint.withCredentials(new DefaultAWSCredentialsProviderChain())
+          }
+        } else
+          // Use the provided AWSCredentialsProvider if available,
+          // otherwise keep the builder's default credential resolution
+          credentialsProvider match {
+            case Some(cp) => builderWithEndpoint.withCredentials(cp)
+            case None     => builderWithEndpoint
+          }
 
       builderWithCredentials.build
     }
diff --git a/clients/spark/src/test/scala/io/treeverse/clients/LakeFSInputFormatSpec.scala b/clients/spark/src/test/scala/io/treeverse/clients/LakeFSInputFormatSpec.scala
index b56c70af9d7..dbcf5e78e81 100644
--- a/clients/spark/src/test/scala/io/treeverse/clients/LakeFSInputFormatSpec.scala
+++ b/clients/spark/src/test/scala/io/treeverse/clients/LakeFSInputFormatSpec.scala
@@ -13,14 +13,12 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.scalatest.OneInstancePerTest
-import org.checkerframework.checker.units.qual.m
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.LocatedFileStatus
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.fs.BlockLocation
 import org.apache.hadoop.fs.FileStatus
 import org.apache.hadoop.fs.RemoteIterator
-import org.apache.hadoop.fs.BatchedRemoteIterator
 
 object LakeFSInputFormatSpec {
   def getItem(rangeID: String): Item[RangeData] = new Item(