
Commit ec7f2c1

[3.3] Fix timestamp data-skipping stats for truncated timezone offsets
Signed-off-by: AnudeepKonaboina <[email protected]>
1 parent a7f222c commit ec7f2c1

4 files changed, +130 -5 lines changed


PROTOCOL.md

Lines changed: 1 addition & 1 deletion
@@ -1874,7 +1874,7 @@ nullCount | The number of `null` values for this column | <p>If the `nullCount`
 minValues | A value that is equal to the smallest valid value[^1] present in the file for this column. If all valid rows are null, this carries no information. | A value that is less than or equal to all valid values[^1] present in this file for this column. If all valid rows are null, this carries no information.
 maxValues | A value that is equal to the largest valid value[^1] present in the file for this column. If all valid rows are null, this carries no information. | A value that is greater than or equal to all valid values[^1] present in this file for this column. If all valid rows are null, this carries no information.

-[^1]: String columns are cut off at a fixed prefix length. Timestamp columns are truncated down to milliseconds.
+[^1]: String columns are cut off at a fixed prefix length. Timestamp columns are truncated down to milliseconds. Implementations **must not** truncate timezone offsets in timestamp statistics to minute precision in a way that changes the represented instant. Modern writers SHOULD encode timestamp statistics as instants in UTC with microsecond precision (for example, using an ISO 8601 representation adjusted to UTC, such as `1970-01-01T00:00:00.123456Z`). Readers MUST treat statistics as approximate bounds and MAY widen the effective min/max range to avoid incorrectly skipping files when older writers produced truncated timestamp statistics.

 ## Partition Value Serialization
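For context on the new footnote, here is a standalone sketch (not part of the commit) of why truncating a timezone offset to minute precision changes the represented instant when the value is re-parsed. The +05:53:28 local-mean-time offset is only an illustrative assumption, matching how pre-1906 Asia/Kolkata timestamps resolve.

```scala
import java.time.{LocalDateTime, ZoneOffset}

// Hypothetical illustration: a historical local time with offset +05:53:28. Formatting it
// with an offset truncated to minutes (+05:53) and re-parsing shifts the instant by 28 s.
object OffsetTruncationExample extends App {
  val local = LocalDateTime.of(1900, 1, 1, 12, 0, 0)
  val fullOffset = ZoneOffset.ofHoursMinutesSeconds(5, 53, 28) // +05:53:28
  val truncated = ZoneOffset.ofHoursMinutes(5, 53)             // +05:53

  val trueInstant = local.toInstant(fullOffset)
  val parsedInstant = local.toInstant(truncated) // what a reader reconstructs from "+0553"

  val skewSeconds = parsedInstant.getEpochSecond - trueInstant.getEpochSecond
  println(s"skew = $skewSeconds seconds") // 28: the represented instant moved
}
```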

spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala

Lines changed: 40 additions & 3 deletions
@@ -186,6 +186,21 @@ private[delta] object DataSkippingReader {
     val oneMillisecond = new CalendarInterval(0, 0, 1000 /* micros */)
     new Literal(oneMillisecond, CalendarIntervalType)
   }
+  // SC-22824 widened the max timestamp stats range by +1 ms to account for JSON truncation down to
+  // milliseconds. For tables written by Delta 3.3.2, timestamp stats in JSON may also be off by up
+  // to 59 seconds due to historical timezone offsets being truncated to minute resolution (see
+  // Delta issue 5249). To avoid incorrect data skipping (pruning files that may still contain
+  // matching rows), we treat these stats as approximate bounds and widen the range by up to
+  // 59 seconds on both sides for timestamp stats.
+  private val fiftyNineSecondsMicros: Long = 59L * 1000000L
+  val plusFiftyNineSecondsLiteralExpr: Literal = {
+    val interval = new CalendarInterval(0, 0, fiftyNineSecondsMicros)
+    new Literal(interval, CalendarIntervalType)
+  }
+  val minusFiftyNineSecondsLiteralExpr: Literal = {
+    val interval = new CalendarInterval(0, 0, -fiftyNineSecondsMicros)
+    new Literal(interval, CalendarIntervalType)
+  }

   lazy val sizeCollectorInputEncoders: Seq[Option[ExpressionEncoder[_]]] = Seq(
     Option(ExpressionEncoder[Boolean]()),
@@ -949,16 +964,38 @@ trait DataSkippingReaderBase
       // Filter out non-leaf columns -- they lack stats so skipping predicates can't use them.
       .filterNot(_._2.isInstanceOf[StructType])
       .map {
+        case (statCol, TimestampType, _) if pathToStatType.head == MIN =>
+          // Delta Spark 3.3.2 wrote timestamp stats to JSON using timezone offsets with seconds
+          // truncated down to minute resolution (e.g. +005328 -> +0053). When these values are
+          // parsed back as timestamps, they can be off by up to 59 seconds from the true value,
+          // which may cause incorrect data skipping (files being pruned even though they might
+          // contain matching rows).
+          //
+          // To avoid data loss for such tables, we treat the JSON stats for timestamps as
+          // approximate and widen the range by up to 59 seconds on both sides. For the min
+          // bound, subtract 59 seconds.
+          Column(
+            Cast(TimeAdd(statCol.expr, minusFiftyNineSecondsLiteralExpr), TimestampType))
         case (statCol, TimestampType, _) if pathToStatType.head == MAX =>
           // SC-22824: For timestamps, JSON serialization will truncate to milliseconds. This means
           // that we must adjust 1 millisecond upwards for max stats, or we will incorrectly skip
           // records that differ only in microsecond precision. (For example, a file containing only
           // 01:02:03.456789 will be written with min == max == 01:02:03.456, so we must consider it
           // to contain the range from 01:02:03.456 to 01:02:03.457.)
           //
-          // There is a longer term task SC-22825 to fix the serialization problem that caused this.
-          // But we need the adjustment in any case to correctly read stats written by old versions.
-          Column(Cast(TimeAdd(statCol.expr, oneMillisecondLiteralExpr), TimestampType))
+          // Delta Spark 3.3.2 also truncated historical timezone offsets to minute resolution in
+          // JSON stats, which can make the recorded max timestamp up to 59 seconds earlier than
+          // the true value. To avoid incorrectly skipping files for such tables, we further widen
+          // the upper bound by 59 seconds.
+          //
+          // There is a longer term task SC-22825 to fix the serialization problem that caused the
+          // millisecond truncation, and Delta issue 5249 tracks the timezone offset truncation.
+          // We need this adjustment in any case to correctly read stats written by old versions.
+          val widened =
+            TimeAdd(
+              TimeAdd(statCol.expr, oneMillisecondLiteralExpr),
+              plusFiftyNineSecondsLiteralExpr)
+          Column(Cast(widened, TimestampType))
         case (statCol, TimestampNTZType, _) if pathToStatType.head == MAX =>
           // We also apply the same adjustment of max stats that was applied to Timestamp
          // for TimestampNTZ because these 2 types have the same precision in terms of time.
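The reader-side change above amounts to widening the effective stats range for timestamps. A minimal sketch of that arithmetic (not part of the commit; the instants are hypothetical and stand in for the real Catalyst expression tree):

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

// Stats parsed from JSON are treated as approximate, so the effective range becomes
// [min - 59s, max + 1ms + 59s]. A value that differs from the recorded stats by up to
// ~59 seconds (offset truncation) or sub-millisecond precision still falls inside it.
object WidenedBoundsSketch extends App {
  val statsMin = Instant.parse("2019-09-09T01:02:33.456Z") // JSON min, possibly too late
  val statsMax = Instant.parse("2019-09-09T01:02:33.456Z") // JSON max, possibly too early

  val effectiveMin = statsMin.minus(59, ChronoUnit.SECONDS)
  val effectiveMax = statsMax.plus(1, ChronoUnit.MILLIS).plus(59, ChronoUnit.SECONDS)

  // The true value written to the file (off by ~30s from the recorded stats) lands inside
  // the widened range, so an equality predicate cannot prune the file.
  val trueValue = Instant.parse("2019-09-09T01:02:03.456789Z")
  assert(!trueValue.isBefore(effectiveMin) && !trueValue.isAfter(effectiveMax))
  println(s"[$effectiveMin, $effectiveMax] contains $trueValue")
}
```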

spark/src/main/scala/org/apache/spark/sql/delta/stats/StatisticsCollection.scala

Lines changed: 47 additions & 1 deletion
@@ -46,7 +46,7 @@ import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, AstBuilder, Pars
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser.MultipartIdentifierListContext
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.functions.lit
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.types._

 /**
@@ -243,6 +243,15 @@ trait StatisticsCollection extends DeltaLogging {
     val stringPrefix =
       spark.sessionState.conf.getConf(DeltaSQLConf.DATA_SKIPPING_STRING_PREFIX_LENGTH)

+    // Formatter used to serialize timestamp MIN/MAX statistics to JSON. Historically, Spark 3.3.2
+    // truncated historical timezone offsets down to minute precision when rendering timestamps,
+    // which could make the serialized stats differ from the true values by up to 59 seconds (see
+    // Delta issue 5249). To avoid that, we mirror Delta 4.0 and explicitly format timestamp stats
+    // using a pattern that preserves offset seconds: `yyyy-MM-dd'T'HH:mm:ss.SSSXXXXX`.
+    val sessionTimeZoneId = spark.sessionState.conf.sessionLocalTimeZone
+    val timestampStatsFormatterUdf =
+      StatisticsCollection.timestampStatsFormatterUdf(sessionTimeZoneId)
+
     // On file initialization/stat recomputation TIGHT_BOUNDS is always set to true
     val tightBoundsColOpt =
       Option.when(deletionVectorsSupported &&
@@ -257,6 +266,10 @@
       case (c, SkippingEligibleDataType(StringType), true) =>
         substring(min(c), 0, stringPrefix)

+      // Format timestamp min stats using a pattern that preserves offset seconds.
+      case (c, SkippingEligibleDataType(TimestampType), true) =>
+        timestampStatsFormatterUdf(min(c).cast(TimestampType))
+
       // Collect all numeric min values
       case (c, SkippingEligibleDataType(_), true) =>
         min(c)
@@ -268,6 +281,10 @@
          DeltaUDF.stringFromString(StatisticsCollection.truncateMaxStringAgg(stringPrefix)_)
        udfTruncateMax(max(c))

+      // Format timestamp max stats using a pattern that preserves offset seconds.
+      case (c, SkippingEligibleDataType(TimestampType), true) =>
+        timestampStatsFormatterUdf(max(c).cast(TimestampType))
+
       // Collect all numeric max values
       case (c, SkippingEligibleDataType(_), true) =>
         max(c)
@@ -409,6 +426,35 @@ object StatisticsCollection extends DeltaCommand {

   val UTF8_MAX_CHARACTER = new String(Character.toChars(Character.MAX_CODE_POINT))

+  /**
+   * Builds a UDF for formatting timestamp statistics using a pattern that preserves offset seconds:
+   * `yyyy-MM-dd'T'HH:mm:ss.SSSXXXXX`. This mirrors Delta 4.0 behavior so that new tables write
+   * precise timestamp stats, while older tables with truncated offsets are handled by the
+   * reader-side widening logic in `DataSkippingReader`.
+   */
+  private[delta] def timestampStatsFormatterUdf(
+      sessionTimeZoneId: String): UserDefinedFunction = {
+    val timeZone =
+      org.apache.spark.sql.delta.util.DateTimeUtils.getTimeZone(sessionTimeZoneId)
+    val formatter =
+      org.apache.spark.sql.delta.util.TimestampFormatter(
+        "yyyy-MM-dd'T'HH:mm:ss.SSSXXXXX",
+        timeZone)
+    val formatTimestamp =
+      (ts: java.sql.Timestamp) => {
+        if (ts == null) {
+          null
+        } else {
+          val micros =
+            org.apache.spark.sql.delta.util.DateTimeUtils.fromJavaTimestamp(ts)
+          org.apache.spark.sql.delta.util.DateTimeUtils.timestampToString(
+            formatter,
+            micros)
+        }
+      }
+    udf(formatTimestamp)
+  }
+
   /**
    * The SQL grammar already includes a `multipartIdentifierList` rule for parsing a string into a
    * list of multi-part identifiers. We just expose it here, with a custom parser and AstBuilder.
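The key detail in the writer-side change is the offset pattern. A small sketch (not part of the commit, using plain `java.time` rather than Spark's internal `TimestampFormatter`) showing that five `X` pattern letters keep offset seconds, while the common three-letter form drops them; the historical +05:53:28 offset is assumed for illustration:

```scala
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter

// "XXXXX" renders the full offset including seconds; "XXX" renders only hours and minutes.
object OffsetSecondsFormatting extends App {
  val withSeconds = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXXXX")
  val minutePrecision = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")

  // A pre-1906 timestamp whose zone resolves to a local-mean-time offset with seconds.
  val historical = ZonedDateTime.parse("1900-01-01T12:00:00.123+05:53:28")

  println(withSeconds.format(historical))      // 1900-01-01T12:00:00.123+05:53:28
  println(minutePrecision.format(historical))  // 1900-01-01T12:00:00.123+05:53 (seconds lost)
}
```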

spark/src/test/scala/org/apache/spark/sql/delta/stats/DataSkippingDeltaTests.scala

Lines changed: 42 additions & 0 deletions
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.delta.stats

 import java.io.File
+import java.sql.Timestamp

 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions.AddFile
@@ -1698,6 +1699,47 @@ trait DataSkippingDeltaTestsBase extends DeltaExcludedBySparkVersionTestMixinShi
     }
   }

+  test("data skipping with timestamp stats truncated by seconds (issue 5249)") {
+    // This test simulates a table whose timestamp stats in JSON are off by tens of seconds
+    // compared to the actual data values, similar to Delta 3.3.2 behavior described in
+    // https://github.com/delta-io/delta/issues/5249. The DataSkippingReader must treat
+    // such stats as approximate bounds and must NOT skip the file for an equality predicate.
+    withTempDir { dir =>
+      import testImplicits._
+
+      val ts = Timestamp.valueOf("2019-09-09 01:02:03.456789")
+      val df = Seq(ts).toDF("ts")
+      df.write.format("delta").save(dir.getCanonicalPath)
+
+      val log = DeltaLog.forTable(spark, dir.getCanonicalPath)
+
+      // Overwrite stats for the single AddFile to mimic an older writer that recorded
+      // timestamp stats that are off by ~30 seconds from the true value. We set both
+      // min and max to a timestamp 30 seconds *after* the actual value, which would
+      // previously cause data skipping to think that `ts = actual` cannot match.
+      val txn = log.startTransaction()
+      val addFile = txn.filterFiles(Nil).head
+
+      val fakeStatsJson =
+        """{
+          | "numRecords": 1,
+          | "minValues": {"ts": "2019-09-09 01:02:33.456789"},
+          | "maxValues": {"ts": "2019-09-09 01:02:33.456789"},
+          | "nullCount": {"ts": 0}
+          |}""".stripMargin
+
+      txn.commit(Seq(addFile.copy(stats = fakeStatsJson)), DeltaOperations.ComputeStats(Nil))
+      log.update()
+
+      val predicate = """ts = TIMESTAMP '2019-09-09 01:02:03.456789'"""
+      Given(predicate)
+      val numFiles = filesRead(log, predicate)
+      assert(numFiles == 1,
+        s"Expected timestamp file not to be skipped for equality predicate due to widened " +
+        s"timestamp stats bounds; filesRead was $numFiles")
+    }
+  }
+
  testSparkMasterOnly("data skipping by stats - variant type") {
    withTable("tbl") {
      sql("""CREATE TABLE tbl(v VARIANT,
