Skip to content

Commit c971f5f

Browse files
committed
implement partitioned
1 parent 5eedbf1 commit c971f5f

File tree

2 files changed

+61
-8
lines changed

2 files changed

+61
-8
lines changed

python/sedona/utils/adapter.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,3 +210,20 @@ def toDf(
210210
return Adapter.toDf(srdd, fieldNames, spark)
211211
else:
212212
return Adapter.toDf(srdd, spark)
213+
214+
@classmethod
def toDfPartitioned(cls, spatialRDD: SpatialRDD, sparkSession: SparkSession) -> DataFrame:
    """
    Convert a SpatialRDD to a DataFrame while preserving its spatial partitioning.

    Delegates to the JVM-side ``Adapter.toDfPartitioned`` via the Py4J gateway.
    NOTE(review): per the Scala-side scaladoc, spatial partitioning schemes that
    duplicate geometries across partitions will produce duplicate rows in the
    resulting DataFrame.

    :param spatialRDD: spatially partitioned SpatialRDD to convert
    :param sparkSession: active SparkSession used to create the DataFrame
    :return: DataFrame whose rows follow the RDD's spatial partitioning
    """
    sc = spatialRDD._sc
    jvm = sc._jvm

    # Call the JVM Adapter directly so partitioning metadata is not lost
    # on a Python-side round trip.
    jdf = jvm.Adapter.toDfPartitioned(spatialRDD._srdd, sparkSession._jsparkSession)

    df = Adapter._create_dataframe(jdf, sparkSession)

    return df

spark/common/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -162,14 +162,10 @@ object Adapter {
162162
spatialRDD: SpatialRDD[T],
163163
schema: StructType,
164164
sparkSession: SparkSession): DataFrame = {
165-
val rdd = spatialRDD.rawSpatialRDD.rdd.mapPartitions(
166-
iter => {
167-
iter.map[Row](geom => {
168-
val stringRow = extractUserData(geom)
169-
castRowToSchema(stringRow = stringRow, schema = schema)
170-
})
171-
},
172-
true)
165+
val rdd = spatialRDD.rawSpatialRDD.rdd.map[Row](geom => {
166+
val stringRow = extractUserData(geom)
167+
castRowToSchema(stringRow = stringRow, schema = schema)
168+
})
173169

174170
sparkSession.sqlContext.createDataFrame(rdd, schema)
175171
}
@@ -239,6 +235,46 @@ object Adapter {
239235
sparkSession.sqlContext.createDataFrame(rdd, schema)
240236
}
241237

238+
/**
 * Convert a spatial RDD to a DataFrame, keeping the RDD's spatial partitioning.
 *
 * The output schema is a leading "geometry" column followed by one string
 * column per requested field name. Note that spatial partitioning methods
 * that introduce duplicates will result in an output data frame with
 * duplicate features; this property is essential for implementing correct
 * joins, but may introduce surprising results.
 *
 * @param spatialRDD
 *   Spatial RDD (must already be spatially partitioned)
 * @param fieldNames
 *   Desired field names; null or empty yields a geometry-only schema
 * @param sparkSession
 *   Spark Session
 * @tparam T
 *   Geometry
 * @return
 *   DataFrame with the specified field names and spatial partitioning preserved
 */
def toDfPartitioned[T <: Geometry](
    spatialRDD: SpatialRDD[T],
    fieldNames: Seq[String],
    sparkSession: SparkSession): DataFrame = {
  // Map over the spatially partitioned RDD (not rawSpatialRDD) so the
  // partition layout carries through to the DataFrame.
  val rowRdd = spatialRDD.spatialPartitionedRDD.map[Row] { geom =>
    Row.fromSeq(extractUserData(geom))
  }
  // Geometry column first, then one StringType column per user field.
  val geometryField = StructField("geometry", GeometryUDT)
  val userFields =
    if (fieldNames == null) Seq.empty[StructField]
    else fieldNames.map(name => StructField(name, StringType))
  sparkSession.createDataFrame(rowRdd, StructType(geometryField +: userFields))
}
271+
272+
/**
 * Convert a spatial RDD to a DataFrame keeping spatial partitioning, using the
 * default output schema (passes null field names to the sibling overload,
 * which produces a single "geometry" column and no user-data columns).
 *
 * @param spatialRDD
 *   Spatial RDD (must already be spatially partitioned)
 * @param sparkSession
 *   Spark Session
 * @tparam T
 *   Geometry
 * @return
 *   DataFrame with spatial partitioning preserved
 */
def toDfPartitioned[T <: Geometry](
    spatialRDD: SpatialRDD[T],
    sparkSession: SparkSession): DataFrame = {
  toDfPartitioned(spatialRDD, null, sparkSession)
}
277+
242278
/**
243279
* Extract user data from a geometry.
244280
*

0 commit comments

Comments
 (0)