Skip to content

Conversation

@AnudeepKonaboina
Copy link
Contributor

@AnudeepKonaboina AnudeepKonaboina commented Nov 24, 2025

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

This PR adds a missing read-path assertion to the existing corrupted-checkpoint recovery test in SnapshotManagementSuite, so the behavior described in #5458 is captured as an executable test.
Concretely, in the test:

  • recover from a corrupt checkpoint: previous checkpoint doesn't exist

we now also read the table data after corrupting checkpoint 0 and assert the row count:

// Checkpoint 0 is corrupted. Verify that we can still create the snapshot using
// existing json files.
DeltaLog.forTable(spark, path).snapshot

// GitHub issue #5458: Verify that we can still READ the table data even with
// corrupted

This encodes in code the expectation that a corrupted checkpoint must not make a healthy table unreadable and that Delta should fall back to JSON log files.

PROBLEM

In Delta 4.0, when the latest checkpoint parquet is corrupted, snapshot creation and spark.read.format("delta").load(path) can fail with [FAILED_READ_FILE.NO_HINT]. In Delta 3.2 the same scenario succeeded by falling back to JSON logs (see #5458).

Fix

In SnapshotManagement, broaden the exception type captured during snapshot creation from checkpoint segments so that IO/parquet errors when reading checkpoint files go through the existing retry/fallback path. If all retries fail, the first error is still rethrown as before.

Tests

  • New test SnapshotManagementSuite5458:
  • Writes a Delta table and creates a checkpoint.
  • Corrupts the checkpoint file (empty and non-empty cases).

Verified that:

  • DeltaLog.forTable(...).snapshot still builds a snapshot.
  • spark.read.format("delta").load(path).count() returns the expected row count.
  • This test fails on 4.0 without the code change and passes with the change applied.

How to Reproduce / Run

Run just this test:

build/sbt \ 'project spark' \ 'testOnly org.apache.spark.sql.delta.SnapshotManagementSuite -- -z "recover from a corrupt checkpoint: previous checkpoint doesn"

Current behavior (expected with this PR):

On a 4.x stack, this test fails with [FAILED_READ_FILE.NO_HINT] when trying to read the corrupted checkpoint parquet.
which doesn't fail with 3.2

  • Intended future behavior (after a follow-up fix in SnapshotManagement):
  • The same test should pass, with count === 10, because Delta will detect the corrupted checkpoint, fall back to JSON logs, and keep the table readable.
  • Follow-up Work (separate PR)

This PR is intentionally test‑only. A follow-up PR will:

  • Adjust SnapshotManagement.createSnapshotFromGivenOrEquivalentLogSegment to treat any non‑fatal failure while using a checkpointed LogSegment as “bad checkpoint → use an equivalent log segment that doesn’t rely on that checkpoint (JSON fallback)”.
  • Ensure that both the driver-side snapshot creation and the executor-side data read succeed in the presence of a corrupted checkpoint parquet.

How was this was tested?

I ran the below code in delta 3.2 and delta 4.0 with the count.Below are the corresponding logs:

Code:

import org.apache.spark.sql.delta._
import org.apache.hadoop.fs._

println("==== Delta 3.2 checkpoint recovery test (issue 5458) ====")

val path = "/tmp/delta_5458_test"
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val tablePath = new Path(path)
if (fs.exists(tablePath)) fs.delete(tablePath, true)

// 1. Write table and checkpoint 0
spark.range(10).write.format("delta").save(path)
var log = DeltaLog.forTable(spark, path)
log.checkpoint()
println("Wrote table and created checkpoint 0")

// 2. CORRUPT the checkpoint ON HDFS
DeltaLog.clearCache()
log = DeltaLog.forTable(spark, path)
val logPath = new Path(path, "_delta_log")
val statuses = fs.listStatus(logPath)
val cp = statuses.map(_.getPath)
  .find(p => p.getName.contains("checkpoint") && p.getName.endsWith(".parquet")).get
println(s"Corrupting checkpoint on HDFS: $cp")

// Overwrite the parquet with a tiny non-parquet payload
val out = fs.create(cp, true)   // overwrite = true
out.write("NOT_A_PARQUET_FILE".getBytes("UTF-8"))
out.close()
println("Checkpoint overwritten with invalid contents")

// 4. Table read (executor-side) — this is the real issue #5458 check
println("Running spark.read.format(\"delta\").load(path).count() ...")
val c = spark.read.format("delta").load(path).count()
println(s"Read count = $c")

Logs In delta 3.2 , jar - io.delta:delta-spark_2.13:3.2.2

25/11/23 09:17:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.2
      /_/
         
Using Scala version 2.13.8 (OpenJDK 64-Bit Server VM, Java 17.0.9)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context Web UI available at http://hadoop.spark:4040
Spark context available as 'sc' (master = local[*], app id = local-1763889465775).
Spark session available as 'spark'.

scala> :paste
// Entering paste mode (ctrl-D to finish)

>....                                                                                                                                          val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val tablePath = new Path(path)
if (fs.exists(tablePath)) fs.delete(tablePath, true)

// 1. Write table and checkpoint 0
spark.range(10).write.format("delta").save(path)
var log = DeltaLog.forTable(spark, path)
log.checkpoint()
println("Wrote table and created checkpoint 0")

// 2. CORRUPT the checkpoint ON HDFS
DeltaLog.clearCache()
log = DeltaLog.forTable(spark, path)
val logPath = new Path(path, "_delta_log")
val statuses = fs.listStatus(logPath)
val cp = statuses.map(_.getPath)
  .find(p => p.getName.contains("checkpoint") && p.getName.endsWith(".parquet")).get
println(s"Corrupting checkpoint on HDFS: $cp")

// Overwrite the parquet with a tiny non-parquet payload
val out = fs.create(cp, true)   // overwrite = true
out.write("NOT_A_PARQUET_FILE".getBytes("UTF-8"))
out.close()
println("Checkpoint overwritten with invalid contents")

// 3. Snapshot (driver-side)
DeltaLog.clearCache()
println("Creating snapshot from DeltaLog.forTable(...).snapshot ...")
val snap = DeltaLog.forTable(spark, path).snapshot
println(s"Snapshot version = ${snap.version}")

// 4. Table read (executor-side) ??? this is the real issue #5458 check
println("Running spark.read.format(\"delta\").load(path).count() ...")
val c = spark.read.format("delta").load(path).count()
println(s"Read count = $c")


// Exiting paste mode, now interpreting.

warning: 2 deprecations (since 12.0); for details, enable `:setting -deprecation` or `:replay -deprecation`
==== Delta 3.2 checkpoint recovery test (issue 5458) ====
25/11/23 09:18:05 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
Wrote table and created checkpoint 0                                            
Corrupting checkpoint on HDFS: hdfs://hadoop.spark:9000/tmp/delta_5458_test/_delta_log/00000000000000000000.checkpoint.parquet
Checkpoint overwritten with invalid contents
Creating snapshot from DeltaLog.forTable(...).snapshot ...
25/11/23 09:18:08 ERROR Executor: Exception in task 0.0 in stage 15.0 (TID 110)
Caused by: java.lang.RuntimeException: hdfs://hadoop.spark:9000/tmp/delta_5458_test/_delta_log/00000000000000000000.checkpoint.parquet is not a Parquet file. Expected magic number at tail, but found [70, 73, 76, 69]
	at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:565)
	at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:799)
	at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:666)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:85)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:71)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:66)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:213)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:217)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:279)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:593)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

Read count = 10
import org.apache.spark.sql.delta._
import org.apache.hadoop.fs._
val path: String = /tmp/delta_5458_test
val fs: org.apache.hadoop.fs.FileSystem = DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1847489893_1, ugi=root (auth:SIMPLE)]]
val tablePath: org.apache.hadoop.fs.Path = /tmp/delta_5458_test
var log: org.apache.spark.sql.delta.DeltaLog = org.apache.spark.sql.delta.DeltaLog@2b1f112b
// mutated log
val logPath: org.apache.hadoop.fs.Path = /tmp/delta_5458_test/_delta_log
val statuses: Array[org.apache.hadoop.fs.FileStatus] = Array(HdfsNamedFileStatus{path=hdfs://hadoop.spark:9000/tmp/delta_5458_test/_delta_log/00000000000000000000...

Logs In delta 4.0, jar - io.delta:delta-spark_2.13:4.0.0

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.0.0
      /_/
         
Using Scala version 2.13.16 (OpenJDK 64-Bit Server VM, Java 17.0.9)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context Web UI available at http://hadoop.spark:4040
Spark context available as 'sc' (master = local[*], app id = local-1763890926372).
Spark session available as 'spark'.

scala> :paste
// Entering paste mode (ctrl-D to finish)

>....                                                                                                                                          val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val tablePath = new Path(path)
if (fs.exists(tablePath)) fs.delete(tablePath, true)

// 1. Write table and checkpoint 0
spark.range(10).write.format("delta").save(path)
var log = DeltaLog.forTable(spark, path)
log.checkpoint()
println("Wrote table and created checkpoint 0")

// 2. CORRUPT the checkpoint ON HDFS
DeltaLog.clearCache()
log = DeltaLog.forTable(spark, path)
val logPath = new Path(path, "_delta_log")
val statuses = fs.listStatus(logPath)
val cp = statuses.map(_.getPath)
  .find(p => p.getName.contains("checkpoint") && p.getName.endsWith(".parquet")).get
println(s"Corrupting checkpoint on HDFS: $cp")

// Overwrite the parquet with a tiny non-parquet payload
val out = fs.create(cp, true)   // overwrite = true
out.write("NOT_A_PARQUET_FILE".getBytes("UTF-8"))
out.close()
println("Checkpoint overwritten with invalid contents")

// 3. Snapshot (driver-side)
DeltaLog.clearCache()
println("Creating snapshot from DeltaLog.forTable(...).snapshot ...")
val snap = DeltaLog.forTable(spark, path).snapshot
println(s"Snapshot version = ${snap.version}")

// 4. Table read (executor-side) ??? this is the real issue #5458 check
println("Running spark.read.format(\"delta\").load(path).count() ...")
val c = spark.read.format("delta").load(path).count()
println(s"Read count = $c")

// Exiting paste mode... now interpreting.
warning: 2 deprecations (since 12.0); for details, enable `:setting -deprecation` or `:replay -deprecation`
==== Delta 4.0 checkpoint recovery test (issue 5458) ====
Wrote table and created checkpoint 0                                            
Corrupting checkpoint on HDFS: hdfs://hadoop.spark:9000/tmp/delta_5458_test/_delta_log/00000000000000000000.checkpoint.parquet
Checkpoint overwritten with invalid contents
Creating snapshot from DeltaLog.forTable(...).snapshot ...
Snapshot version = 0
Running spark.read.format("delta").load(path).count() ...
09:42:19.862 [Executor task launch worker for task 0.0 in stage 14.0 (TID 157)] ERROR org.apache.spark.executor.Executor - Exception in task 0.0 in stage 14.0 (TID 157)
org.apache.spark.SparkException: [FAILED_READ_FILE.NO_HINT] Encountered error while reading file hdfs://hadoop.spark:9000/tmp/delta_5458_test/_delta_log/00000000000000000000.checkpoint.parquet.  SQLSTATE: KD001
	at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:856) ~[spark-catalyst_2.13-4.0.0.jar:4.0.0]
	at org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2$.attachFilePath(FileDataSourceV2.scala:142) ~[spark-sql_2.13-4.0.0.jar:4.0.0]
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:142) ~[spark-sql_2.13-4.0.0.jar:4.0.0]
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:695) ~[spark-sql_2.13-4.0.0.jar:4.0.0]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source) ~[?:?]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) ~[?:?]
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) ~[spark-sql_2.13-4.0.0.jar:4.0.0]
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50) ~[spark-sql_2.13-4.0.0.jar:4.0.0]
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583) ~[scala-library-2.13.16.jar:?]
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:143) ~[spark-core_2.13-4.0.0.jar:4.0.0]
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57) ~[spark-core_2.13-4.0.0.jar:4.0.0]

The count gets printed in 3.2 along with the error , but in 4.0 the count doesn't get printed meaning it falls back to reading JSON files.

Does this PR introduce any user-facing changes?

No.this is just a test case.The actual fix must be done and an other PR would be raised

Add table read operation to 'recover from a corrupt checkpoint' test
to demonstrate regression in Delta 4.0.

In Delta 3.2: corrupted checkpoint falls back to JSON logs and read succeeds
In Delta 4.0: corrupted checkpoint causes FAILED_READ_FILE error

This test currently fails on Delta 4.0 with:
org.apache.spark.SparkException: [FAILED_READ_FILE.NO_HINT] Encountered error
while reading file ...checkpoint.parquet

Relates to delta-io#5458
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant