
Commit 19ca63f

heyihong authored and cloud-fan committed
[SPARK-53553][CONNECT] Fix handling of null values in LiteralValueProtoConverter
### What changes were proposed in this pull request? This PR fixes the handling of null literal values in `LiteralValueProtoConverter` for Spark Connect. The main changes include: 1. **Added proper null value handling**: Created a new `setNullValue` method that correctly sets null values in proto literals with appropriate data type information. 2. **Reordered pattern matching**: Moved null and Option handling to the top of the pattern matching in `toLiteralProtoBuilderInternal` to ensure null values are processed before other type-specific logic. 3. **Fixed converter logic**: Updated the `getScalaConverter` method to properly handle null values by checking `hasNull` before applying type-specific conversion logic. ### Why are the changes needed? The previous implementation had several issues with null value handling: 1. **Incorrect null processing order**: Null values were being processed after type-specific logic, which could lead to exceptions. 2. **Missing null checks in converters**: The converter functions didn't properly check for null values before applying type-specific conversion logic. ### Does this PR introduce _any_ user-facing change? **Yes**. This PR fixes a bug where null values in literals (especially in arrays and maps) were not being properly handled in Spark Connect. Users who were experiencing issues with null value serialization in complex types should now see correct behavior. ### How was this patch tested? `build/sbt "connect-client-jvm/testOnly *ClientE2ETestSuite -- -z SPARK-53553"` `build/sbt "connect/testOnly *LiteralExpressionProtoConverterSuite"` `build/sbt "connect-client-jvm/testOnly org.apache.spark.sql.PlanGenerationTestSuite"` `build/sbt "connect/testOnly org.apache.spark.sql.connect.ProtoToParsedPlanTestSuite"` ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Cursor 1.5.11 Closes #52310 from heyihong/SPARK-53553. Authored-by: Yihong He <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent bacd343

7 files changed, +287 -9 lines changed

sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala

Lines changed: 4 additions & 0 deletions
@@ -3419,8 +3419,12 @@ class PlanGenerationTestSuite
     // Handle parameterized scala types e.g.: List, Seq and Map.
     fn.typedLit(Some(1)),
     fn.typedLit(Array(1, 2, 3)),
+    fn.typedLit[Array[Integer]](Array(null, null)),
+    fn.typedLit[Array[(Int, String)]](Array(null, null, (1, "a"), (2, null))),
+    fn.typedLit[Array[Option[(Int, String)]]](Array(None, None, Some((1, "a")))),
     fn.typedLit(Seq(1, 2, 3)),
     fn.typedLit(mutable.LinkedHashMap("a" -> 1, "b" -> 2)),
+    fn.typedLit(mutable.LinkedHashMap[String, Integer]("a" -> null, "b" -> null)),
     fn.typedLit(("a", 2, 1.0)),
     fn.typedLit[Option[Int]](None),
     fn.typedLit[Array[Option[Int]]](Array(Some(1))),

sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala

Lines changed: 7 additions & 0 deletions
@@ -1785,6 +1785,13 @@ class ClientE2ETestSuite
     assert(observation.get.contains("map"))
     assert(observation.get("map") === Map("count" -> 10))
   }
+
+  test("SPARK-53553: null value handling in literals") {
+    val df = spark.sql("select 1").select(typedlit(Array[Integer](1, null)).as("arr_col"))
+    val result = df.collect()
+    assert(result.length === 1)
+    assert(result(0).getAs[Array[Integer]]("arr_col") === Array(1, null))
+  }
 }

 private[sql] case class ClassData(a: String, b: Int)

sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/LiteralValueProtoConverter.scala

Lines changed: 23 additions & 7 deletions
@@ -40,6 +40,19 @@ import org.apache.spark.unsafe.types.CalendarInterval

 object LiteralValueProtoConverter {

+  private def setNullValue(
+      builder: proto.Expression.Literal.Builder,
+      dataType: DataType,
+      needDataType: Boolean): proto.Expression.Literal.Builder = {
+    if (needDataType) {
+      builder.setNull(toConnectProtoType(dataType))
+    } else {
+      // No data type is needed, but still set the null type to indicate
+      // that the value is null.
+      builder.setNull(ProtoDataTypes.NullType)
+    }
+  }
+
   private def setArrayTypeAfterAddingElements(
       ab: proto.Expression.Literal.Array.Builder,
       elementType: DataType,
@@ -275,6 +288,14 @@
     }

     (literal, dataType) match {
+      case (v: Option[_], _) =>
+        if (v.isDefined) {
+          toLiteralProtoBuilderInternal(v.get, dataType, options, needDataType)
+        } else {
+          setNullValue(builder, dataType, needDataType)
+        }
+      case (null, _) =>
+        setNullValue(builder, dataType, needDataType)
       case (v: mutable.ArraySeq[_], ArrayType(_, _)) =>
         toLiteralProtoBuilderInternal(v.array, dataType, options, needDataType)
       case (v: immutable.ArraySeq[_], ArrayType(_, _)) =>
@@ -287,12 +308,6 @@
         builder.setMap(mapBuilder(v, keyType, valueType, valueContainsNull))
       case (v, structType: StructType) =>
         builder.setStruct(structBuilder(v, structType))
-      case (v: Option[_], _: DataType) =>
-        if (v.isDefined) {
-          toLiteralProtoBuilderInternal(v.get, options, needDataType)
-        } else {
-          builder.setNull(toConnectProtoType(dataType))
-        }
       case (v: LocalTime, timeType: TimeType) =>
         builder.setTime(
           builder.getTimeBuilder
@@ -477,7 +492,7 @@
   }

   private def getScalaConverter(dataType: proto.DataType): proto.Expression.Literal => Any = {
-    dataType.getKindCase match {
+    val converter: proto.Expression.Literal => Any = dataType.getKindCase match {
       case proto.DataType.KindCase.SHORT => v => v.getShort.toShort
       case proto.DataType.KindCase.INTEGER => v => v.getInteger
       case proto.DataType.KindCase.LONG => v => v.getLong
@@ -513,6 +528,7 @@
       case _ =>
         throw InvalidPlanInput(s"Unsupported Literal Type: ${dataType.getKindCase}")
     }
+    v => if (v.hasNull) null else converter(v)
   }

   private def getInferredDataType(
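The final hunk above wraps every type-specific converter in a null guard, so decoding checks `hasNull` before any type-specific logic runs. A minimal, self-contained sketch of that wrapping pattern, using a hypothetical simplified literal type in place of `proto.Expression.Literal`:

```scala
// Hypothetical stand-in for proto.Expression.Literal, for illustration only.
sealed trait Lit
case object NullLit extends Lit
final case class IntLit(value: Int) extends Lit

// A type-specific converter that only understands non-null integers...
val intConverter: Lit => Any = {
  case IntLit(v) => v
  case other => sys.error(s"unexpected literal: $other")
}

// ...becomes null-safe once wrapped, mirroring the real change:
// `v => if (v.hasNull) null else converter(v)`.
def nullSafe(converter: Lit => Any): Lit => Any = {
  case NullLit => null
  case other => converter(other)
}

assert(nullSafe(intConverter)(IntLit(7)) == 7)
assert(nullSafe(intConverter)(NullLit) == null)
```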
sql/connect/common/src/test/resources/query-tests/explain-results/function_typedLit.explain

Lines changed: 1 addition & 1 deletion
@@ -1,2 +1,2 @@
-Project [id#0L, id#0L, 1 AS 1#0, null AS NULL#0, true AS true#0, 68 AS 68#0, 9872 AS 9872#0, -8726532 AS -8726532#0, 7834609328726532 AS 7834609328726532#0L, 2.718281828459045 AS 2.718281828459045#0, -0.8 AS -0.8#0, 89.97620 AS 89.97620#0, 89889.7667231 AS 89889.7667231#0, connect! AS connect!#0, T AS T#0, ABCDEFGHIJ AS ABCDEFGHIJ#0, 0x78797A7B7C7D7E7F808182838485868788898A8B8C8D8E AS X'78797A7B7C7D7E7F808182838485868788898A8B8C8D8E'#0, 0x0806 AS X'0806'#0, [8,6] AS ARRAY(8, 6)#0, null AS NULL#0, 2020-10-10 AS DATE '2020-10-10'#0, 8.997620 AS 8.997620#0, 2023-02-23 04:31:59.808 AS TIMESTAMP '2023-02-23 04:31:59.808'#0, 1969-12-31 16:00:12.345 AS TIMESTAMP '1969-12-31 16:00:12.345'#0, 2023-02-23 20:36:00 AS TIMESTAMP_NTZ '2023-02-23 20:36:00'#0, 2023-02-23 AS DATE '2023-02-23'#0, INTERVAL '0 00:03:20' DAY TO SECOND AS INTERVAL '0 00:03:20' DAY TO SECOND#0, INTERVAL '0-0' YEAR TO MONTH AS INTERVAL '0-0' YEAR TO MONTH#0, 23:59:59.999999999 AS TIME '23:59:59.999999999'#0, 2 months 20 days 0.0001 seconds AS INTERVAL '2 months 20 days 0.0001 seconds'#0, [18545,1677155519808000,12345000,1677184560000000,19411,200000000,0,86399999999999,2 months 20 days 0.0001 seconds] AS NAMED_STRUCT('_1', DATE '2020-10-10', '_2', TIMESTAMP '2023-02-23 04:31:59.808', '_3', TIMESTAMP '1969-12-31 16:00:12.345', '_4', TIMESTAMP_NTZ '2023-02-23 20:36:00', '_5', DATE '2023-02-23', '_6', INTERVAL '0 00:03:20' DAY TO SECOND, '_7', INTERVAL '0-0' YEAR TO MONTH, '_8', TIME '23:59:59.999999999', '_9', INTERVAL '2 months 20 days 0.0001 seconds')#0, 1 AS 1#0, [1,2,3] AS ARRAY(1, 2, 3)#0, [1,2,3] AS ARRAY(1, 2, 3)#0, map(keys: [a,b], values: [1,2]) AS MAP('a', 1, 'b', 2)#0, [a,2,1.0] AS NAMED_STRUCT('_1', 'a', '_2', 2, '_3', 1.0D)#0, null AS NULL#0, [1] AS ARRAY(1)#0, map(keys: [1], values: [0]) AS MAP(1, 0)#0, map(keys: [1], values: [0]) AS MAP(1, 0)#0, map(keys: [1], values: [0]) AS MAP(1, 0)#0, [[1,2,3],[4,5,6],[7,8,9]] AS ARRAY(ARRAY(1, 2, 3), ARRAY(4, 5, 6), ARRAY(7, 8, 9))#0, [keys: [a,b], values: [1,2],keys: [a,b], values: [3,4],keys: [a,b], values: [5,6]] AS ARRAY(MAP('a', 1, 'b', 2), MAP('a', 3, 'b', 4), MAP('a', 5, 'b', 6))#0, [keys: [a,b], values: [[1,2],[3,4]],keys: [a,b], values: [[5,6],[7,8]],keys: [a,b], values: [[],[]]] AS ARRAY(MAP('a', ARRAY('1', '2'), 'b', ARRAY('3', '4')), MAP('a', ARRAY('5', '6'), 'b', ARRAY('7', '8')), MAP('a', ARRAY(), 'b', ARRAY()))#0, map(keys: [1,2], values: [keys: [a,b], values: [1,2],keys: [a,b], values: [3,4]]) AS MAP(1, MAP('a', 1, 'b', 2), 2, MAP('a', 3, 'b', 4))#0, [[1,2,3],keys: [a,b], values: [1,2],[a,keys: [1,2], values: [a,b]]] AS NAMED_STRUCT('_1', ARRAY(1, 2, 3), '_2', MAP('a', 1, 'b', 2), '_3', NAMED_STRUCT('_1', 'a', '_2', MAP(1, 'a', 2, 'b')))#0]
+Project [id#0L, id#0L, 1 AS 1#0, null AS NULL#0, true AS true#0, 68 AS 68#0, 9872 AS 9872#0, -8726532 AS -8726532#0, 7834609328726532 AS 7834609328726532#0L, 2.718281828459045 AS 2.718281828459045#0, -0.8 AS -0.8#0, 89.97620 AS 89.97620#0, 89889.7667231 AS 89889.7667231#0, connect! AS connect!#0, T AS T#0, ABCDEFGHIJ AS ABCDEFGHIJ#0, 0x78797A7B7C7D7E7F808182838485868788898A8B8C8D8E AS X'78797A7B7C7D7E7F808182838485868788898A8B8C8D8E'#0, 0x0806 AS X'0806'#0, [8,6] AS ARRAY(8, 6)#0, null AS NULL#0, 2020-10-10 AS DATE '2020-10-10'#0, 8.997620 AS 8.997620#0, 2023-02-23 04:31:59.808 AS TIMESTAMP '2023-02-23 04:31:59.808'#0, 1969-12-31 16:00:12.345 AS TIMESTAMP '1969-12-31 16:00:12.345'#0, 2023-02-23 20:36:00 AS TIMESTAMP_NTZ '2023-02-23 20:36:00'#0, 2023-02-23 AS DATE '2023-02-23'#0, INTERVAL '0 00:03:20' DAY TO SECOND AS INTERVAL '0 00:03:20' DAY TO SECOND#0, INTERVAL '0-0' YEAR TO MONTH AS INTERVAL '0-0' YEAR TO MONTH#0, 23:59:59.999999999 AS TIME '23:59:59.999999999'#0, 2 months 20 days 0.0001 seconds AS INTERVAL '2 months 20 days 0.0001 seconds'#0, [18545,1677155519808000,12345000,1677184560000000,19411,200000000,0,86399999999999,2 months 20 days 0.0001 seconds] AS NAMED_STRUCT('_1', DATE '2020-10-10', '_2', TIMESTAMP '2023-02-23 04:31:59.808', '_3', TIMESTAMP '1969-12-31 16:00:12.345', '_4', TIMESTAMP_NTZ '2023-02-23 20:36:00', '_5', DATE '2023-02-23', '_6', INTERVAL '0 00:03:20' DAY TO SECOND, '_7', INTERVAL '0-0' YEAR TO MONTH, '_8', TIME '23:59:59.999999999', '_9', INTERVAL '2 months 20 days 0.0001 seconds')#0, 1 AS 1#0, [1,2,3] AS ARRAY(1, 2, 3)#0, [null,null] AS ARRAY(CAST(NULL AS INT), CAST(NULL AS INT))#0, [null,null,[1,a],[2,null]] AS ARRAY(NULL, NULL, NAMED_STRUCT('_1', 1, '_2', 'a'), NAMED_STRUCT('_1', 2, '_2', CAST(NULL AS STRING)))#0, [null,null,[1,a]] AS ARRAY(NULL, NULL, NAMED_STRUCT('_1', 1, '_2', 'a'))#0, [1,2,3] AS ARRAY(1, 2, 3)#0, map(keys: [a,b], values: [1,2]) AS MAP('a', 1, 'b', 2)#0, map(keys: [a,b], values: [null,null]) AS MAP('a', CAST(NULL AS INT), 'b', CAST(NULL AS INT))#0, [a,2,1.0] AS NAMED_STRUCT('_1', 'a', '_2', 2, '_3', 1.0D)#0, null AS NULL#0, [1] AS ARRAY(1)#0, map(keys: [1], values: [null]) AS MAP(1, CAST(NULL AS INT))#0, map(keys: [1], values: [null]) AS MAP(1, CAST(NULL AS INT))#0, map(keys: [1], values: [null]) AS MAP(1, CAST(NULL AS INT))#0, [[1,2,3],[4,5,6],[7,8,9]] AS ARRAY(ARRAY(1, 2, 3), ARRAY(4, 5, 6), ARRAY(7, 8, 9))#0, [keys: [a,b], values: [1,2],keys: [a,b], values: [3,4],keys: [a,b], values: [5,6]] AS ARRAY(MAP('a', 1, 'b', 2), MAP('a', 3, 'b', 4), MAP('a', 5, 'b', 6))#0, [keys: [a,b], values: [[1,2],[3,4]],keys: [a,b], values: [[5,6],[7,8]],keys: [a,b], values: [[],[]]] AS ARRAY(MAP('a', ARRAY('1', '2'), 'b', ARRAY('3', '4')), MAP('a', ARRAY('5', '6'), 'b', ARRAY('7', '8')), MAP('a', ARRAY(), 'b', ARRAY()))#0, map(keys: [1,2], values: [keys: [a,b], values: [1,2],keys: [a,b], values: [3,4]]) AS MAP(1, MAP('a', 1, 'b', 2), 2, MAP('a', 3, 'b', 4))#0, [[1,2,3],keys: [a,b], values: [1,2],[a,keys: [1,2], values: [a,b]]] AS NAMED_STRUCT('_1', ARRAY(1, 2, 3), '_2', MAP('a', 1, 'b', 2), '_3', NAMED_STRUCT('_1', 'a', '_2', MAP(1, 'a', 2, 'b')))#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0]

sql/connect/common/src/test/resources/query-tests/queries/function_typedLit.json

Lines changed: 248 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@
   }, {
     "literal": {
       "null": {
-        "null": {
+        "string": {
+          "collation": "UTF8_BINARY"
         }
       }
     },
@@ -821,6 +822,206 @@
         }
       }
     }
+  }, {
+    "literal": {
+      "array": {
+        "elements": [{
+          "null": {
+            "integer": {
+            }
+          }
+        }, {
+          "null": {
+            "null": {
+            }
+          }
+        }],
+        "dataType": {
+          "containsNull": true
+        }
+      }
+    },
+    "common": {
+      "origin": {
+        "jvmOrigin": {
+          "stackTrace": [{
+            "classLoaderName": "app",
+            "declaringClass": "org.apache.spark.sql.functions$",
+            "methodName": "typedLit",
+            "fileName": "functions.scala"
+          }, {
+            "classLoaderName": "app",
+            "declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
+            "methodName": "~~trimmed~anonfun~~",
+            "fileName": "PlanGenerationTestSuite.scala"
+          }]
+        }
+      }
+    }
+  }, {
+    "literal": {
+      "array": {
+        "elements": [{
+          "null": {
+            "struct": {
+              "fields": [{
+                "name": "_1",
+                "dataType": {
+                  "integer": {
+                  }
+                }
+              }, {
+                "name": "_2",
+                "dataType": {
+                  "string": {
+                    "collation": "UTF8_BINARY"
+                  }
+                },
+                "nullable": true
+              }]
+            }
+          }
+        }, {
+          "null": {
+            "null": {
+            }
+          }
+        }, {
+          "struct": {
+            "elements": [{
+              "integer": 1
+            }, {
+              "string": "a"
+            }],
+            "dataTypeStruct": {
+              "fields": [{
+                "name": "_1"
+              }, {
+                "name": "_2",
+                "dataType": {
+                  "string": {
+                    "collation": "UTF8_BINARY"
+                  }
+                },
+                "nullable": true
+              }]
+            }
+          }
+        }, {
+          "struct": {
+            "elements": [{
+              "integer": 2
+            }, {
+              "null": {
+                "string": {
+                  "collation": "UTF8_BINARY"
+                }
+              }
+            }],
+            "dataTypeStruct": {
+              "fields": [{
+                "name": "_1"
+              }, {
+                "name": "_2",
+                "nullable": true
+              }]
+            }
+          }
+        }],
+        "dataType": {
+          "containsNull": true
+        }
+      }
+    },
+    "common": {
+      "origin": {
+        "jvmOrigin": {
+          "stackTrace": [{
+            "classLoaderName": "app",
+            "declaringClass": "org.apache.spark.sql.functions$",
+            "methodName": "typedLit",
+            "fileName": "functions.scala"
+          }, {
+            "classLoaderName": "app",
+            "declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
+            "methodName": "~~trimmed~anonfun~~",
+            "fileName": "PlanGenerationTestSuite.scala"
+          }]
+        }
+      }
+    }
+  }, {
+    "literal": {
+      "array": {
+        "elements": [{
+          "null": {
+            "struct": {
+              "fields": [{
+                "name": "_1",
+                "dataType": {
+                  "integer": {
+                  }
+                }
+              }, {
+                "name": "_2",
+                "dataType": {
+                  "string": {
+                    "collation": "UTF8_BINARY"
+                  }
+                },
+                "nullable": true
+              }]
+            }
+          }
+        }, {
+          "null": {
+            "null": {
+            }
+          }
+        }, {
+          "struct": {
+            "elements": [{
+              "integer": 1
+            }, {
+              "string": "a"
+            }],
+            "dataTypeStruct": {
+              "fields": [{
+                "name": "_1"
+              }, {
+                "name": "_2",
+                "dataType": {
+                  "string": {
+                    "collation": "UTF8_BINARY"
+                  }
+                },
+                "nullable": true
+              }]
+            }
+          }
+        }],
+        "dataType": {
+          "containsNull": true
+        }
+      }
+    },
+    "common": {
+      "origin": {
+        "jvmOrigin": {
+          "stackTrace": [{
+            "classLoaderName": "app",
+            "declaringClass": "org.apache.spark.sql.functions$",
+            "methodName": "typedLit",
+            "fileName": "functions.scala"
+          }, {
+            "classLoaderName": "app",
+            "declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
+            "methodName": "~~trimmed~anonfun~~",
+            "fileName": "PlanGenerationTestSuite.scala"
+          }]
+        }
+      }
+    }
   }, {
     "literal": {
       "array": {
@@ -891,6 +1092,52 @@
         }
       }
     }
+  }, {
+    "literal": {
+      "map": {
+        "keys": [{
+          "string": "a"
+        }, {
+          "string": "b"
+        }],
+        "values": [{
+          "null": {
+            "integer": {
+            }
+          }
+        }, {
+          "null": {
+            "null": {
+            }
+          }
+        }],
+        "dataType": {
+          "keyType": {
+            "string": {
+              "collation": "UTF8_BINARY"
+            }
+          },
+          "valueContainsNull": true
+        }
+      }
+    },
+    "common": {
+      "origin": {
+        "jvmOrigin": {
+          "stackTrace": [{
+            "classLoaderName": "app",
+            "declaringClass": "org.apache.spark.sql.functions$",
+            "methodName": "typedLit",
+            "fileName": "functions.scala"
+          }, {
+            "classLoaderName": "app",
+            "declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
+            "methodName": "~~trimmed~anonfun~~",
+            "fileName": "PlanGenerationTestSuite.scala"
+          }]
+        }
+      }
+    }
   }, {
     "literal": {
       "struct": {
sql/connect/common/src/test/resources/query-tests/queries/function_typedLit.proto.bin

Binary file not shown (1.05 KB).

sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/LiteralExpressionProtoConverterSuite.scala

Lines changed: 4 additions & 0 deletions
@@ -53,7 +53,11 @@ class LiteralExpressionProtoConverterSuite extends AnyFunSuite { // scalastyle:i
     }
   }

+  // The goal of this test is to check that converting a Scala value -> Proto -> Catalyst value
+  // is equivalent to converting a Scala value directly to a Catalyst value.
   Seq[(Any, DataType)](
+    (Array[String](null, "a", null), ArrayType(StringType)),
+    (Map[String, String]("a" -> null, "b" -> null), MapType(StringType, StringType)),
     (
       (1, "string", true),
       StructType(
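The comment added above states a round-trip property. As a hedged sketch of what the suite checks (the helper names here are hypothetical, not the suite's actual API):

```scala
// `toProtoLiteral` encodes a Scala value as a proto literal message,
// `protoToCatalyst` decodes that message into a Catalyst value, and
// `scalaToCatalyst` converts the Scala value directly. The suite asserts
// that both routes agree for each (value, DataType) pair.
def roundTripAgrees[A](
    value: A,
    toProtoLiteral: A => AnyRef,
    protoToCatalyst: AnyRef => Any,
    scalaToCatalyst: A => Any): Boolean =
  protoToCatalyst(toProtoLiteral(value)) == scalaToCatalyst(value)
```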
