
Conversation

Contributor

@AnudeepKonaboina commented Nov 24, 2025

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

This PR fixes a correctness bug in the 3.3 line where timestamp column statistics can cause Delta to skip files incorrectly for historical timestamps whose timezone offsets include seconds (e.g. Europe/Stockholm in 1850).

Older Delta versions (3.2.x / 3.3.x) wrote stats with timezone offsets truncated to minutes in JSON (e.g. +00:53 instead of +00:53:28). When data skipping uses these stats, files that do contain matching rows may be pruned, so queries silently “lose” data.

Note: the fix has two parts. A reader-side fix for backward compatibility and safety, so that existing tables with already-bad 3.3.2 stats continue to work: widening the [min, max] window prevents data loss when predicates hit those files. And a writer-side fix that stops writing truncated timestamp offsets, so newly written stats are correct. The effect of the truncation is illustrated below.
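
For illustration (plain java.time, not Delta code), truncating the offset seen in the repro below from +00:53:28 to +00:53 shifts the encoded instant by 28 seconds:

import java.time.OffsetDateTime

// Illustration only: the +00:53:28 offset is the one seen in the repro output below.
val full      = OffsetDateTime.parse("1850-01-01T01:01:01+00:53:28")
val truncated = OffsetDateTime.parse("1850-01-01T01:01:01+00:53")

// Truncating the offset to minutes shifts the encoded instant by the lost seconds
// (here 28 s), so a min/max stat written this way can exclude the real value.
println(full.toInstant)      // 1850-01-01T00:07:33Z
println(truncated.toInstant) // 1850-01-01T00:08:01Z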

This PR:

  • Makes the data skipping reader more tolerant of slightly wrong timestamp stats by widening the effective [min, max] range.
  • Clarifies in PROTOCOL.md that timestamp stats are approximate and readers may widen bounds.
  • Changes StatisticsCollection on the writer side to format timestamp MIN/MAX stats with yyyy-MM-dd'T'HH:mm:ss.SSSXXXXX, so offset seconds are preserved (see the formatter sketch after this list).
  • Adds a regression test that simulates truncated timestamp stats and proves the fix.
  • Targets the 3.3 line and is intended as a backwards-compatible bugfix for existing tables.
  • Fixes [Feature Request][Spark] Write timestamps adjusted to UTC in column statistics #5512
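
As referenced in the writer-side item above, here is a minimal sketch (plain java.time, not the actual StatisticsCollection change; the old XXX pattern is an assumption about the previous formatting) of how XXXXX preserves offset seconds:

import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter

val ts = OffsetDateTime.parse("1850-01-01T01:01:01+00:53:28")

// Assumed old pattern (offset rendered to minutes) vs. the pattern this PR writes with.
val oldFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
val newFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXXXX")

println(ts.format(oldFmt)) // 1850-01-01T01:01:01.000+00:53     (offset seconds lost)
println(ts.format(newFmt)) // 1850-01-01T01:01:01.000+00:53:28  (offset seconds preserved)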

Code used for reproducing the issue

// Sweden in 1850 (Europe/Stockholm)
val df = spark.sql("""
  WITH ts AS (
    SELECT make_timestamp(1850, 1, 1, 1, 1, 1) AS ts
  )
  SELECT date_format(ts, 'yyyy-MM-dd HH:mm:ss XXXX') AS ts_string, ts
  FROM ts
""")

df.show(false)
// Write the data as a Delta table
df.write.format("delta").mode("overwrite").save("/tmp/delta3_ts")

// Read it back and filter on the exact timestamp; the row should be returned
val df2 = spark.read.format("delta").load("/tmp/delta3_ts")
df2.where("ts = make_timestamp(1850,1,1,1,1,1)").show()

Output in delta-3.3.2

25/11/24 13:58:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.2
      /_/
         
Using Scala version 2.13.8 (OpenJDK 64-Bit Server VM, Java 17.0.9)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context Web UI available at http://hadoop.spark:4040
Spark context available as 'sc' (master = local[*], app id = local-1763992685634).
Spark session available as 'spark'.

scala> :paste
// Entering paste mode (ctrl-D to finish)

val df = spark.sql("""
  WITH ts AS (
    SELECT make_timestamp(1850, 1, 1, 1, 1, 1) AS ts
  )
  SELECT date_format(ts, 'yyyy-MM-dd HH:mm:ss XXXX') AS ts_string, ts
  FROM ts
""")

df.show(truncate = false)   // sanity check
df.write.format("delta").mode("overwrite").save("/tmp/delta3_2_2_new")


// Exiting paste mode, now interpreting.

+---------------------------+-------------------+
|ts_string                  |ts                 |
+---------------------------+-------------------+
|1850-01-01 01:01:01 +005328|1850-01-01 01:01:01|
+---------------------------+-------------------+

25/11/24 13:58:24 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
val df: org.apache.spark.sql.DataFrame = [ts_string: string, ts: timestamp]     

scala> val df2 = spark.read.format("delta").load("/tmp/delta3_2_2_new")
val df2: org.apache.spark.sql.DataFrame = [ts_string: string, ts: timestamp]

scala> df2.where("ts = make_timestamp(1850,1,1,1,1,1)").show()
+---------+---+
|ts_string| ts|
+---------+---+
+---------+---+

Delta stats for 3.3.2:

[root@hadoop /]# hdfs dfs -cat /tmp/delta3_3_2/_delta_log/00000000000000000000.json
2025-11-24 14:46:25,626 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
{"commitInfo":{"timestamp":1763995523381,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"915"},"engineInfo":"Apache-Spark/3.5.2 Delta-Lake/3.2.0","txnId":"45b4c847-19be-4258-9df0-57a6202f6b5a"}}
{"metaData":{"id":"2ba595ee-a039-409f-90a1-75bd691cf8e1","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"ts_string\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ts\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1763995521911}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"add":{"path":"part-00000-67ea1ea6-84f7-461d-859d-718166d91fff-c000.snappy.parquet","partitionValues":{},"size":915,"modificationTime":1763995522896,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"ts_string\":\"1850-01-01 01:01:01 +005328\",\"ts\":\"1850-01-01T01:01:01.000+00:53\"},\"maxValues\":{\"ts_string\":\"1850-01-01 01:01:01 +005328\",\"ts\":\"1850-01-01T01:01:01.000+00:53\"},\"nullCount\":{\"ts_string\":0,\"ts\":0}}"}}
[root@hadoop /]# 

Output in delta-4.0.0

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.0.0
      /_/
         
Using Scala version 2.13.16 (OpenJDK 64-Bit Server VM, Java 17.0.9)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context Web UI available at http://hadoop.spark:4040
Spark context available as 'sc' (master = local[*], app id = local-1763993742378).
Spark session available as 'spark'.

scala> :paste
// Entering paste mode (ctrl-D to finish)

val df = spark.sql("""
  WITH ts AS (
    SELECT make_timestamp(1850, 1, 1, 1, 1, 1) AS ts
  )
  SELECT date_format(ts, 'yyyy-MM-dd HH:mm:ss XXXX') AS ts_string, ts
  FROM ts
""")

df.show(truncate = false)   // sanity check
df.write.format("delta").mode("overwrite").save("/tmp/delta4_0_0")

// Exiting paste mode, now interpreting.
+---------------------------+-------------------+
|ts_string                  |ts                 |
+---------------------------+-------------------+
|1850-01-01 01:01:01 +005328|1850-01-01 01:01:01|
+---------------------------+-------------------+

val df: org.apache.spark.sql.DataFrame = [ts_string: string, ts: timestamp]     

scala> val df2 = spark.read.format("delta").load("/tmp/delta4_0_0")
val df2: org.apache.spark.sql.DataFrame = [ts_string: string, ts: timestamp]

scala> df2.where("ts = make_timestamp(1850,1,1,1,1,1)").show()
+--------------------+-------------------+
|           ts_string|                 ts|
+--------------------+-------------------+
|1850-01-01 01:01:...|1850-01-01 01:01:01|
+--------------------+-------------------+

Delta stats in 4.0:

[root@hadoop /]# hdfs dfs -ls /tmp/delta4_0_0/_delta_log
2025-11-24 14:30:50,901 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 3 items
-rw-r--r--   1 root supergroup       1046 2025-11-24 14:16 /tmp/delta4_0_0/_delta_log/00000000000000000000.crc
-rw-r--r--   1 root supergroup       1288 2025-11-24 14:16 /tmp/delta4_0_0/_delta_log/00000000000000000000.json
drwxr-xr-x   - root supergroup          0 2025-11-24 14:16 /tmp/delta4_0_0/_delta_log/_staged_commits
[root@hadoop /]# hdfs dfs -cat /tmp/delta4_0_0/_delta_log/00000000000000000000.json
2025-11-24 14:31:03,202 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
{"commitInfo":{"timestamp":1763993761811,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numRemovedFiles":"0","numRemovedBytes":"0","numOutputRows":"1","numOutputBytes":"944"},"engineInfo":"Apache-Spark/4.0.0 Delta-Lake/4.0.0","txnId":"e71c1a9c-3167-4d76-b7c9-4744e7a0bf51"}}
{"metaData":{"id":"c0eb20a2-922f-4703-8256-f3e5c5e159b2","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"ts_string\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ts\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1763993760257}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"add":{"path":"part-00000-6600fb05-9251-4c90-9b19-61641d9226f5-c000.snappy.parquet","partitionValues":{},"size":944,"modificationTime":1763993761398,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"ts_string\":\"1850-01-01 01:01:01 +005328\",\"ts\":\"1850-01-01T01:01:01.000+00:53:28\"},\"maxValues\":{\"ts_string\":\"1850-01-01 01:01:01 +005328\",\"ts\":\"1850-01-01T01:01:01.000+00:53:28\"},\"nullCount\":{\"ts_string\":0,\"ts\":0}}"}}

What changes have been made in the PR

DataSkippingReader: widen timestamp stats bounds

Delta already compensates for millisecond truncation of timestamps in JSON stats by widening MAX by +1 ms. As reported in issue #5249, stats written by older versions can additionally be off by up to 59 seconds, because timezone offsets were truncated from HH:mm:ss to HH:mm.

To avoid incorrect pruning, this PR treats timestamp stats as approximate and widens the effective [min, max] interval, as sketched below.
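
A minimal sketch of the widening idea (illustrative only; not the actual DataSkippingReader change, and the helper names are mine):

import java.time.Instant

// Offsets truncated from HH:mm:ss to HH:mm can skew a stat by up to 59 seconds,
// in either direction depending on the sign of the offset.
val maxOffsetSkewSeconds = 59L

// Widen the effective bounds before using them for pruning decisions.
def effectiveMin(statMin: Instant): Instant =
  statMin.minusSeconds(maxOffsetSkewSeconds)

def effectiveMax(statMax: Instant): Instant =
  statMax.plusSeconds(maxOffsetSkewSeconds).plusMillis(1) // keep the existing +1 ms for millisecond truncation

// A file may be skipped for the predicate `ts = v` only if
// v < effectiveMin(min) or v > effectiveMax(max).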

How was this patch tested?

  • Added a regression test, data skipping with timestamp stats truncated by seconds (issue 5249), in DataSkippingDeltaTests.scala, which:

      • writes a Delta table with a single TIMESTAMP value,
      • manually overwrites the file’s stats JSON so that minValues/maxValues
        are shifted by +30 seconds (mimicking the 3.2.x/3.3.x truncated-offset behavior),
      • runs an equality predicate on the true timestamp and asserts that the file
        is not skipped by data skipping.

    A rough sketch of this scenario appears at the end of this section.
  • Ran spark/testOnly org.apache.spark.sql.delta.DataSkippingDeltaTests locally.

  • Built an assembly jar with the changes and reran the same repro code locally; it produces the expected output:

[root@hadoop /]# spark-shell \
>   --jars /tmp/delta-spark_2.12-3.3.3-SNAPSHOT.jar,/tmp/delta-storage-3.4.0-SNAPSHOT.jar \
>   --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
>   --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
>   --conf spark.sql.session.timeZone=Europe/Stockholm \
>   --conf spark.sql.parquet.int96RebaseModeInWrite=CORRECTED
Spark context Web UI available at http://hadoop.spark:4040
Spark context available as 'sc' (master = local[*], app id = local-1764000335303).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.2
      /_/
         
Using Scala version 2.12.18 (OpenJDK 64-Bit Server VM, Java 17.0.9)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste
// Entering paste mode (ctrl-D to finish)

val df = spark.sql("""
  WITH ts AS (
    SELECT make_timestamp(1850, 1, 1, 1, 1, 1) AS ts
  )
  SELECT date_format(ts, 'yyyy-MM-dd HH:mm:ss XXXX') AS ts_string, ts
  FROM ts
""")

df.show(truncate = false)   // sanity check
df.write.format("delta").mode("overwrite").save("/tmp/delta3_3_2_fix_new")

// Exiting paste mode, now interpreting.

+---------------------------+-------------------+
|ts_string                  |ts                 |
+---------------------------+-------------------+
|1850-01-01 01:01:01 +005328|1850-01-01 01:01:01|
+---------------------------+-------------------+

df: org.apache.spark.sql.DataFrame = [ts_string: string, ts: timestamp]         

scala> val df2 = spark.read.format("delta").load("/tmp/delta3_3_2_fix_new")
df2: org.apache.spark.sql.DataFrame = [ts_string: string, ts: timestamp]

scala> df2.where("ts = make_timestamp(1850,1,1,1,1,1)").show()
+--------------------+-------------------+
|           ts_string|                 ts|
+--------------------+-------------------+
|1850-01-01 01:01:...|1850-01-01 01:01:01|
+--------------------+-------------------+


Delta stats in 3.3 after the fix:

[root@hadoop /]# hdfs dfs -cat /tmp/delta3_3_2_fixed_2/_delta_log/00000000000000000000.json
2025-11-25 02:20:28,080 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
{"commitInfo":{"timestamp":1764037169568,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"915"},"engineInfo":"Apache-Spark/3.5.2 Delta-Lake/3.3.3-SNAPSHOT","txnId":"b635884f-4ca5-4bd3-ad9b-e9817789a8e6"}}
{"metaData":{"id":"32fbafe1-0cca-4500-b705-b608ad4178dc","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"ts_string\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ts\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1764037168436}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"add":{"path":"part-00000-6de871c2-8b3a-4b77-b3d8-8b1648e5b5b3-c000.snappy.parquet","partitionValues":{},"size":915,"modificationTime":1764037169001,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"ts_string\":\"1850-01-01 01:01:01 +005328\",\"ts\":\"1850-01-01T01:01:01.000+00:53:28\"},\"maxValues\":{\"ts_string\":\"1850-01-01 01:01:01 +005328\",\"ts\":\"1850-01-01T01:01:01.000+00:53:28\"},\"nullCount\":{\"ts_string\":0,\"ts\":0}}"}}

Does this PR introduce any user-facing changes?

Yes, but only as a bug fix:

  • Queries on tables written by Delta 3.2.x/3.3.x that use TIMESTAMP filters
    on historical timezones (with second-level offsets, e.g. +00:53:28) will
    now return the correct rows instead of silently missing them due to
    overly aggressive stats-based data skipping.
  • To guarantee correctness, the effective [min, max] range for timestamp stats
    is widened by up to 59 seconds, so in rare cases slightly more files may be
    scanned for timestamp predicates. No API, configuration, or protocol-breaking
    changes are introduced.

AnudeepKonaboina changed the title from "[Feature][Spark] Fixing data skipping for timestamp stats with truncated timezone offsets (issue #5249)" to "[Feature][Spark] Fixing data skipping for timestamp stats with truncated timezone offsets (Fixes #5249)" on Nov 24, 2025.
AnudeepKonaboina changed the title from "[Feature][Spark] Fixing data skipping for timestamp stats with truncated timezone offsets (Fixes #5249)" to "[Feature][Spark] Fixing data skipping for timestamp stats with truncated timezone offsets (Fixes #5512)" on Nov 24, 2025.
AnudeepKonaboina force-pushed the feature/fix-timestamp-stats-5249-3.3 branch from ab7cfad to ec7f2c1 on November 25, 2025 10:15.