From 29b40f6fd0c42e594e24b12c8b828913139e8f54 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Wed, 19 Apr 2023 21:02:46 -0700 Subject: [PATCH] [SPARK-43195][CORE] Remove unnecessary serializable wrapper in HadoopFSUtils ### What changes were proposed in this pull request? Remove the unnecessary serializable wrappers in `HadoopFSUtils`. ### Why are the changes needed? `Path` and `FileStatus` became serializable in Hadoop 3; since SPARK-42452 removed support for Hadoop 2, we can remove those wrappers now. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass CI. Closes #40854 from pan3793/SPARK-43195. Authored-by: Cheng Pan Signed-off-by: Chao Sun --- .../spark/serializer/KryoSerializer.scala | 4 - .../org/apache/spark/util/HadoopFSUtils.scala | 73 ++----------------- 2 files changed, 5 insertions(+), 72 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index d79f6453bc5fb..1736088b4983c 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -510,10 +510,6 @@ private[serializer] object KryoSerializer { // SQL / ML / MLlib classes once and then re-use that filtered list in newInstance() calls. 
private lazy val loadableSparkClasses: Seq[Class[_]] = { Seq( - "org.apache.spark.util.HadoopFSUtils$SerializableBlockLocation", - "[Lorg.apache.spark.util.HadoopFSUtils$SerializableBlockLocation;", - "org.apache.spark.util.HadoopFSUtils$SerializableFileStatus", - "org.apache.spark.sql.catalyst.expressions.BoundReference", "org.apache.spark.sql.catalyst.expressions.SortOrder", "[Lorg.apache.spark.sql.catalyst.expressions.SortOrder;", diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala index 01dc3ba68cc63..bb03cac8f1f62 100644 --- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala @@ -102,14 +102,13 @@ private[spark] object HadoopFSUtils extends Logging { HiveCatalogMetrics.incrementParallelListingJobCount(1) val serializableConfiguration = new SerializableConfiguration(hadoopConf) - val serializedPaths = paths.map(_.toString) // Set the number of parallelism to prevent following file listing from generating many tasks // in case of large #defaultParallelism. val numParallelism = Math.min(paths.size, parallelismMax) val previousJobDescription = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) - val statusMap = try { + try { val description = paths.size match { case 0 => "Listing leaf files and directories 0 paths" @@ -119,11 +118,10 @@ private[spark] object HadoopFSUtils extends Logging { s"Listing leaf files and directories for $s paths:
${paths(0)}, ..." } sc.setJobDescription(description) - sc - .parallelize(serializedPaths, numParallelism) - .mapPartitions { pathStrings => + sc.parallelize(paths, numParallelism) + .mapPartitions { pathsEachPartition => val hadoopConf = serializableConfiguration.value - pathStrings.map(new Path(_)).toSeq.map { path => + pathsEachPartition.map { path => val leafFiles = listLeafFiles( path = path, hadoopConf = hadoopConf, @@ -135,54 +133,11 @@ private[spark] object HadoopFSUtils extends Logging { parallelismThreshold = Int.MaxValue, parallelismMax = 0) (path, leafFiles) - }.iterator - }.map { case (path, statuses) => - val serializableStatuses = statuses.map { status => - // Turn FileStatus into SerializableFileStatus so we can send it back to the driver - val blockLocations = status match { - case f: LocatedFileStatus => - f.getBlockLocations.map { loc => - SerializableBlockLocation( - loc.getNames, - loc.getHosts, - loc.getOffset, - loc.getLength) - } - - case _ => - Array.empty[SerializableBlockLocation] - } - - SerializableFileStatus( - status.getPath.toString, - status.getLen, - status.isDirectory, - status.getReplication, - status.getBlockSize, - status.getModificationTime, - status.getAccessTime, - blockLocations) - } - (path.toString, serializableStatuses) + } }.collect() } finally { sc.setJobDescription(previousJobDescription) } - - // turn SerializableFileStatus back to Status - statusMap.map { case (path, serializableStatuses) => - val statuses = serializableStatuses.map { f => - val blockLocations = f.blockLocations.map { loc => - new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length) - } - new LocatedFileStatus( - new FileStatus( - f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, - new Path(f.path)), - blockLocations) - } - (new Path(path), statuses) - } } // scalastyle:off argcount @@ -340,24 +295,6 @@ private[spark] object HadoopFSUtils extends Logging { } // scalastyle:on argcount - /** A serializable variant of 
HDFS's BlockLocation. This is required by Hadoop 2.7. */ - private case class SerializableBlockLocation( - names: Array[String], - hosts: Array[String], - offset: Long, - length: Long) - - /** A serializable variant of HDFS's FileStatus. This is required by Hadoop 2.7. */ - private case class SerializableFileStatus( - path: String, - length: Long, - isDir: Boolean, - blockReplication: Short, - blockSize: Long, - modificationTime: Long, - accessTime: Long, - blockLocations: Array[SerializableBlockLocation]) - /** Checks if we should filter out this path name. */ def shouldFilterOutPathName(pathName: String): Boolean = { // We filter follow paths: