[SPARK-43195][CORE] Remove unnecessary serializable wrapper in HadoopFSUtils

### What changes were proposed in this pull request?

Remove unnecessary serializable wrapper in `HadoopFSUtils`.

### Why are the changes needed?

`Path` and `FileStatus` became serializable in Hadoop 3. Since SPARK-42452 removed support for Hadoop 2, we can now drop those wrappers.
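For context (not part of the patch), a minimal sketch of the guarantee this change relies on: with Hadoop 3 on the classpath, both types survive a plain Java serialization round trip, which is exactly what the removed wrappers used to provide by hand. The object name, example path, and `FileStatus` constructor arguments below are made up for illustration:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.fs.{FileStatus, Path}

object SerializableRoundTrip {
  // Round-trips a value through plain Java serialization. With Hadoop 2 this
  // would throw java.io.NotSerializableException for Path/FileStatus; with
  // Hadoop 3 both come back intact, so Spark no longer needs the wrappers.
  private def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val path = roundTrip(new Path("hdfs://example-nn:8020/warehouse/t"))
    // FileStatus(length, isDir, blockReplication, blockSize, modificationTime, path)
    val status = roundTrip(new FileStatus(0L, false, 1, 128L * 1024 * 1024, 0L, path))
    println(s"round-tripped: $path and ${status.getPath}")
  }
}
```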

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass CI.

Closes apache#40854 from pan3793/SPARK-43195.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
pan3793 authored and sunchao committed Apr 20, 2023
1 parent 3814d15 commit 29b40f6
Showing 2 changed files with 5 additions and 72 deletions.
4 changes: 0 additions & 4 deletions core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -510,10 +510,6 @@ private[serializer] object KryoSerializer {
   // SQL / ML / MLlib classes once and then re-use that filtered list in newInstance() calls.
   private lazy val loadableSparkClasses: Seq[Class[_]] = {
     Seq(
-      "org.apache.spark.util.HadoopFSUtils$SerializableBlockLocation",
-      "[Lorg.apache.spark.util.HadoopFSUtils$SerializableBlockLocation;",
-      "org.apache.spark.util.HadoopFSUtils$SerializableFileStatus",
-
       "org.apache.spark.sql.catalyst.expressions.BoundReference",
       "org.apache.spark.sql.catalyst.expressions.SortOrder",
       "[Lorg.apache.spark.sql.catalyst.expressions.SortOrder;",
73 changes: 5 additions & 68 deletions core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -102,14 +102,13 @@ private[spark] object HadoopFSUtils extends Logging {
     HiveCatalogMetrics.incrementParallelListingJobCount(1)
 
     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
-    val serializedPaths = paths.map(_.toString)
 
     // Set the number of parallelism to prevent following file listing from generating many tasks
     // in case of large #defaultParallelism.
     val numParallelism = Math.min(paths.size, parallelismMax)
 
     val previousJobDescription = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
-    val statusMap = try {
+    try {
       val description = paths.size match {
         case 0 =>
           "Listing leaf files and directories 0 paths"
@@ -119,11 +118,10 @@
s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, ..."
}
sc.setJobDescription(description)
sc
.parallelize(serializedPaths, numParallelism)
.mapPartitions { pathStrings =>
sc.parallelize(paths, numParallelism)
.mapPartitions { pathsEachPartition =>
val hadoopConf = serializableConfiguration.value
pathStrings.map(new Path(_)).toSeq.map { path =>
pathsEachPartition.map { path =>
val leafFiles = listLeafFiles(
path = path,
hadoopConf = hadoopConf,
@@ -135,54 +133,11 @@
               parallelismThreshold = Int.MaxValue,
               parallelismMax = 0)
             (path, leafFiles)
-          }.iterator
-        }.map { case (path, statuses) =>
-          val serializableStatuses = statuses.map { status =>
-            // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
-            val blockLocations = status match {
-              case f: LocatedFileStatus =>
-                f.getBlockLocations.map { loc =>
-                  SerializableBlockLocation(
-                    loc.getNames,
-                    loc.getHosts,
-                    loc.getOffset,
-                    loc.getLength)
-                }
-
-              case _ =>
-                Array.empty[SerializableBlockLocation]
-            }
-
-            SerializableFileStatus(
-              status.getPath.toString,
-              status.getLen,
-              status.isDirectory,
-              status.getReplication,
-              status.getBlockSize,
-              status.getModificationTime,
-              status.getAccessTime,
-              blockLocations)
-          }
-          (path.toString, serializableStatuses)
+          }
         }.collect()
     } finally {
       sc.setJobDescription(previousJobDescription)
     }
-
-    // turn SerializableFileStatus back to Status
-    statusMap.map { case (path, serializableStatuses) =>
-      val statuses = serializableStatuses.map { f =>
-        val blockLocations = f.blockLocations.map { loc =>
-          new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
-        }
-        new LocatedFileStatus(
-          new FileStatus(
-            f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime,
-            new Path(f.path)),
-          blockLocations)
-      }
-      (new Path(path), statuses)
-    }
   }
 
   // scalastyle:off argcount
@@ -340,24 +295,6 @@ private[spark] object HadoopFSUtils extends Logging {
   }
   // scalastyle:on argcount
 
-  /** A serializable variant of HDFS's BlockLocation. This is required by Hadoop 2.7. */
-  private case class SerializableBlockLocation(
-      names: Array[String],
-      hosts: Array[String],
-      offset: Long,
-      length: Long)
-
-  /** A serializable variant of HDFS's FileStatus. This is required by Hadoop 2.7. */
-  private case class SerializableFileStatus(
-      path: String,
-      length: Long,
-      isDir: Boolean,
-      blockReplication: Short,
-      blockSize: Long,
-      modificationTime: Long,
-      accessTime: Long,
-      blockLocations: Array[SerializableBlockLocation])
-
   /** Checks if we should filter out this path name. */
   def shouldFilterOutPathName(pathName: String): Boolean = {
     // We filter follow paths:
