
Make GC work with EMR 7.0.0 #9013


Open: wants to merge 96 commits into base: master

Commits (96)
2b4eb7c
Test GC for EMR 7.0.0
idanovo May 4, 2025
0a811fc
WIP
idanovo May 4, 2025
0883db7
WIP
idanovo May 4, 2025
c5fe3b7
WIP
idanovo May 5, 2025
8dba842
WIP
idanovo May 5, 2025
8742b36
WIP
idanovo May 5, 2025
867b44b
WIP
idanovo May 5, 2025
f053195
removed unused imports
idanovo May 5, 2025
c6cf6a3
WIP
idanovo May 5, 2025
ef04b3a
WIP
idanovo May 5, 2025
1125925
WIP
idanovo May 5, 2025
4e7c055
WIP
idanovo May 5, 2025
2a12d48
Fix
idanovo May 5, 2025
bbe0927
Fix
idanovo May 5, 2025
689ce97
Fix
idanovo May 5, 2025
476e7bb
Fix
idanovo May 5, 2025
5d34dec
Fix
idanovo May 5, 2025
3f0aac1
Fix
idanovo May 6, 2025
b257af7
Fix
idanovo May 6, 2025
b26632b
Fix
idanovo May 6, 2025
3279cfb
Fix
idanovo May 6, 2025
5a42aea
Fix
idanovo May 6, 2025
7188598
Fix
idanovo May 6, 2025
b7b2891
WIP
idanovo May 8, 2025
63aaf68
WIP
idanovo May 8, 2025
4ae55b3
Merge branch 'master' of https://github.com/treeverse/lakeFS into spa…
idanovo May 11, 2025
3afd435
WIP
idanovo May 11, 2025
a03179f
WIP
idanovo May 11, 2025
62f28db
WIP
idanovo May 11, 2025
bd7f9e3
WIP
idanovo May 11, 2025
fe00178
WIP
idanovo May 11, 2025
b17d0b0
WIP
idanovo May 11, 2025
91c6faa
WIP
idanovo May 11, 2025
f82a3aa
WIP
idanovo May 11, 2025
ebe8a62
WIP
idanovo May 11, 2025
8526186
WIP
idanovo May 11, 2025
4184cf0
Fix tests
idanovo May 11, 2025
bfc34d6
Fix tests
idanovo May 11, 2025
eb2d3d4
Fix tests
idanovo May 11, 2025
bb55c31
Fix tests
idanovo May 11, 2025
045b1a3
WIP
idanovo May 11, 2025
f758199
Fix tests
idanovo May 11, 2025
b9eb652
Fix
idanovo May 11, 2025
d648480
Fix
idanovo May 11, 2025
b12b32f
Fix
idanovo May 11, 2025
bdd3222
Test
idanovo May 11, 2025
bf900dc
Test
idanovo May 11, 2025
2858223
Test
idanovo May 11, 2025
51a03e7
Revert
idanovo May 11, 2025
bb4bb10
Test
idanovo May 11, 2025
fa7bc5a
Revert
idanovo May 11, 2025
03a0be3
Test
idanovo May 11, 2025
4707004
Revert test
idanovo May 11, 2025
d94a4b0
Merge branch 'master' of https://github.com/treeverse/lakeFS into spa…
idanovo May 12, 2025
128b355
Test
idanovo May 12, 2025
6c631dd
Test
idanovo May 12, 2025
84ae17b
Test
idanovo May 12, 2025
4026128
Test
idanovo May 12, 2025
da64484
Test
idanovo May 12, 2025
6c83598
Test
idanovo May 12, 2025
a76f231
Test
idanovo May 12, 2025
2eb2275
Test
idanovo May 12, 2025
4bf02e1
Work
idanovo May 12, 2025
d76b199
Fix
idanovo May 13, 2025
5e26f65
Trying support both EMR 6.9.0 and 7.0.0
idanovo May 13, 2025
19c8467
Trying support both EMR 6.9.0 and 7.0.0
idanovo May 13, 2025
fd46fd0
Trying support both EMR 6.9.0 and 7.0.0
idanovo May 13, 2025
473d09d
Trying support both EMR 6.9.0 and 7.0.0
idanovo May 13, 2025
6daf089
Trying support both EMR 6.9.0 and 7.0.0
idanovo May 13, 2025
40ccf5d
Trying support both EMR 6.9.0 and 7.0.0
idanovo May 13, 2025
30a9973
Trying support both EMR 6.9.0 and 7.0.0
idanovo May 13, 2025
cf65eb1
Trying support both EMR 6.9.0 and 7.0.0
idanovo May 13, 2025
6365eac
Skip tests
idanovo May 13, 2025
8c28317
Skip tests
idanovo May 13, 2025
e46bb6f
Trying support both EMR 6.9.0 and 7.0.0
idanovo May 13, 2025
4ce21bf
Trying support both EMR 6.9.0 and 7.0.0
idanovo May 13, 2025
83215f6
Trying support both EMR 6.9.0 and 7.0.0
idanovo May 14, 2025
e27db78
Trying support both EMR 6.9.0 and 7.0.0
idanovo May 14, 2025
48389b8
Trying support both EMR 6.9.0 and 7.0.0
idanovo May 14, 2025
1060a68
Merge branch 'master' of https://github.com/treeverse/lakeFS into spa…
idanovo May 14, 2025
1dbf653
Trying support both EMR 6.9.0 and 7.0.0
idanovo May 14, 2025
3c87229
Merge branch 'master' of https://github.com/treeverse/lakeFS into spa…
idanovo May 14, 2025
04b792a
Pull from master and reverted to a version that works only for EMR 7.0.0
idanovo May 14, 2025
ba732d3
Fix
idanovo May 14, 2025
467d198
Removed unused import
idanovo May 14, 2025
dc57c79
Add logs
idanovo May 15, 2025
b48df4c
Revert fix
idanovo May 15, 2025
4a1de2b
Test
idanovo May 15, 2025
0fa17d4
Test
idanovo May 15, 2025
bdf7b44
Revert test
idanovo May 15, 2025
6a0409d
Merge branch 'master' of https://github.com/treeverse/lakeFS into spa…
idanovo May 15, 2025
54cfc27
Fix
idanovo May 15, 2025
4174480
test
idanovo May 18, 2025
fb24bff
Remove test changes
idanovo May 18, 2025
be6e8ec
remove log
idanovo May 18, 2025
0ea188b
Skip json files
idanovo May 18, 2025
6 changes: 3 additions & 3 deletions clients/spark/build.sbt
@@ -1,6 +1,6 @@
lazy val projectVersion = "0.14.2"
lazy val projectVersion = "0.15.1-support-emr-7.0.0"
version := projectVersion
lazy val hadoopVersion = "3.2.1"
lazy val hadoopVersion = "3.3.6"
ThisBuild / isSnapshot := false
Review comment (Contributor):

Is this bump (hadoop + aws-java-sdk-bundle) backward compatible, and was it tested, i.e., against Hadoop 3.2.1 and aws-sdk 1.12.194? Backward compatibility is a hard requirement, right? We discussed breaking compatibility as long-term work, to be done gradually.

ThisBuild / scalaVersion := "2.12.12"

@@ -55,7 +55,7 @@ libraryDependencies ++= Seq(
"com.azure" % "azure-storage-blob" % "12.9.0",
"com.azure" % "azure-storage-blob-batch" % "12.7.0",
"com.azure" % "azure-identity" % "1.2.0",
"com.amazonaws" % "aws-java-sdk-bundle" % "1.12.194" % "provided",
"com.amazonaws" % "aws-java-sdk-bundle" % "1.12.367" % "provided",
// Snappy is JNI :-(. However it does claim to work with
// ClassLoaders, and (even more importantly!) using a preloaded JNI
// version will probably continue to work because the C language API
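One way to probe the reviewer's backward-compatibility question is to make the Hadoop line switchable at build time, keeping 3.2.1 as the default so existing EMR 6.9.0 builds are untouched. A minimal sbt sketch (an illustration only, not part of this PR; the `hadoopVersion` property name and the `hadoop-aws` coordinate are assumptions):

```scala
// build.sbt sketch: pick the Hadoop line per build, e.g.
//   sbt -DhadoopVersion=3.3.6 package   // EMR 7.0.0
//   sbt package                         // defaults to 3.2.1 (EMR 6.9.0)
lazy val hadoopVersion = sys.props.getOrElse("hadoopVersion", "3.2.1")

libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-aws" % hadoopVersion % "provided"
)
```

CI could then build and run the GC integration tests once per Hadoop version instead of committing to a single bump.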
@@ -99,16 +99,28 @@ class EntryRecordReader[Proto <: GeneratedMessage with scalapb.Message[Proto]](

val gravelerSplit = split.asInstanceOf[GravelerSplit]

+    // Log the path before processing
+    logger.info(s"Processing file: ${gravelerSplit.path}")

val fs = gravelerSplit.path.getFileSystem(context.getConfiguration)
fs.copyToLocalFile(false, gravelerSplit.path, new Path(localFile.getAbsolutePath), true)
// TODO(johnnyaug) should we cache this?
sstableReader = new SSTableReader(localFile.getAbsolutePath, companion, true)
if (!gravelerSplit.isValidated) {
// this file may not be a valid range file, validate it
-      val props = sstableReader.getProperties
-      logger.debug(s"Props: $props")
-      if (new String(props("type")) != "ranges" || props.contains("entity")) {
-        return
-      }
+      try {
+        val props = sstableReader.getProperties
+        logger.debug(s"Props: $props")
+        if (new String(props("type")) != "ranges" || props.contains("entity")) {
+          return
+        }
+      } catch {
+        case e: io.treeverse.jpebble.BadFileFormatException =>
+          logger.error(s"Failed to read sstable, bad file format: ${gravelerSplit.path}", e)
+          throw new io.treeverse.jpebble.BadFileFormatException(
+            s"Bad file format in ${gravelerSplit.path}: ${e.getMessage}",
+            e
+          )
+      }
}
rangeID = gravelerSplit.rangeID
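The try/catch above changes error reporting, not control flow: a `BadFileFormatException` raised while reading the SSTable properties is rethrown with the offending split path in the message, so a failed distributed GC run names the bad input file. A self-contained sketch of the same wrap-and-rethrow pattern (the names below are stand-ins for illustration, not the PR's actual classes):

```scala
// Sketch: enrich a low-level parse error with the input path before rethrowing.
// `readProps` stands in for SSTableReader.getProperties.
final class BadFileFormat(msg: String, cause: Throwable) extends RuntimeException(msg, cause)

def validateRangeFile(path: String, readProps: () => Map[String, String]): Unit =
  try {
    val props = readProps()
    if (!props.get("type").contains("ranges")) return // not a range file; caller skips it
  } catch {
    case e: BadFileFormat =>
      // keep the original exception as the cause, add the path to the message
      throw new BadFileFormat(s"Bad file format in $path: ${e.getMessage}", e)
  }
```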
@@ -259,8 +271,8 @@ class LakeFSAllRangesInputFormat extends LakeFSBaseInputFormat {
while (it.hasNext) {
val file = it.next()
breakable {
-        if (file.getPath.getName == DummyFileName) {
-          logger.debug(s"Skipping dummy file ${file.getPath}")
+        if (file.getPath.getName == DummyFileName || file.getPath.getName.endsWith(".json")) {
+          logger.debug(s"Skipping file ${file.getPath}")
break
}
splits += new GravelerSplit(
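The skip rule above is small enough to read in isolation: while listing candidate range files, anything named like the dummy marker or ending in `.json` is excluded before a split is built, presumably because non-range `.json` files can appear under the scanned prefix on EMR 7.0.0. A standalone sketch (the `DummyFileName` value here is an assumed placeholder):

```scala
import org.apache.hadoop.fs.Path

// Sketch of the new skip predicate used while collecting splits.
val DummyFileName = "dummy" // placeholder; the real constant is defined in the client code

def shouldSkip(p: Path): Boolean =
  p.getName == DummyFileName || p.getName.endsWith(".json")

// e.g. shouldSkip(new Path("s3a://bucket/prefix/stats.json")) == true (hypothetical path)
```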
@@ -1,6 +1,10 @@
package io.treeverse.clients

-import com.amazonaws.auth.AWSCredentialsProvider
+import com.amazonaws.auth.{
+  AWSCredentialsProvider,
+  DefaultAWSCredentialsProviderChain,
+  STSAssumeRoleSessionCredentialsProvider
+}
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.retry.PredefinedRetryPolicies.SDKDefaultRetryCondition
import com.amazonaws.retry.RetryUtils
@@ -11,6 +15,7 @@ import org.slf4j.{Logger, LoggerFactory}

import java.net.URI
import java.util.concurrent.TimeUnit
+import java.util.UUID

object StorageUtils {
val StorageTypeS3 = "s3"
@@ -143,10 +148,34 @@
builder.withRegion(region)
else
builder
-    val builderWithCredentials = credentialsProvider match {
-      case Some(cp) => builderWithEndpoint.withCredentials(cp)
-      case None => builderWithEndpoint
-    }
+    // Check for Hadoop's assumed role configuration
+    val roleArn = System.getProperty("spark.hadoop.fs.s3a.assumed.role.arn")
Review comment (Contributor): Please move this to the consts file we have.

+    // Apply credentials based on configuration
+    val builderWithCredentials =
+      if (roleArn != null && !roleArn.isEmpty) {
+        // If we have a role ARN configured, assume that role
+        try {
+          val sessionName = "lakefs-gc-" + UUID.randomUUID().toString
+          val stsProvider =
+            new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, sessionName)
+              .withLongLivedCredentialsProvider(new DefaultAWSCredentialsProviderChain())
+              .build()
+
+          builderWithEndpoint.withCredentials(stsProvider)
+        } catch {
+          case e: Exception =>
+            logger.info("Falling back to DefaultAWSCredentialsProviderChain")
+            builderWithEndpoint.withCredentials(new DefaultAWSCredentialsProviderChain())
+        }
+      } else
+        (
+          // Use standard AWSCredentialsProvider if available
+          builderWithEndpoint.withCredentials(
+            credentialsProvider.get.asInstanceOf[AWSCredentialsProvider]
+          )
+        )
builderWithCredentials.build
}

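The new branch keys entirely off one JVM system property. A condensed sketch of the selection logic, extracted for readability (not a drop-in replacement for the PR code):

```scala
import java.util.UUID
import com.amazonaws.auth.{
  AWSCredentialsProvider,
  DefaultAWSCredentialsProviderChain,
  STSAssumeRoleSessionCredentialsProvider
}

// Sketch: choose a credentials provider the way the new code does.
def chooseProvider(): AWSCredentialsProvider = {
  val roleArn = System.getProperty("spark.hadoop.fs.s3a.assumed.role.arn")
  if (roleArn != null && roleArn.nonEmpty) {
    // Assume the configured role; the default chain supplies the long-lived credentials
    new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, s"lakefs-gc-${UUID.randomUUID()}")
      .withLongLivedCredentialsProvider(new DefaultAWSCredentialsProviderChain())
      .build()
  } else new DefaultAWSCredentialsProviderChain()
}
```

Two caveats stand out when reading the diff: `System.getProperty` only sees values passed as `-D` JVM options, not values set solely in Spark or Hadoop configuration, and the non-role branch calls `credentialsProvider.get`, which throws if no provider was supplied, whereas the removed `match` handled `None`.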
@@ -13,14 +13,12 @@ import scala.collection.JavaConverters._

import scala.collection.mutable
import org.scalatest.OneInstancePerTest
-import org.checkerframework.checker.units.qual.m
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.LocatedFileStatus
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.BlockLocation
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.RemoteIterator
-import org.apache.hadoop.fs.BatchedRemoteIterator

object LakeFSInputFormatSpec {
def getItem(rangeID: String): Item[RangeData] = new Item(