
Commit ab7cfad

[3.3] Make timestamp stats formatter UDF serializable
1 parent bd8de10 · commit ab7cfad

File tree: 1 file changed (+39, -31)


spark/src/main/scala/org/apache/spark/sql/delta/stats/StatisticsCollection.scala

Lines changed: 39 additions & 31 deletions
@@ -46,7 +46,7 @@ import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, AstBuilder, Pars
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser.MultipartIdentifierListContext
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.functions.lit
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.types._
 
 /**
@@ -150,36 +150,6 @@ trait StatisticsCollection extends DeltaLogging {
   lazy val statCollectionLogicalSchema: StructType =
     getIndexedColumns(explodedDataSchemaNames, statsColumnSpec, effectiveSchema, NoMapping)
 
-  /**
-   * Formatter used to serialize timestamp MIN/MAX statistics to JSON. Historically, Spark 3.3.2
-   * truncated historical timezone offsets down to minute precision when rendering timestamps,
-   * which could make the serialized stats differ from the true values by up to 59 seconds (see
-   * Delta issue 5249). To avoid that, we mirror Delta 4.0 and explicitly format timestamp stats
-   * using a pattern that preserves offset seconds: `yyyy-MM-dd'T'HH:mm:ss.SSSXXXXX`.
-   */
-  private lazy val timestampStatsFormatterUdf = {
-    val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
-    val timeZone =
-      org.apache.spark.sql.delta.util.DateTimeUtils.getTimeZone(timeZoneId)
-    val formatter =
-      org.apache.spark.sql.delta.util.TimestampFormatter(
-        "yyyy-MM-dd'T'HH:mm:ss.SSSXXXXX",
-        timeZone)
-    val formatTimestamp =
-      (ts: java.sql.Timestamp) => {
-        if (ts == null) {
-          null
-        } else {
-          val micros =
-            org.apache.spark.sql.delta.util.DateTimeUtils.fromJavaTimestamp(ts)
-          org.apache.spark.sql.delta.util.DateTimeUtils.timestampToString(
-            formatter,
-            micros)
-        }
-      }
-    udf(formatTimestamp)
-  }
-
   /**
    * Traverses the [[statisticsSchema]] for the provided [[statisticsColumn]]
    * and applies [[function]] to leaves.
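
The pattern rationale in the removed comment can be checked outside Spark. Below is a minimal sketch using plain java.time rather than Delta's TimestampFormatter shim; Europe/Paris and the 1900 date are illustrative choices, and the exact offset depends on the JVM's tzdata. Five X pattern letters keep the seconds of a historical zone offset, while three truncate it to minutes:

import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

object OffsetSecondsSketch extends App {
  // Around 1900 Europe/Paris used Paris Mean Time, +00:09:21, so the zone
  // offset has a non-zero seconds component.
  val instant = Instant.parse("1900-01-01T00:00:00Z")
  val zone = ZoneId.of("Europe/Paris")

  val keepsOffsetSeconds =
    DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXXXX").withZone(zone)
  val minuteOnlyOffset =
    DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").withZone(zone)

  println(keepsOffsetSeconds.format(instant)) // 1900-01-01T00:09:21.000+00:09:21
  println(minuteOnlyOffset.format(instant))   // 1900-01-01T00:09:21.000+00:09
  // Parsing the truncated form back yields an instant 21 seconds off the true
  // value, i.e. the class of up-to-59-second discrepancy described above.
}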
@@ -273,6 +243,15 @@ trait StatisticsCollection extends DeltaLogging {
     val stringPrefix =
       spark.sessionState.conf.getConf(DeltaSQLConf.DATA_SKIPPING_STRING_PREFIX_LENGTH)
 
+    // Formatter used to serialize timestamp MIN/MAX statistics to JSON. Historically, Spark 3.3.2
+    // truncated historical timezone offsets down to minute precision when rendering timestamps,
+    // which could make the serialized stats differ from the true values by up to 59 seconds (see
+    // Delta issue 5249). To avoid that, we mirror Delta 4.0 and explicitly format timestamp stats
+    // using a pattern that preserves offset seconds: `yyyy-MM-dd'T'HH:mm:ss.SSSXXXXX`.
+    val sessionTimeZoneId = spark.sessionState.conf.sessionLocalTimeZone
+    val timestampStatsFormatterUdf =
+      StatisticsCollection.timestampStatsFormatterUdf(sessionTimeZoneId)
+
     // On file initialization/stat recomputation TIGHT_BOUNDS is always set to true
     val tightBoundsColOpt =
       Option.when(deletionVectorsSupported &&
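
The added lines resolve the session time zone once on the driver and hand the companion object only that String. A self-contained sketch of that usage shape follows; the stand-in formatter is built on java.time, the object and column names are hypothetical, and it reads the time zone via the public "spark.sql.session.timeZone" config rather than the internal sessionState API the diff uses:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}

object FormatterUsageSketch {
  // Stand-in for StatisticsCollection.timestampStatsFormatterUdf: the closure
  // captures only the time-zone id String, never the SparkSession.
  def timestampFormatterUdf(sessionTimeZoneId: String): UserDefinedFunction =
    udf { (ts: java.sql.Timestamp) =>
      if (ts == null) null
      else {
        java.time.format.DateTimeFormatter
          .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXXXX")
          .withZone(java.time.ZoneId.of(sessionTimeZoneId))
          .format(ts.toInstant)
      }
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]").appName("formatter-usage-sketch").getOrCreate()
    import spark.implicits._

    // Resolve the session time zone once, on the driver, as the diff does.
    val formatTs = timestampFormatterUdf(spark.conf.get("spark.sql.session.timeZone"))

    val df = Seq(java.sql.Timestamp.valueOf("2024-01-01 12:00:00")).toDF("minTs")
    df.select(formatTs(col("minTs")).as("minTs")).show(false)
    spark.stop()
  }
}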
@@ -447,6 +426,35 @@ object StatisticsCollection extends DeltaCommand {
 
   val UTF8_MAX_CHARACTER = new String(Character.toChars(Character.MAX_CODE_POINT))
 
+  /**
+   * Builds a UDF for formatting timestamp statistics using a pattern that preserves offset seconds:
+   * `yyyy-MM-dd'T'HH:mm:ss.SSSXXXXX`. This mirrors Delta 4.0 behavior so that new tables write
+   * precise timestamp stats, while older tables with truncated offsets are handled by the
+   * reader-side widening logic in `DataSkippingReader`.
+   */
+  private[delta] def timestampStatsFormatterUdf(
+      sessionTimeZoneId: String): UserDefinedFunction = {
+    val timeZone =
+      org.apache.spark.sql.delta.util.DateTimeUtils.getTimeZone(sessionTimeZoneId)
+    val formatter =
+      org.apache.spark.sql.delta.util.TimestampFormatter(
+        "yyyy-MM-dd'T'HH:mm:ss.SSSXXXXX",
+        timeZone)
+    val formatTimestamp =
+      (ts: java.sql.Timestamp) => {
+        if (ts == null) {
+          null
+        } else {
+          val micros =
+            org.apache.spark.sql.delta.util.DateTimeUtils.fromJavaTimestamp(ts)
+          org.apache.spark.sql.delta.util.DateTimeUtils.timestampToString(
+            formatter,
+            micros)
+        }
+      }
+    udf(formatTimestamp)
+  }
+
   /**
    * The SQL grammar already includes a `multipartIdentifierList` rule for parsing a string into a
    * list of multi-part identifiers. We just expose it here, with a custom parser and AstBuilder.
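
Why moving the builder from a lazy val on the trait to the companion object matters: a function value created on an instance can capture the enclosing instance, and Spark must serialize everything a UDF closure captures before shipping it to executors. The contrast sketch below illustrates the failure and fix shapes with hypothetical names; the exact capture path in the original code may differ, but the fix shape matches the commit.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

trait ClosureCaptureSketch {
  def spark: SparkSession

  // Problematic shape: the lambda reads `spark`, so it captures the enclosing
  // instance. If the class implementing this trait is not serializable,
  // running a query that uses this UDF fails with "Task not serializable".
  lazy val capturesEnclosingInstance: UserDefinedFunction = udf { (s: String) =>
    s + "@" + spark.conf.get("spark.sql.session.timeZone")
  }
}

object ClosureCaptureSketch {
  // Fix shape, as in the commit: read driver-side state before building the
  // closure and pass it in as a plain String, so the closure captures only
  // serializable values.
  def capturesOnlyString(sessionTimeZoneId: String): UserDefinedFunction =
    udf { (s: String) => s + "@" + sessionTimeZoneId }
}

This matches the diff: the session time zone is read in the trait (on the driver) and passed to StatisticsCollection.timestampStatsFormatterUdf, whose closure touches only the formatter and the timestamp value.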
