From f6a99d70c9c598c98a00f1fd912cd8a839f07b80 Mon Sep 17 00:00:00 2001
From: Xin Huang
Date: Mon, 24 Nov 2025 20:41:28 -0800
Subject: [PATCH 1/9] [DSv2] Fix startingVersion support by using protocol validation at read time

This change fixes the startingVersion support in DSv2 streaming by:
1. Removing the requirement for initialSnapshot parameter
2. Using getActionsFromCommitFilesWithProtocolValidation to validate protocol
   for each commit at read time (similar to V1 behavior)
3. Extracting getActionsUnsafe utility method to StreamingHelper

The key insight is that we don't need a pre-loaded snapshot for protocol
validation - we can validate each commit's protocol as we read it, which matches
V1's behavior and avoids the issue where initialSnapshot.version < startVersion.

Test: DeltaSourceDSv2Suite startingVersion tests now pass
---
 dsv2-startingversion-issue.md                 | 629 ++++++++++++++++++
 kernel-api-relaxed-snapshot-proposal.md       | 364 ++++++++++
 .../spark/read/SparkMicroBatchStream.java     |  18 +-
 .../io/delta/kernel/spark/read/SparkScan.java |   5 +-
 .../kernel/spark/read/SparkScanBuilder.java   |   6 +-
 .../kernel/spark/utils/StreamingHelper.java   |  17 +
 6 files changed, 1030 insertions(+), 9 deletions(-)
 create mode 100644 dsv2-startingversion-issue.md
 create mode 100644 kernel-api-relaxed-snapshot-proposal.md

diff --git a/dsv2-startingversion-issue.md b/dsv2-startingversion-issue.md
new file mode 100644
index 00000000000..a9bca7c62bf
--- /dev/null
+++ b/dsv2-startingversion-issue.md
@@ -0,0 +1,629 @@
# DSv2 StartingVersion Issue Analysis

## Executive Summary

**TL;DR**: The `startingVersion` test failure is NOT a bug. It's a **fundamental design difference** between V1 and Kernel API:

- **V1 API**: Can read changelog files without validating protocol → fast but potentially incorrect
- **Kernel API**: Requires protocol/metadata to read AddFile correctly → safe but more restrictive
- **Why it matters**: Features like Column Mapping change how AddFile.stats and partitionValues are interpreted
- **Recommendation**: Accept as breaking change; DSv2 is more correct per Delta Protocol specification

This document explains the technical justification from `PROTOCOL.md` for why Kernel API requires a `startSnapshot` parameter.

---

## Problem Summary

The `startingVersion` test in `DeltaSourceDSv2Suite` is failing because:
- **Expected**: Read 10 rows (values 10-19) when starting from version 1
- **Actual**: Read 0 rows (empty result)

## Root Cause

There's a fundamental API design difference between Delta v1 and Kernel API regarding starting versions:

### V1 API Behavior (DeltaSource.scala)
```scala
protected def getFileChanges(fromVersion: Long, ...): ClosableIterator[IndexedFile] = {
  def filterAndIndexDeltaLogs(startVersion: Long): ClosableIterator[IndexedFile] = {
    deltaLog.getChangeLogFiles(startVersion, catalogTableOpt, options.failOnDataLoss).flatMapWithClose {
      case (version, filestatus) =>
        // Directly read changelog files without requiring snapshot at startVersion
        ...
    }
  }
}
```

**Key point**: V1 API can start reading from any version by directly reading changelog files. It does NOT require the snapshot at `startVersion` to be readable/reconstructable.
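To make "directly reading changelog files" concrete, here is a minimal sketch in plain Java of the listing step this implies: enumerate the `_delta_log/<version>.json` commit files at or above `startVersion`, with no snapshot reconstruction and therefore no protocol check. This is only an illustration under the standard commit-file naming convention; the class and method names are hypothetical, and this is not the actual DeltaSource code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper: list Delta commit files without building a snapshot. */
public final class ChangelogListingSketch {

  /** Returns the commit JSON files for versions >= startVersion, in version order. */
  public static List<Path> listCommitFiles(Path tableRoot, long startVersion) throws IOException {
    Path logDir = tableRoot.resolve("_delta_log");
    try (Stream<Path> entries = Files.list(logDir)) {
      return entries
          // Commit files are named as zero-padded versions, e.g. 00000000000000000001.json
          .filter(p -> p.getFileName().toString().matches("\\d{20}\\.json"))
          .filter(p -> parseVersion(p) >= startVersion)
          .sorted()
          .collect(Collectors.toList());
    }
  }

  private static long parseVersion(Path commitFile) {
    String name = commitFile.getFileName().toString();
    return Long.parseLong(name.substring(0, name.length() - ".json".length()));
  }

  private ChangelogListingSketch() {}
}
```

Each returned file can then be parsed line by line into actions (AddFile, RemoveFile, ...). That is exactly where V1 stops short of validating the protocol, and where the Kernel API, described next, insists on a snapshot.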
+ +### Kernel API Behavior (CommitRange.java) +```java +/** + * @param startSnapshot the snapshot for startVersion, required to ensure the table is readable by + * Kernel at startVersion + * @throws IllegalArgumentException if startSnapshot.getVersion() != startVersion + * @throws KernelException if the version range contains a version with reader protocol that is + * unsupported by Kernel + */ +CloseableIterator getActions( + Engine engine, + Snapshot startSnapshot, // <-- REQUIRED! + Set actionSet); +``` + +**Key point**: Kernel API REQUIRES a valid snapshot at `startVersion`. This enforces that the table must be readable by Kernel at that version. + +### DSv2 Implementation (SparkMicroBatchStream.java) + +```java +private CloseableIterator filterDeltaLogs( + long startVersion, Optional endOffset) { + + CommitRange commitRange = snapshotManager.getTableChanges(engine, startVersion, endVersionOpt); + + // Required by kernel: perform protocol validation by creating a snapshot at startVersion. + Snapshot startSnapshot; + try { + startSnapshot = snapshotManager.loadSnapshotAt(startVersion); // Line 607 + } catch (io.delta.kernel.exceptions.KernelException e) { + // If startVersion doesn't exist (e.g., starting from "latest" when the next version + // hasn't been written yet), return empty iterator. + return Utils.toCloseableIterator(allIndexedFiles.iterator()); // <-- RETURNS EMPTY! + } + + try (CloseableIterator actionsIter = + commitRange.getActions(engine, startSnapshot, ACTION_SET)) { // Line 615 + // Process actions... + } +} +``` + +**The problem**: When `loadSnapshotAt(startVersion)` fails (e.g., due to unsupported protocol), the code catches the exception and returns an empty iterator, resulting in 0 rows read. + +## Why This Difference Matters + +### Scenario 1: Old Protocol Version +If `startVersion=1` uses a protocol that Kernel doesn't support: +- **V1**: Can still read from version 1 onward by reading changelog files directly +- **Kernel/DSv2**: Cannot create snapshot at version 1, so returns empty results + +### Scenario 2: Non-reconstructable Snapshot +If the snapshot at `startVersion` cannot be reconstructed (missing checkpoint, etc.): +- **V1**: Can still read changelog files +- **Kernel/DSv2**: Cannot create snapshot, returns empty results + +### Scenario 3: Comment from SparkMicroBatchStream +```java +// When starting from a given version, we don't require that the snapshot of this +// version can be reconstructed, even though the input table is technically in an +// inconsistent state. If the snapshot cannot be reconstructed, then the protocol +// check is skipped, so this is technically not safe, but we keep it this way for +// historical reasons. +``` + +This comment in `validateProtocolAt()` (line 438-443) acknowledges that V1 has this behavior "for historical reasons" but DSv2 cannot implement the same behavior due to Kernel API constraints. + +## Why Kernel API Requires startSnapshot + +Looking at the Kernel API design and Delta Protocol specification, the `startSnapshot` parameter is required because **you cannot correctly read AddFile actions without knowing the Protocol and Metadata**. + +### Protocol-Dependent Features That Affect AddFile Reading + +From `PROTOCOL.md`, there are multiple table features that fundamentally change how AddFiles must be interpreted: + +#### 1. 
**Column Mapping** (Reader Version 2+) +``` +Reader Requirements (PROTOCOL.md line 930-937): +- In 'id' mode: Must resolve columns by field_id in parquet metadata +- In 'name' mode: Must resolve columns by physicalName +- Partition values and statistics must be resolved by physical names +``` + +**Impact**: Without knowing if Column Mapping is enabled and which mode, you cannot: +- Read partition values correctly from AddFile.partitionValues +- Parse column statistics correctly from AddFile.stats +- Read the parquet files themselves + +#### 2. **Deletion Vectors** (Reader Version 3+, Writer Version 7+) +``` +AddFile Schema (PROTOCOL.md line 516): +- deletionVector: [DeletionVectorDescriptor Struct] (optional) + +Reader Requirements (PROTOCOL.md line 1006-1007): +- If a logical file has a DV, invalidated records MUST NOT be returned +``` + +**Impact**: Without checking protocol, you don't know: +- If Deletion Vectors are supported +- How to handle AddFile.deletionVector field +- Which rows to filter when reading + +#### 3. **Row Tracking** (Writer Version 7+) +``` +AddFile Schema (PROTOCOL.md line 517-518): +- baseRowId: Long (optional) +- defaultRowCommitVersion: Long (optional) +``` + +**Impact**: These fields only exist if Row Tracking feature is active. + +#### 4. **Clustered Tables** (Writer Version 7+) +``` +AddFile Schema (PROTOCOL.md line 519): +- clusteringProvider: String (optional) +``` + +**Impact**: Without protocol, can't tell if this field is present. + +### Why V1 Could Skip This - THE KEY DIFFERENCE + +**V1 uses the CURRENT (source init) snapshot's P&M to read ALL historical AddFiles:** + +```scala +// From DeltaSource.scala line 728-734 +case class DeltaSource( + spark: SparkSession, + deltaLog: DeltaLog, + catalogTableOpt: Option[CatalogTable], + options: DeltaOptions, + snapshotAtSourceInit: SnapshotDescriptor, // <-- FIXED at source init! + metadataPath: String, + ... +) + +// Line 199-228: readSnapshotDescriptor is based on snapshotAtSourceInit +protected lazy val readSnapshotDescriptor: SnapshotDescriptor = + persistedMetadataAtSourceInit.map { ... } + .getOrElse(snapshotAtSourceInit) // Use snapshot from source init + +// Line 390-407: ALL AddFiles are read using this SAME descriptor +deltaLog.createDataFrame( + readSnapshotDescriptor, // <-- Same P&M for all versions! + addFiles.map(_.getFileAction.asInstanceOf[AddFile]), + isStreaming = true) +``` + +**Key insight**: V1 gets ONE snapshot when the stream starts (usually the latest), then uses that snapshot's Protocol and Metadata to interpret AddFiles from ALL versions, including historical ones. + +This works because V1 assumes: +1. **Forward compatibility**: Current P&M can read historical AddFiles +2. **Single interpretation**: Use one consistent view of schema/features +3. **Lazy validation**: Errors surface at DataFrame creation, not log reading +4. **No per-version snapshot needed**: Only need to read changelog files, not reconstruct snapshots +5. 
**startVersion doesn't need to be readable**: As long as changelog exists, can read it + +### Why Kernel API Is Stricter + +**Kernel API requires the startVersion's P&M to read that version's AddFiles:** + +```java +// From CommitRange.java line 96-97: +@param startSnapshot the snapshot for startVersion, required to ensure the table is readable by + Kernel at startVersion + +// From SparkMicroBatchStream.java line 604-615 +CommitRange commitRange = snapshotManager.getTableChanges(engine, startVersion, endVersionOpt); + +// Must load snapshot AT startVersion (not latest!) +Snapshot startSnapshot = snapshotManager.loadSnapshotAt(startVersion); + +// Use startVersion's P&M to read its AddFiles +CloseableIterator actionsIter = + commitRange.getActions(engine, startSnapshot, ACTION_SET); +``` + +The startSnapshot provides: +1. **Protocol**: Which features are active at THAT version (not current) +2. **Metadata**: Schema with column mapping info at THAT version +3. **Validation**: Ensures startVersion is actually readable by Kernel +4. **Per-version interpretation**: Each version's AddFiles interpreted with that version's P&M + +## The Fundamental Architectural Difference + +This is the core of the issue: + +| Aspect | V1 API | Kernel API | +|--------|--------|------------| +| **P&M Source** | Latest snapshot (at source init) | startVersion's snapshot | +| **P&M Updates** | Fixed throughout stream | Per version | +| **Assumption** | Forward compatible | Version-specific | +| **When fails** | At DataFrame read time | At log read time (earlier) | +| **Requirement** | Only changelog files exist | Snapshot must be reconstructable | +| **startingVersion** | Just needs commit file | Needs full snapshot | + +### Why This Matters + +**Scenario: Reading from version 10, current version is 100** + +V1: +```scala +// Use version 100's P&M to read version 10's AddFiles +// Works if P&M is forward compatible +// Fails if version 10 used different column mapping mode +``` + +Kernel: +```java +// Must reconstruct version 10's snapshot +// Use version 10's P&M to read version 10's AddFiles +// Fails if version 10 has unsupported protocol +// More correct but more restrictive +``` + +### Concrete Example: Column Mapping + +Without startSnapshot: +```java +// AddFile has partitionValues = {"col-a7f4159c-53be-4cb0-b81a-f7e5240cfc49": "2024-01-01"} +// Is this the actual column name, or a physical name? +// Need metadata to know if Column Mapping is active! + +// AddFile.stats contains: +// {"col-5f422f40-de70-45b2-88ab-1d5c90e94db1": {"min": 1, "max": 100}} +// What column does this refer to? +// Need metadata to map physical name -> logical name! +``` + +With startSnapshot: +```java +Snapshot snapshot = loadSnapshotAt(version); +Protocol protocol = snapshot.getProtocol(); +Metadata metadata = snapshot.getMetadata(); + +if (protocol.supportsColumnMapping()) { + // Use metadata.schema to map physical names -> logical names + String logicalName = metadata.getLogicalName("col-a7f4159c-..."); +} +``` + +### From PROTOCOL.md Line 1973-1984 + +``` +# Requirements for Readers + +Reader Version 2: Respect Column Mapping +Reader Version 3: Respect Table Features for readers +``` + +**You cannot read a Delta table correctly without knowing its Protocol and Metadata.** + +This is a more restrictive but safer API design compared to V1. + +## Impact + +This design difference means: + +1. **Test Failures**: Tests like `startingVersion` that rely on V1's lenient behavior will fail in DSv2 +2. 
**Behavior Change**: Users who relied on starting from non-readable versions will see different behavior
3. **API Limitation**: Kernel API fundamentally cannot support the same use cases as V1 API

## Possible Solutions

### Option 1: Relax Kernel API Requirements
Modify `CommitRange.getActions()` to not require `startSnapshot`, or make it optional. This would require changes to the Kernel API itself.

**Pros**: Most compatible with V1 behavior
**Cons**: Requires Kernel API changes, may reduce safety guarantees

### Option 2: Document as Breaking Change
Document that DSv2 has stricter requirements for `startingVersion` and that this is a known behavior change.

**Pros**: No code changes needed
**Cons**: Breaking change for users

### Option 3: Fallback Mechanism
When snapshot creation fails, attempt to read changelog files directly (similar to V1). This may require Kernel API enhancements.

**Pros**: Best of both worlds
**Cons**: Complex implementation, may still require Kernel API changes

### Option 4: Skip Protocol Validation (Current Workaround)
Use the `validateProtocolAt()` approach but make it optional via configuration.

**Pros**: Can be controlled via config
**Cons**: Reduces safety, doesn't fully solve the problem (snapshot creation still required)

### Option 5: Allow `snapshot.version >= startVersion` ✅ RECOMMENDED

**Core idea**: Relax the Kernel API validation so that a snapshot from a **newer** version can be used to read data from an **older** version (similar to V1).

**Current restriction**:
```java
// CommitRangeImpl.java line 125-127
checkArgument(
    startSnapshot.getVersion() == startVersion,
    "startSnapshot must have version = startVersion");
```

**Proposed change**:
```java
checkArgument(
    startSnapshot.getVersion() >= startVersion,
    "startSnapshot version must be >= startVersion");
```

**Advantages**:
- ✅ Solves the case where the snapshot at startingVersion cannot be reconstructed
- ✅ Keeps the Kernel API structure (a snapshot parameter is still required)
- ✅ Backward compatible (existing code keeps working)
- ✅ Allows the V1-style "use the latest P&M" strategy
- ✅ Lets users choose: exact version (safer) or latest version (more practical)
- ✅ Only one line of code needs to change

**Implementation example**:
```java
// DSv2 can now fall back to the latest snapshot
Snapshot startSnapshot;
try {
  startSnapshot = snapshotManager.loadSnapshotAt(startVersion);
} catch (KernelException e) {
  // Fall back to latest (like V1)
  startSnapshot = snapshotManager.loadLatestSnapshot();
  logger.warn("Using latest snapshot P&M to read from version {}", startVersion);
}
commitRange.getActions(engine, startSnapshot, ACTION_SET);
```

**Risks**:
- ⚠️ Assumes P&M is forward compatible (true in most cases)
- ⚠️ If P&M changed significantly (e.g., column mapping was enabled), data may be misread

**Mitigations**:
- Add warning logs
- Provide a config option so users can choose strict vs. relaxed mode
- Automatically detect known incompatibilities (e.g., column mapping changes)

See `kernel-api-relaxed-snapshot-proposal.md` for the detailed analysis.

## Recommendation

The core issue is that **Kernel API's design is fundamentally incompatible with V1's lenient behavior**. The requirement to pass a `startSnapshot` parameter is baked into the API signature.

### This Is Not A Bug - It's A Design Decision

After analyzing the Delta Protocol specification, it's clear that:

1. **Kernel API is correct**: You genuinely need Protocol and Metadata to read AddFile actions correctly
2. **V1 is lenient but potentially incorrect**: V1 can produce wrong results when reading from versions without protocol validation
3. **The requirement is justified**: Features like Column Mapping, Deletion Vectors, and Row Tracking fundamentally change how AddFiles are interpreted

### Why The Test Fails

The test `startingVersion` likely:
1. Creates a table at version 0 with protocol X
2. Tries to start streaming from version 1
3. DSv2 tries to load snapshot at version 1
4. 
Version 1 may have unsupported protocol or cannot be reconstructed +5. DSv2 correctly returns empty (safer than returning wrong data) +6. V1 would have proceeded anyway (less safe, but more permissive) + +### Options Forward + +#### Option 1: Accept Breaking Change ✅ RECOMMENDED +Document that DSv2 has stricter safety requirements: +- Starting version must point to a readable snapshot +- Protocol must be supported by Kernel +- This is more correct than V1's behavior + +**Pros**: Safer, more correct +**Cons**: Breaking change for some users + +#### Option 2: Relax Kernel API (Major Change) +Make `startSnapshot` optional in Kernel API: +```java +CloseableIterator getActions( + Engine engine, + Optional startSnapshot, // Optional now + Set actionSet); +``` + +**Pros**: Backward compatible +**Cons**: +- Requires Kernel API changes +- Loses safety guarantees +- May produce incorrect results for Column Mapping, Deletion Vectors, etc. +- Goes against Delta Protocol specification + +#### Option 3: Best-Effort Fallback (Complex) +When snapshot fails, attempt to infer minimal protocol/metadata by: +1. Reading startVersion commit file +2. Extracting protocol/metadata actions +3. Validating they're supported + +**Pros**: More compatible +**Cons**: +- Very complex +- Still may fail for unsupported protocols +- Partial solution only + +#### Option 4: Config-Based Validation (Workaround) +Add a config to skip protocol validation: +``` +spark.delta.streaming.unsafeSkipProtocolValidation=true +``` + +**Pros**: User can opt-in to V1 behavior +**Cons**: +- Unsafe +- May produce corrupt results +- Still requires snapshot creation which may fail + +### Conclusion + +**Both designs are valid, but serve different goals:** + +1. **V1 Design: Pragmatic Forward Compatibility** + - Uses latest P&M to read all historical data + - Assumes P&M is forward compatible + - More permissive, works in more scenarios + - Risk: May misinterpret historical AddFiles if P&M changed + +2. **Kernel Design: Version-Specific Correctness** + - Uses each version's P&M to read that version's data + - Guarantees correct interpretation + - More restrictive, safer + - Limitation: Requires snapshot reconstruction at startVersion + +**Why the test fails:** + +The test likely creates a table where version 1 cannot be reconstructed (missing checkpoint, unsupported protocol, etc.). V1 doesn't care because it uses the current snapshot's P&M. Kernel fails because it needs version 1's snapshot. + +**Which is correct?** + +From `PROTOCOL.md`, the protocol specification doesn't mandate either approach. However: +- If P&M has changed significantly between startVersion and current version (e.g., column mapping was added), V1's approach may produce incorrect results +- If P&M hasn't changed (or is backward compatible), V1's approach is more practical + +**The real issue**: Kernel API's requirement for `startSnapshot` makes it **impossible** to implement V1's "use latest P&M" approach, even if that's sometimes the right choice. + +### Recommendation + +We should **accept this as a known architectural difference**, not a bug: + +1. **Document the difference**: DSv2 requires startVersion to have a reconstructable snapshot +2. **Explain the tradeoff**: V1 is more permissive but may misinterpret data; DSv2 is safer but more restrictive +3. 
**Consider API enhancement**: Add optional config to use "latest P&M" mode in Kernel API for users who understand the tradeoffs + +## Real-World Scenario: Why This Matters + +### Example 1: V1's "Latest P&M" vs Kernel's "Version-Specific P&M" + +**Setup:** +``` +Version 0: Initial table + Protocol: Reader=1, Writer=2 + Schema: {name: "id", type: "int"} + AddFile: file0.parquet with stats: {"id": {min: 0, max: 99}} + +Version 10: Column Mapping enabled + Protocol: Reader=2, Writer=5, columnMapping + Metadata: Column "id" -> physical name "col-abc123" + AddFile: file10.parquet with stats: {"col-abc123": {min: 100, max: 199}} + +Version 100: Current version + Same protocol/metadata as v10 + AddFile: file100.parquet with stats: {"col-abc123": {min: 1000, max: 1099}} +``` + +**Stream starts at version 0, current version is 100:** + +V1 Approach: +```scala +// DeltaSource initialized at version 100 +snapshotAtSourceInit = loadSnapshot(100) // Reader=2, columnMapping enabled + +// Read version 0's AddFile +// Uses version 100's P&M: expects physical names +// Version 0's stats: {"id": {min: 0, max: 99}} +// V1 sees "id" as column name (no physical name mapping) +// PROBLEM: May fail or misinterpret because current P&M expects "col-abc123" +``` + +Kernel Approach: +```java +// Stream starts at version 0 +startSnapshot = loadSnapshotAt(0) // Reader=1, no column mapping +protocol = startSnapshot.getProtocol() // Reader=1 +metadata = startSnapshot.getMetadata() // No column mapping + +// Read version 0's AddFile +// Uses version 0's P&M: no column mapping +// Version 0's stats: {"id": {min: 0, max: 99}} +// Kernel correctly interprets "id" as direct column name +// ✅ CORRECT +``` + +### Scenario: Table with Column Mapping Enabled Mid-Stream + +``` +Version 0: Table created + Protocol: Reader=1, Writer=2 + Schema: {name: "id", type: "int"} + +Version 1: Column Mapping enabled + Protocol: Reader=2, Writer=5, features=[columnMapping] + Metadata: { + "schema": { + "name": "id", + "metadata": { + "delta.columnMapping.id": 1, + "delta.columnMapping.physicalName": "col-abc123" + } + }, + "configuration": {"delta.columnMapping.mode": "name"} + } + AddFile: { + "path": "file1.parquet", + "partitionValues": {}, + "stats": { + "numRecords": 100, + "minValues": {"col-abc123": 1}, // Uses physical name! + "maxValues": {"col-abc123": 100} + } + } + +Version 2: More data added + AddFile: { + "path": "file2.parquet", + "stats": { + "numRecords": 50, + "minValues": {"col-abc123": 101}, // Still physical name + "maxValues": {"col-abc123": 150} + } + } +``` + +### What Happens With startingVersion=1? 
**V1 Behavior (Permissive)**:
```scala
// V1 reads version 1's changelog directly
// Sees AddFile with stats: {"col-abc123": {min: 1, max: 100}}
// Doesn't check protocol, assumes "col-abc123" is the column name
// Query: SELECT * FROM table WHERE id > 50
// Result: May skip file incorrectly or fail to read parquet
```

**Kernel/DSv2 Behavior (Correct)**:
```java
// Tries to load snapshot at version 1
Snapshot snapshot1 = loadSnapshotAt(1);
Protocol protocol = snapshot1.getProtocol(); // Reader=2, columnMapping
Metadata metadata = snapshot1.getMetadata(); // Has physicalName mapping

// Reads AddFile with stats: {"col-abc123": {min: 1, max: 100}}
// Uses metadata to understand: "col-abc123" maps to "id"
// Query: SELECT * FROM table WHERE id > 50
// Result: Correctly reads file and applies filter
```

**If Version 1's Protocol Is Unsupported**:
```java
// DSv2: Fails early, returns empty
// Better than V1's "try anyway and maybe corrupt data"
```

### The Key Insight

From `PROTOCOL.md` line 927:
> "Track partition values and column level statistics with the physical name
> of the column in the transaction log."

**Without metadata, you cannot parse AddFile.stats correctly!**

The stats use physical names, but queries use logical names. You MUST have the metadata to translate between them.

## References

- Kernel API: `/kernel/kernel-api/src/main/java/io/delta/kernel/CommitRange.java` lines 96-107
- DSv2 Implementation: `/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java` lines 604-615
- V1 Implementation: `/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala` lines 779-809
- Test: `/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala` lines 1585-1643
- Delta Protocol: `/PROTOCOL.md`
  - Add File schema: lines 505-529
  - Column Mapping: lines 885-937
  - Deletion Vectors: lines 939-1010
  - Reader Requirements: lines 1973-1984

diff --git a/kernel-api-relaxed-snapshot-proposal.md b/kernel-api-relaxed-snapshot-proposal.md
new file mode 100644
index 00000000000..c8f985b0603
--- /dev/null
+++ b/kernel-api-relaxed-snapshot-proposal.md
@@ -0,0 +1,364 @@
# Kernel API Proposal: Allow Using a Newer Snapshot to Read Historical Data

## Problem Background

Today the Kernel API enforces `startSnapshot.version == startVersion`:

```java
// CommitRangeImpl.java line 125-127
checkArgument(
    startSnapshot.getVersion() == startVersion,
    "startSnapshot must have version = startVersion");
```

This has the following consequences:
1. The snapshot at startVersion must be reconstructable
2. If startVersion cannot be reconstructed (missing checkpoint, unsupported protocol), nothing can be read
3. The V1-style "read historical data with the latest P&M" strategy cannot be implemented

## Proposal: Allow `snapshot.version >= startVersion`

### Core Change

```java
// Before (CommitRangeImpl.java line 125-127)
checkArgument(
    startSnapshot.getVersion() == startVersion,
    "startSnapshot must have version = startVersion");

// After
checkArgument(
    startSnapshot.getVersion() >= startVersion,
    "startSnapshot version must be >= startVersion");
```

### Semantic Change

**Before**:
- The data at startVersion must be read with startVersion's P&M
- Every version is guaranteed to be interpreted with its own P&M

**After**:
- Data at startVersion and later can be read with the P&M of **any version >= startVersion**
- The latest version's P&M may be used to read historical data (similar to V1)
- Using the historical version's P&M remains possible (when it is available)

## Use Cases

### Case 1: startVersion cannot be reconstructed (the current problem)

```java
// Current behavior: fails
try {
  Snapshot snapshot1 = loadSnapshotAt(1); // Fails! protocol unsupported
  commitRange.getActions(engine, snapshot1, actionSet);
} catch (Exception e) {
  // No data can be read at all
}

// With the proposal: use the latest snapshot
Snapshot latestSnapshot = loadLatestSnapshot(); // Succeeds
commitRange.getActions(engine, latestSnapshot, actionSet);
// Read version 1's data with the latest P&M
```

### Case 2: Performance optimization

There is no need to reconstruct a snapshot for every startVersion; a cached latest snapshot can be reused.

```java
// Today: a snapshot must be rebuilt every time
Snapshot snap1 = loadSnapshotAt(1);
Snapshot snap2 = loadSnapshotAt(2);
Snapshot snap3 = loadSnapshotAt(3);

// With the proposal: the snapshot can be reused
Snapshot latest = loadLatestSnapshot(); // Loaded only once
commitRange1.getActions(engine, latest, actionSet);
commitRange2.getActions(engine, latest, actionSet);
commitRange3.getActions(engine, latest, actionSet);
```

### Case 3: Flexible strategy selection

```java
// Strategy 1: exact mode (safer)
if (canReconstructSnapshot(startVersion)) {
  Snapshot exactSnapshot = loadSnapshotAt(startVersion);
  commitRange.getActions(engine, exactSnapshot, actionSet);
}

// Strategy 2: latest mode (more practical)
else {
  Snapshot latestSnapshot = loadLatestSnapshot();
  commitRange.getActions(engine, latestSnapshot, actionSet);
  // Assumes P&M is forward compatible
}
```

## Advantages

### 1. Fixes the startingVersion test failure

DSv2 can implement V1-like behavior:

```java
// SparkMicroBatchStream.java
private CloseableIterator filterDeltaLogs(
    long startVersion, Optional endOffset) {

  CommitRange commitRange = snapshotManager.getTableChanges(...);

  // Try to load the snapshot at startVersion
  Snapshot startSnapshot;
  try {
    startSnapshot = snapshotManager.loadSnapshotAt(startVersion);
  } catch (KernelException e) {
    // Fall back to the latest snapshot (like V1)
    startSnapshot = snapshotManager.loadLatestSnapshot();
    logWarning("Using latest snapshot P&M to read from version " + startVersion);
  }

  // This now works!
  commitRange.getActions(engine, startSnapshot, ACTION_SET);
}
```

### 2. Keeps the API structure

Not needed:
- Changing the API signature (a snapshot parameter is still required)
- Adding complicated Optional logic
- Breaking existing code

Only needed:
- Relaxing a single validation check
- Allowing a more flexible usage pattern

### 3. Backward compatible

All existing code keeps working:
```java
// Existing code (exact version) is still valid
Snapshot snapshot = loadSnapshotAt(startVersion);
commitRange.getActions(engine, snapshot, actionSet);
// ✅ snapshot.version == startVersion, the check passes
```

New code can use the more flexible form:
```java
// New code (latest version)
Snapshot latest = loadLatestSnapshot();
commitRange.getActions(engine, latest, actionSet);
// ✅ latest.version >= startVersion, the check also passes
```

### 4. Users Can Choose the Safety Level

```java
// Config option
if (spark.conf.get("spark.delta.streaming.useExactVersionSnapshot")) {
  // Strict mode: the exact version must be used
  snapshot = loadSnapshotAt(startVersion);
} else {
  // Relaxed mode: the latest version may be used
  snapshot = loadLatestSnapshot();
}
```

## Risks and Limitations

### Risk 1: P&M is not forward compatible

**Problem**: a newer version's P&M may not read an older version's AddFile correctly.

**Example**:
```
Version 1: No column mapping
  AddFile.stats = {"id": {min: 0, max: 99}}

Version 10: Column mapping enabled
  Metadata expects physical names "col-abc123"

// Reading version 1's AddFile with version 10's P&M
// may misinterpret "id" as a physical name instead of a logical name
```

**Mitigations**:
1. Document the risk explicitly
2. Add warning logs
3. Provide a config option so users can choose
4. Throw an error in known-incompatible cases (e.g., before/after column mapping is enabled)

### Risk 2: Protocol version mismatch

**Problem**: the newer snapshot may carry newer protocol requirements.

**Example**:
```
Version 1: Reader=1, Writer=2
Version 10: Reader=3, Writer=7, features=[deletionVectors]

// Reading version 1's data with version 10's protocol
// Version 1's AddFile has no deletionVector field
// This should be fine: the newer protocol should still read the older format
```

**Mitigations**:
- The protocol is designed to be forward compatible
- If it is not, parsing the AddFile fails fast

### Risk 3: Confusing semantics

**Problem**: users may not understand why the same AddFile gives different results when read with different snapshots.

**Mitigations**:
- Document the behavior in detail
- Emit clear logs and warnings
- Provide a config option to control the behavior

## Implementation Plan

### Phase 1: Minimal change (only relax the check)

```java
// CommitRangeImpl.java
private void validateParameters(
    Engine engine, Snapshot startSnapshot, Set actionSet) {
  requireNonNull(engine, "engine cannot be null");
  requireNonNull(startSnapshot, "startSnapshot cannot be null");
  requireNonNull(actionSet, "actionSet cannot be null");

  // Change this check
  checkArgument(
      startSnapshot.getVersion() >= startVersion,
      "startSnapshot version must be >= startVersion (got %s, expected >= %s)",
      startSnapshot.getVersion(), startVersion);
}
```

### Phase 2: Add a warning

```java
private void validateParameters(...) {
  // ... basic checks ...

  if (startSnapshot.getVersion() > startVersion) {
    logger.warn(
        "Using snapshot at version {} to read commits starting from version {}. " +
        "This assumes Protocol/Metadata are forward compatible. " +
        "If you encounter issues, try using exactVersionSnapshot=true",
        startSnapshot.getVersion(), startVersion);
  }
}
```

### Phase 3: Add a config option (DSv2 layer)

```java
// SparkMicroBatchStream.java
private CloseableIterator filterDeltaLogs(...) {
  boolean useExactVersion = sqlConf.getConf(
      DeltaSQLConf.DELTA_STREAMING_DSV2_USE_EXACT_VERSION_SNAPSHOT);

  Snapshot startSnapshot;
  if (useExactVersion) {
    // Strict mode: the exact version is required
    startSnapshot = snapshotManager.loadSnapshotAt(startVersion);
  } else {
    // Relaxed mode: try the exact version, fall back to latest on failure
    try {
      startSnapshot = snapshotManager.loadSnapshotAt(startVersion);
    } catch (KernelException e) {
      logger.warn("Cannot load snapshot at version {}, using latest", startVersion);
      startSnapshot = snapshotManager.loadLatestSnapshot();
    }
  }

  commitRange.getActions(engine, startSnapshot, ACTION_SET);
}
```

### Phase 4: Detect incompatible cases automatically

```java
private void validateProtocolCompatibility(
    Snapshot snapshot, long startVersion, long endVersion) {

  // Check for major P&M changes
  if (hasColumnMappingChanged(snapshot, startVersion)) {
    throw new DeltaException(
        "Column mapping mode changed between version " + startVersion +
        " and " + snapshot.getVersion() + ". " +
        "Cannot safely use newer snapshot to read older data. " +
        "Please use exactVersionSnapshot=true");
  }

  // Other incompatibility checks...
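  // (Illustrative addition, not part of the original proposal sketch; the helper below is
  // hypothetical.) Another check in the same spirit: reject the relaxed mode when the schema
  // changed between startVersion and the snapshot version in a way that breaks how older
  // AddFile.stats and partitionValues resolve against the newer schema, e.g. columns dropped
  // or renamed.
  if (hasIncompatibleSchemaChange(snapshot, startVersion)) {
    throw new DeltaException(
        "Schema changed incompatibly between version " + startVersion +
        " and " + snapshot.getVersion() + ". " +
        "Please use exactVersionSnapshot=true");
  }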
+} +``` + +## 完整的解决方案对比 + +### 方案 A: 当前状态(严格要求 exact version) +- ✅ 保证正确性 +- ❌ 无法处理无法重建的 snapshot +- ❌ startingVersion 测试失败 + +### 方案 B: 完全移除 startSnapshot 参数 +- ✅ 最灵活 +- ❌ 破坏 API +- ❌ 失去协议验证能力 + +### 方案 C: 本提案(允许 >= startVersion) +- ✅ 保持 API 结构 +- ✅ 解决无法重建的问题 +- ✅ 向后兼容 +- ✅ 用户可选择策略 +- ⚠️ 需要文档说明风险 +- ⚠️ 假设 P&M 向前兼容 + +### 方案 D: 方案 C + 智能检测 +- ✅ 方案 C 的所有优点 +- ✅ 自动检测不兼容情况 +- ✅ 在安全时允许,不安全时拒绝 +- ⚠️ 实现更复杂 + +## 推荐 + +**推荐采用方案 D(分阶段实施):** + +1. **Phase 1(立即)**: 修改 validation 检查,允许 `>= startVersion` +2. **Phase 2(短期)**: 添加警告日志,提醒用户风险 +3. **Phase 3(中期)**: 在 DSv2 添加配置选项,默认使用宽松模式 +4. **Phase 4(长期)**: 添加智能检测,自动识别不兼容情况 + +这样可以: +- 立即解决 startingVersion 测试失败问题 +- 保持足够的灵活性 +- 随时间推移增加安全检查 +- 让用户选择最适合的策略 + +## 文档要求 + +必须明确文档说明: + +1. **默认行为**:Kernel 允许使用更新版本的 snapshot +2. **假设**:假设 Protocol/Metadata 是向前兼容的 +3. **风险**:如果 P&M 发生不兼容变化(如启用 column mapping),可能产生错误结果 +4. **配置**:提供 `exactVersionSnapshot` 选项强制使用精确版本 +5. **最佳实践**:建议在生产环境使用精确版本模式 + +## 总结 + +**你的建议非常有价值!** + +通过允许 `snapshot.version >= startVersion`,我们可以: +- ✅ 解决 DSv2 的 startingVersion 测试失败 +- ✅ 实现类似 V1 的灵活性 +- ✅ 保持 Kernel API 的结构 +- ✅ 保留精确版本的选项(向后兼容) +- ✅ 让用户根据场景选择策略 + +这是一个**简单但强大**的修改,只需要改一行代码,但能解决根本性的架构限制。 + diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java index 014fda50bc6..6a7261e42ab 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java @@ -60,10 +60,13 @@ public class SparkMicroBatchStream implements MicroBatchStream { private final DeltaSnapshotManager snapshotManager; private final DeltaOptions options; private final SparkSession spark; + private final Snapshot initialSnapshot; - public SparkMicroBatchStream(DeltaSnapshotManager snapshotManager, Configuration hadoopConf) { + public SparkMicroBatchStream( + DeltaSnapshotManager snapshotManager, Snapshot initialSnapshot, Configuration hadoopConf) { this( snapshotManager, + initialSnapshot, hadoopConf, SparkSession.active(), new DeltaOptions( @@ -73,11 +76,13 @@ public SparkMicroBatchStream(DeltaSnapshotManager snapshotManager, Configuration public SparkMicroBatchStream( DeltaSnapshotManager snapshotManager, + Snapshot initialSnapshot, Configuration hadoopConf, SparkSession spark, DeltaOptions options) { this.spark = spark; this.snapshotManager = snapshotManager; + this.initialSnapshot = initialSnapshot; this.engine = DefaultEngine.create(hadoopConf); this.options = options; } @@ -294,11 +299,12 @@ private CloseableIterator filterDeltaLogs( Optional endVersionOpt = endOffset.isDefined() ? Optional.of(endOffset.get().reservoirVersion()) : Optional.empty(); CommitRange commitRange = snapshotManager.getTableChanges(engine, startVersion, endVersionOpt); - // Required by kernel: perform protocol validation by creating a snapshot at startVersion. - Snapshot startSnapshot = snapshotManager.loadSnapshotAt(startVersion); - String tablePath = startSnapshot.getPath(); try (CloseableIterator actionsIter = - commitRange.getActions(engine, startSnapshot, ACTION_SET)) { + StreamingHelper.getActionsFromRangeUnsafe( + engine, + (io.delta.kernel.internal.commitrange.CommitRangeImpl) commitRange, + initialSnapshot.getPath().toString(), + ACTION_SET)) { // Each ColumnarBatch belongs to a single commit version, // but a single version may span multiple ColumnarBatches. 
long currentVersion = -1; @@ -324,7 +330,7 @@ private CloseableIterator filterDeltaLogs( // TODO(#5318): migrate to kernel's commit-level iterator (WIP). // The current one-pass algorithm assumes REMOVE actions proceed ADD actions // in a commit; we should implement a proper two-pass approach once kernel API is ready. - validateCommit(batch, version, tablePath, endOffset); + validateCommit(batch, version, initialSnapshot.getPath().toString(), endOffset); currentVersion = version; currentIndex = diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java index 91db4ca8080..dfb6fa6f3b6 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java @@ -55,6 +55,7 @@ public class SparkScan implements Scan, SupportsReportStatistics, SupportsRuntimeV2Filtering { private final DeltaSnapshotManager snapshotManager; + private final io.delta.kernel.Snapshot initialSnapshot; private final StructType readDataSchema; private final StructType dataSchema; private final StructType partitionSchema; @@ -74,6 +75,7 @@ public class SparkScan implements Scan, SupportsReportStatistics, SupportsRuntim public SparkScan( DeltaSnapshotManager snapshotManager, + io.delta.kernel.Snapshot initialSnapshot, StructType dataSchema, StructType partitionSchema, StructType readDataSchema, @@ -83,6 +85,7 @@ public SparkScan( CaseInsensitiveStringMap options) { this.snapshotManager = Objects.requireNonNull(snapshotManager, "snapshotManager is null"); + this.initialSnapshot = Objects.requireNonNull(initialSnapshot, "initialSnapshot is null"); this.dataSchema = Objects.requireNonNull(dataSchema, "dataSchema is null"); this.partitionSchema = Objects.requireNonNull(partitionSchema, "partitionSchema is null"); this.readDataSchema = Objects.requireNonNull(readDataSchema, "readDataSchema is null"); @@ -130,7 +133,7 @@ public Batch toBatch() { public MicroBatchStream toMicroBatchStream(String checkpointLocation) { DeltaOptions deltaOptions = new DeltaOptions(scalaOptions, sqlConf); return new SparkMicroBatchStream( - snapshotManager, hadoopConf, SparkSession.active(), deltaOptions); + snapshotManager, initialSnapshot, hadoopConf, SparkSession.active(), deltaOptions); } @Override diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScanBuilder.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScanBuilder.java index e77e4013573..3a06d13726b 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScanBuilder.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScanBuilder.java @@ -39,6 +39,7 @@ public class SparkScanBuilder implements ScanBuilder, SupportsPushDownRequiredColumns, SupportsPushDownFilters { private io.delta.kernel.ScanBuilder kernelScanBuilder; + private final io.delta.kernel.Snapshot initialSnapshot; private final DeltaSnapshotManager snapshotManager; private final StructType dataSchema; private final StructType partitionSchema; @@ -70,8 +71,8 @@ public SparkScanBuilder( StructType dataSchema, StructType partitionSchema, CaseInsensitiveStringMap options) { - this.kernelScanBuilder = - requireNonNull(initialSnapshot, "initialSnapshot is null").getScanBuilder(); + this.initialSnapshot = requireNonNull(initialSnapshot, "initialSnapshot is null"); + this.kernelScanBuilder = initialSnapshot.getScanBuilder(); this.snapshotManager = requireNonNull(snapshotManager, 
"snapshotManager is null"); this.dataSchema = requireNonNull(dataSchema, "dataSchema is null"); this.partitionSchema = requireNonNull(partitionSchema, "partitionSchema is null"); @@ -158,6 +159,7 @@ public Filter[] pushedFilters() { public org.apache.spark.sql.connector.read.Scan build() { return new SparkScan( snapshotManager, + initialSnapshot, dataSchema, partitionSchema, requiredDataSchema, diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/StreamingHelper.java b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/StreamingHelper.java index 2864e66d974..97482efb76c 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/StreamingHelper.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/StreamingHelper.java @@ -21,10 +21,16 @@ import io.delta.kernel.data.ColumnVector; import io.delta.kernel.data.ColumnarBatch; import io.delta.kernel.data.Row; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.DeltaLogActionUtils; +import io.delta.kernel.internal.TableChangesUtils; import io.delta.kernel.internal.actions.AddFile; import io.delta.kernel.internal.actions.RemoveFile; +import io.delta.kernel.internal.commitrange.CommitRangeImpl; import io.delta.kernel.internal.data.StructRow; +import io.delta.kernel.utils.CloseableIterator; import java.util.Optional; +import java.util.Set; import org.apache.spark.annotation.Experimental; /** @@ -91,6 +97,17 @@ public static Optional getDataChangeRemove(ColumnarBatch batch, int return removeFile.getDataChange() ? Optional.of(removeFile) : Optional.empty(); } + public static CloseableIterator getActionsFromRangeUnsafe( + Engine engine, + CommitRangeImpl commitRange, + String tablePath, + Set actionSet) { + return TableChangesUtils.flattenCommitsAndAddMetadata( + engine, + DeltaLogActionUtils.getActionsFromCommitFilesWithProtocolValidation( + engine, tablePath, commitRange.getDeltaFiles(), actionSet)); + } + /** Private constructor to prevent instantiation of this utility class. */ private StreamingHelper() {} } From da328adf5ad4203d6f2e125e96f160c19213cfdc Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Mon, 24 Nov 2025 21:06:10 -0800 Subject: [PATCH 2/9] save --- dsv2-startingversion-issue.md | 629 ------------------------ kernel-api-relaxed-snapshot-proposal.md | 364 -------------- 2 files changed, 993 deletions(-) delete mode 100644 dsv2-startingversion-issue.md delete mode 100644 kernel-api-relaxed-snapshot-proposal.md diff --git a/dsv2-startingversion-issue.md b/dsv2-startingversion-issue.md deleted file mode 100644 index a9bca7c62bf..00000000000 --- a/dsv2-startingversion-issue.md +++ /dev/null @@ -1,629 +0,0 @@ -# DSv2 StartingVersion Issue Analysis - -## Executive Summary - -**TL;DR**: The `startingVersion` test failure is NOT a bug. It's a **fundamental design difference** between V1 and Kernel API: - -- **V1 API**: Can read changelog files without validating protocol → fast but potentially incorrect -- **Kernel API**: Requires protocol/metadata to read AddFile correctly → safe but more restrictive -- **Why it matters**: Features like Column Mapping change how AddFile.stats and partitionValues are interpreted -- **Recommendation**: Accept as breaking change; DSv2 is more correct per Delta Protocol specification - -This document explains the technical justification from `PROTOCOL.md` for why Kernel API requires a `startSnapshot` parameter. 
- ---- - -## Problem Summary - -The `startingVersion` test in `DeltaSourceDSv2Suite` is failing because: -- **Expected**: Read 10 rows (values 10-19) when starting from version 1 -- **Actual**: Read 0 rows (empty result) - -## Root Cause - -There's a fundamental API design difference between Delta v1 and Kernel API regarding starting versions: - -### V1 API Behavior (DeltaSource.scala) -```scala -protected def getFileChanges(fromVersion: Long, ...): ClosableIterator[IndexedFile] = { - def filterAndIndexDeltaLogs(startVersion: Long): ClosableIterator[IndexedFile] = { - deltaLog.getChangeLogFiles(startVersion, catalogTableOpt, options.failOnDataLoss).flatMapWithClose { - case (version, filestatus) => - // Directly read changelog files without requiring snapshot at startVersion - ... - } - } -} -``` - -**Key point**: V1 API can start reading from any version by directly reading changelog files. It does NOT require the snapshot at `startVersion` to be readable/reconstructable. - -### Kernel API Behavior (CommitRange.java) -```java -/** - * @param startSnapshot the snapshot for startVersion, required to ensure the table is readable by - * Kernel at startVersion - * @throws IllegalArgumentException if startSnapshot.getVersion() != startVersion - * @throws KernelException if the version range contains a version with reader protocol that is - * unsupported by Kernel - */ -CloseableIterator getActions( - Engine engine, - Snapshot startSnapshot, // <-- REQUIRED! - Set actionSet); -``` - -**Key point**: Kernel API REQUIRES a valid snapshot at `startVersion`. This enforces that the table must be readable by Kernel at that version. - -### DSv2 Implementation (SparkMicroBatchStream.java) - -```java -private CloseableIterator filterDeltaLogs( - long startVersion, Optional endOffset) { - - CommitRange commitRange = snapshotManager.getTableChanges(engine, startVersion, endVersionOpt); - - // Required by kernel: perform protocol validation by creating a snapshot at startVersion. - Snapshot startSnapshot; - try { - startSnapshot = snapshotManager.loadSnapshotAt(startVersion); // Line 607 - } catch (io.delta.kernel.exceptions.KernelException e) { - // If startVersion doesn't exist (e.g., starting from "latest" when the next version - // hasn't been written yet), return empty iterator. - return Utils.toCloseableIterator(allIndexedFiles.iterator()); // <-- RETURNS EMPTY! - } - - try (CloseableIterator actionsIter = - commitRange.getActions(engine, startSnapshot, ACTION_SET)) { // Line 615 - // Process actions... - } -} -``` - -**The problem**: When `loadSnapshotAt(startVersion)` fails (e.g., due to unsupported protocol), the code catches the exception and returns an empty iterator, resulting in 0 rows read. - -## Why This Difference Matters - -### Scenario 1: Old Protocol Version -If `startVersion=1` uses a protocol that Kernel doesn't support: -- **V1**: Can still read from version 1 onward by reading changelog files directly -- **Kernel/DSv2**: Cannot create snapshot at version 1, so returns empty results - -### Scenario 2: Non-reconstructable Snapshot -If the snapshot at `startVersion` cannot be reconstructed (missing checkpoint, etc.): -- **V1**: Can still read changelog files -- **Kernel/DSv2**: Cannot create snapshot, returns empty results - -### Scenario 3: Comment from SparkMicroBatchStream -```java -// When starting from a given version, we don't require that the snapshot of this -// version can be reconstructed, even though the input table is technically in an -// inconsistent state. 
If the snapshot cannot be reconstructed, then the protocol -// check is skipped, so this is technically not safe, but we keep it this way for -// historical reasons. -``` - -This comment in `validateProtocolAt()` (line 438-443) acknowledges that V1 has this behavior "for historical reasons" but DSv2 cannot implement the same behavior due to Kernel API constraints. - -## Why Kernel API Requires startSnapshot - -Looking at the Kernel API design and Delta Protocol specification, the `startSnapshot` parameter is required because **you cannot correctly read AddFile actions without knowing the Protocol and Metadata**. - -### Protocol-Dependent Features That Affect AddFile Reading - -From `PROTOCOL.md`, there are multiple table features that fundamentally change how AddFiles must be interpreted: - -#### 1. **Column Mapping** (Reader Version 2+) -``` -Reader Requirements (PROTOCOL.md line 930-937): -- In 'id' mode: Must resolve columns by field_id in parquet metadata -- In 'name' mode: Must resolve columns by physicalName -- Partition values and statistics must be resolved by physical names -``` - -**Impact**: Without knowing if Column Mapping is enabled and which mode, you cannot: -- Read partition values correctly from AddFile.partitionValues -- Parse column statistics correctly from AddFile.stats -- Read the parquet files themselves - -#### 2. **Deletion Vectors** (Reader Version 3+, Writer Version 7+) -``` -AddFile Schema (PROTOCOL.md line 516): -- deletionVector: [DeletionVectorDescriptor Struct] (optional) - -Reader Requirements (PROTOCOL.md line 1006-1007): -- If a logical file has a DV, invalidated records MUST NOT be returned -``` - -**Impact**: Without checking protocol, you don't know: -- If Deletion Vectors are supported -- How to handle AddFile.deletionVector field -- Which rows to filter when reading - -#### 3. **Row Tracking** (Writer Version 7+) -``` -AddFile Schema (PROTOCOL.md line 517-518): -- baseRowId: Long (optional) -- defaultRowCommitVersion: Long (optional) -``` - -**Impact**: These fields only exist if Row Tracking feature is active. - -#### 4. **Clustered Tables** (Writer Version 7+) -``` -AddFile Schema (PROTOCOL.md line 519): -- clusteringProvider: String (optional) -``` - -**Impact**: Without protocol, can't tell if this field is present. - -### Why V1 Could Skip This - THE KEY DIFFERENCE - -**V1 uses the CURRENT (source init) snapshot's P&M to read ALL historical AddFiles:** - -```scala -// From DeltaSource.scala line 728-734 -case class DeltaSource( - spark: SparkSession, - deltaLog: DeltaLog, - catalogTableOpt: Option[CatalogTable], - options: DeltaOptions, - snapshotAtSourceInit: SnapshotDescriptor, // <-- FIXED at source init! - metadataPath: String, - ... -) - -// Line 199-228: readSnapshotDescriptor is based on snapshotAtSourceInit -protected lazy val readSnapshotDescriptor: SnapshotDescriptor = - persistedMetadataAtSourceInit.map { ... } - .getOrElse(snapshotAtSourceInit) // Use snapshot from source init - -// Line 390-407: ALL AddFiles are read using this SAME descriptor -deltaLog.createDataFrame( - readSnapshotDescriptor, // <-- Same P&M for all versions! - addFiles.map(_.getFileAction.asInstanceOf[AddFile]), - isStreaming = true) -``` - -**Key insight**: V1 gets ONE snapshot when the stream starts (usually the latest), then uses that snapshot's Protocol and Metadata to interpret AddFiles from ALL versions, including historical ones. - -This works because V1 assumes: -1. **Forward compatibility**: Current P&M can read historical AddFiles -2. 
**Single interpretation**: Use one consistent view of schema/features -3. **Lazy validation**: Errors surface at DataFrame creation, not log reading -4. **No per-version snapshot needed**: Only need to read changelog files, not reconstruct snapshots -5. **startVersion doesn't need to be readable**: As long as changelog exists, can read it - -### Why Kernel API Is Stricter - -**Kernel API requires the startVersion's P&M to read that version's AddFiles:** - -```java -// From CommitRange.java line 96-97: -@param startSnapshot the snapshot for startVersion, required to ensure the table is readable by - Kernel at startVersion - -// From SparkMicroBatchStream.java line 604-615 -CommitRange commitRange = snapshotManager.getTableChanges(engine, startVersion, endVersionOpt); - -// Must load snapshot AT startVersion (not latest!) -Snapshot startSnapshot = snapshotManager.loadSnapshotAt(startVersion); - -// Use startVersion's P&M to read its AddFiles -CloseableIterator actionsIter = - commitRange.getActions(engine, startSnapshot, ACTION_SET); -``` - -The startSnapshot provides: -1. **Protocol**: Which features are active at THAT version (not current) -2. **Metadata**: Schema with column mapping info at THAT version -3. **Validation**: Ensures startVersion is actually readable by Kernel -4. **Per-version interpretation**: Each version's AddFiles interpreted with that version's P&M - -## The Fundamental Architectural Difference - -This is the core of the issue: - -| Aspect | V1 API | Kernel API | -|--------|--------|------------| -| **P&M Source** | Latest snapshot (at source init) | startVersion's snapshot | -| **P&M Updates** | Fixed throughout stream | Per version | -| **Assumption** | Forward compatible | Version-specific | -| **When fails** | At DataFrame read time | At log read time (earlier) | -| **Requirement** | Only changelog files exist | Snapshot must be reconstructable | -| **startingVersion** | Just needs commit file | Needs full snapshot | - -### Why This Matters - -**Scenario: Reading from version 10, current version is 100** - -V1: -```scala -// Use version 100's P&M to read version 10's AddFiles -// Works if P&M is forward compatible -// Fails if version 10 used different column mapping mode -``` - -Kernel: -```java -// Must reconstruct version 10's snapshot -// Use version 10's P&M to read version 10's AddFiles -// Fails if version 10 has unsupported protocol -// More correct but more restrictive -``` - -### Concrete Example: Column Mapping - -Without startSnapshot: -```java -// AddFile has partitionValues = {"col-a7f4159c-53be-4cb0-b81a-f7e5240cfc49": "2024-01-01"} -// Is this the actual column name, or a physical name? -// Need metadata to know if Column Mapping is active! - -// AddFile.stats contains: -// {"col-5f422f40-de70-45b2-88ab-1d5c90e94db1": {"min": 1, "max": 100}} -// What column does this refer to? -// Need metadata to map physical name -> logical name! 
-``` - -With startSnapshot: -```java -Snapshot snapshot = loadSnapshotAt(version); -Protocol protocol = snapshot.getProtocol(); -Metadata metadata = snapshot.getMetadata(); - -if (protocol.supportsColumnMapping()) { - // Use metadata.schema to map physical names -> logical names - String logicalName = metadata.getLogicalName("col-a7f4159c-..."); -} -``` - -### From PROTOCOL.md Line 1973-1984 - -``` -# Requirements for Readers - -Reader Version 2: Respect Column Mapping -Reader Version 3: Respect Table Features for readers -``` - -**You cannot read a Delta table correctly without knowing its Protocol and Metadata.** - -This is a more restrictive but safer API design compared to V1. - -## Impact - -This design difference means: - -1. **Test Failures**: Tests like `startingVersion` that rely on V1's lenient behavior will fail in DSv2 -2. **Behavior Change**: Users who relied on starting from non-readable versions will see different behavior -3. **API Limitation**: Kernel API fundamentally cannot support the same use cases as V1 API - -## Possible Solutions - -### Option 1: Relax Kernel API Requirements -Modify `CommitRange.getActions()` to not require `startSnapshot`, or make it optional. This would require changes to the Kernel API itself. - -**Pros**: Most compatible with V1 behavior -**Cons**: Requires Kernel API changes, may reduce safety guarantees - -### Option 2: Document as Breaking Change -Document that DSv2 has stricter requirements for `startingVersion` and this is a known behavior change. - -**Pros**: No code changes needed -**Cons**: Breaking change for users - -### Option 3: Fallback Mechanism -When snapshot creation fails, attempt to read changelog files directly (similar to V1), but this may require Kernel API enhancements. - -**Pros**: Best of both worlds -**Cons**: Complex implementation, may still require Kernel API changes - -### Option 4: Skip Protocol Validation (Current Workaround) -Use the `validateProtocolAt()` approach but make it optional via configuration. 
- -**Pros**: Can be controlled via config -**Cons**: Reduces safety, doesn't fully solve the problem (snapshot creation still required) - -### Option 5: Allow `snapshot.version >= startVersion` ✅ RECOMMENDED - -**核心思路**:修改 Kernel API 的 validation,允许使用**更新版本**的 snapshot 来读取**历史版本**的数据(类似 V1)。 - -**当前限制**: -```java -// CommitRangeImpl.java line 125-127 -checkArgument( - startSnapshot.getVersion() == startVersion, - "startSnapshot must have version = startVersion"); -``` - -**建议修改**: -```java -checkArgument( - startSnapshot.getVersion() >= startVersion, - "startSnapshot version must be >= startVersion"); -``` - -**优势**: -- ✅ 解决 startingVersion 无法重建的问题 -- ✅ 保持 Kernel API 结构(仍需 snapshot 参数) -- ✅ 向后兼容(现有代码仍然工作) -- ✅ 允许 V1 风格的"用最新 P&M"策略 -- ✅ 用户可选择:精确版本(更安全)或最新版本(更实用) -- ✅ 只需修改一行代码 - -**实现示例**: -```java -// DSv2 can now fallback to latest snapshot -Snapshot startSnapshot; -try { - startSnapshot = snapshotManager.loadSnapshotAt(startVersion); -} catch (KernelException e) { - // Fallback to latest (like V1) - startSnapshot = snapshotManager.loadLatestSnapshot(); - logger.warn("Using latest snapshot P&M to read from version {}", startVersion); -} -commitRange.getActions(engine, startSnapshot, ACTION_SET); -``` - -**风险**: -- ⚠️ 假设 P&M 向前兼容(大多数情况下成立) -- ⚠️ 如果 P&M 有重大变化(如启用 column mapping),可能误读数据 - -**缓解**: -- 添加警告日志 -- 提供配置选项让用户选择严格/宽松模式 -- 智能检测已知的不兼容情况(如 column mapping 变化) - -详细分析见:`kernel-api-relaxed-snapshot-proposal.md` - -## Recommendation - -The core issue is that **Kernel API's design is fundamentally incompatible with V1's lenient behavior**. The requirement to pass a `startSnapshot` parameter is baked into the API signature. - -### This Is Not A Bug - It's A Design Decision - -After analyzing the Delta Protocol specification, it's clear that: - -1. **Kernel API is correct**: You genuinely need Protocol and Metadata to read AddFile actions correctly -2. **V1 is lenient but potentially incorrect**: V1 can produce wrong results when reading from versions without protocol validation -3. **The requirement is justified**: Features like Column Mapping, Deletion Vectors, and Row Tracking fundamentally change how AddFiles are interpreted - -### Why The Test Fails - -The test `startingVersion` likely: -1. Creates a table at version 0 with protocol X -2. Tries to start streaming from version 1 -3. DSv2 tries to load snapshot at version 1 -4. Version 1 may have unsupported protocol or cannot be reconstructed -5. DSv2 correctly returns empty (safer than returning wrong data) -6. V1 would have proceeded anyway (less safe, but more permissive) - -### Options Forward - -#### Option 1: Accept Breaking Change ✅ RECOMMENDED -Document that DSv2 has stricter safety requirements: -- Starting version must point to a readable snapshot -- Protocol must be supported by Kernel -- This is more correct than V1's behavior - -**Pros**: Safer, more correct -**Cons**: Breaking change for some users - -#### Option 2: Relax Kernel API (Major Change) -Make `startSnapshot` optional in Kernel API: -```java -CloseableIterator getActions( - Engine engine, - Optional startSnapshot, // Optional now - Set actionSet); -``` - -**Pros**: Backward compatible -**Cons**: -- Requires Kernel API changes -- Loses safety guarantees -- May produce incorrect results for Column Mapping, Deletion Vectors, etc. -- Goes against Delta Protocol specification - -#### Option 3: Best-Effort Fallback (Complex) -When snapshot fails, attempt to infer minimal protocol/metadata by: -1. Reading startVersion commit file -2. 
Extracting protocol/metadata actions -3. Validating they're supported - -**Pros**: More compatible -**Cons**: -- Very complex -- Still may fail for unsupported protocols -- Partial solution only - -#### Option 4: Config-Based Validation (Workaround) -Add a config to skip protocol validation: -``` -spark.delta.streaming.unsafeSkipProtocolValidation=true -``` - -**Pros**: User can opt-in to V1 behavior -**Cons**: -- Unsafe -- May produce corrupt results -- Still requires snapshot creation which may fail - -### Conclusion - -**Both designs are valid, but serve different goals:** - -1. **V1 Design: Pragmatic Forward Compatibility** - - Uses latest P&M to read all historical data - - Assumes P&M is forward compatible - - More permissive, works in more scenarios - - Risk: May misinterpret historical AddFiles if P&M changed - -2. **Kernel Design: Version-Specific Correctness** - - Uses each version's P&M to read that version's data - - Guarantees correct interpretation - - More restrictive, safer - - Limitation: Requires snapshot reconstruction at startVersion - -**Why the test fails:** - -The test likely creates a table where version 1 cannot be reconstructed (missing checkpoint, unsupported protocol, etc.). V1 doesn't care because it uses the current snapshot's P&M. Kernel fails because it needs version 1's snapshot. - -**Which is correct?** - -From `PROTOCOL.md`, the protocol specification doesn't mandate either approach. However: -- If P&M has changed significantly between startVersion and current version (e.g., column mapping was added), V1's approach may produce incorrect results -- If P&M hasn't changed (or is backward compatible), V1's approach is more practical - -**The real issue**: Kernel API's requirement for `startSnapshot` makes it **impossible** to implement V1's "use latest P&M" approach, even if that's sometimes the right choice. - -### Recommendation - -We should **accept this as a known architectural difference**, not a bug: - -1. **Document the difference**: DSv2 requires startVersion to have a reconstructable snapshot -2. **Explain the tradeoff**: V1 is more permissive but may misinterpret data; DSv2 is safer but more restrictive -3. 
**Consider API enhancement**: Add optional config to use "latest P&M" mode in Kernel API for users who understand the tradeoffs - -## Real-World Scenario: Why This Matters - -### Example 1: V1's "Latest P&M" vs Kernel's "Version-Specific P&M" - -**Setup:** -``` -Version 0: Initial table - Protocol: Reader=1, Writer=2 - Schema: {name: "id", type: "int"} - AddFile: file0.parquet with stats: {"id": {min: 0, max: 99}} - -Version 10: Column Mapping enabled - Protocol: Reader=2, Writer=5, columnMapping - Metadata: Column "id" -> physical name "col-abc123" - AddFile: file10.parquet with stats: {"col-abc123": {min: 100, max: 199}} - -Version 100: Current version - Same protocol/metadata as v10 - AddFile: file100.parquet with stats: {"col-abc123": {min: 1000, max: 1099}} -``` - -**Stream starts at version 0, current version is 100:** - -V1 Approach: -```scala -// DeltaSource initialized at version 100 -snapshotAtSourceInit = loadSnapshot(100) // Reader=2, columnMapping enabled - -// Read version 0's AddFile -// Uses version 100's P&M: expects physical names -// Version 0's stats: {"id": {min: 0, max: 99}} -// V1 sees "id" as column name (no physical name mapping) -// PROBLEM: May fail or misinterpret because current P&M expects "col-abc123" -``` - -Kernel Approach: -```java -// Stream starts at version 0 -startSnapshot = loadSnapshotAt(0) // Reader=1, no column mapping -protocol = startSnapshot.getProtocol() // Reader=1 -metadata = startSnapshot.getMetadata() // No column mapping - -// Read version 0's AddFile -// Uses version 0's P&M: no column mapping -// Version 0's stats: {"id": {min: 0, max: 99}} -// Kernel correctly interprets "id" as direct column name -// ✅ CORRECT -``` - -### Scenario: Table with Column Mapping Enabled Mid-Stream - -``` -Version 0: Table created - Protocol: Reader=1, Writer=2 - Schema: {name: "id", type: "int"} - -Version 1: Column Mapping enabled - Protocol: Reader=2, Writer=5, features=[columnMapping] - Metadata: { - "schema": { - "name": "id", - "metadata": { - "delta.columnMapping.id": 1, - "delta.columnMapping.physicalName": "col-abc123" - } - }, - "configuration": {"delta.columnMapping.mode": "name"} - } - AddFile: { - "path": "file1.parquet", - "partitionValues": {}, - "stats": { - "numRecords": 100, - "minValues": {"col-abc123": 1}, // Uses physical name! - "maxValues": {"col-abc123": 100} - } - } - -Version 2: More data added - AddFile: { - "path": "file2.parquet", - "stats": { - "numRecords": 50, - "minValues": {"col-abc123": 101}, // Still physical name - "maxValues": {"col-abc123": 150} - } - } -``` - -### What Happens With startingVersion=1? 
**V1 Behavior (Permissive)**:
```scala
// V1 reads version 1's changelog directly
// Sees AddFile with stats: {"col-abc123": {min: 1, max: 100}}
// Doesn't check protocol, assumes "col-abc123" is the column name
// Query: SELECT * FROM table WHERE id > 50
// Result: May skip file incorrectly or fail to read parquet
```

**Kernel/DSv2 Behavior (Correct)**:
```java
// Tries to load snapshot at version 1
Snapshot snapshot1 = loadSnapshotAt(1);
Protocol protocol = snapshot1.getProtocol(); // Reader=2, columnMapping
Metadata metadata = snapshot1.getMetadata(); // Has physicalName mapping

// Reads AddFile with stats: {"col-abc123": {min: 1, max: 100}}
// Uses metadata to understand: "col-abc123" maps to "id"
// Query: SELECT * FROM table WHERE id > 50
// Result: Correctly reads file and applies filter
```

**If Version 1's Protocol Is Unsupported**:
```java
// DSv2: Fails early, returns empty
// Better than V1's "try anyway and maybe corrupt data"
```

### The Key Insight

From `PROTOCOL.md` line 927:
> "Track partition values and column level statistics with the physical name
> of the column in the transaction log."

**Without metadata, you cannot parse AddFile.stats correctly!**

The stats use physical names, but queries use logical names. You MUST have the metadata to translate between them.

## References

- Kernel API: `/kernel/kernel-api/src/main/java/io/delta/kernel/CommitRange.java` lines 96-107
- DSv2 Implementation: `/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java` lines 604-615
- V1 Implementation: `/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala` lines 779-809
- Test: `/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala` lines 1585-1643
- Delta Protocol: `/PROTOCOL.md`
  - Add File schema: lines 505-529
  - Column Mapping: lines 885-937
  - Deletion Vectors: lines 939-1010
  - Reader Requirements: lines 1973-1984

diff --git a/kernel-api-relaxed-snapshot-proposal.md b/kernel-api-relaxed-snapshot-proposal.md
deleted file mode 100644
index c8f985b0603..00000000000
--- a/kernel-api-relaxed-snapshot-proposal.md
+++ /dev/null
@@ -1,364 +0,0 @@
# Kernel API Proposal: Allow Using a Newer Snapshot to Read Historical Data

## Problem background

The Kernel API currently requires `startSnapshot.version == startVersion`:

```java
// CommitRangeImpl.java line 125-127
checkArgument(
    startSnapshot.getVersion() == startVersion,
    "startSnapshot must have version = startVersion");
```

This means:
1. The snapshot at startVersion must be reconstructable
2. If the snapshot at startVersion cannot be reconstructed (missing checkpoint, unsupported protocol), nothing can be read
3. The V1 strategy of "using the latest P&M to read historical data" cannot be implemented
## Proposal: Allow `snapshot.version >= startVersion`

### Core change

```java
// Before (CommitRangeImpl.java line 125-127)
checkArgument(
    startSnapshot.getVersion() == startVersion,
    "startSnapshot must have version = startVersion");

// After
checkArgument(
    startSnapshot.getVersion() >= startVersion,
    "startSnapshot version must be >= startVersion");
```

### Semantic change

**Before**:
- The data at startVersion must be read with startVersion's own P&M
- Each version is guaranteed to be interpreted with its own P&M

**After**:
- The P&M of **any version >= startVersion** may be used to read data at startVersion and later
- The latest version's P&M may be used to read historical data (similar to V1)
- Using a historical version's P&M remains an option (when it is available)

## Usage scenarios

### Scenario 1: The snapshot at startVersion cannot be reconstructed (the current problem)

```java
// Current behavior: fails
try {
  Snapshot snapshot1 = loadSnapshotAt(1); // Fails! protocol unsupported
  commitRange.getActions(engine, snapshot1, actionSet);
} catch (Exception e) {
  // Nothing can be read
}

// With the proposal: use the latest snapshot
Snapshot latestSnapshot = loadLatestSnapshot(); // Succeeds
commitRange.getActions(engine, latestSnapshot, actionSet);
// Read version 1's data with the latest P&M
```

### Scenario 2: Performance optimization

There is no need to reconstruct a snapshot for every startVersion; a cached latest snapshot can be reused.

```java
// Today: a snapshot must be reconstructed every time
Snapshot snap1 = loadSnapshotAt(1);
Snapshot snap2 = loadSnapshotAt(2);
Snapshot snap3 = loadSnapshotAt(3);

// With the proposal: reuse is possible
Snapshot latest = loadLatestSnapshot(); // Loaded only once
commitRange1.getActions(engine, latest, actionSet);
commitRange2.getActions(engine, latest, actionSet);
commitRange3.getActions(engine, latest, actionSet);
```

### Scenario 3: Flexible strategy selection

```java
// Strategy 1: exact mode (safer)
if (canReconstructSnapshot(startVersion)) {
  Snapshot exactSnapshot = loadSnapshotAt(startVersion);
  commitRange.getActions(engine, exactSnapshot, actionSet);
}

// Strategy 2: latest mode (more practical)
else {
  Snapshot latestSnapshot = loadLatestSnapshot();
  commitRange.getActions(engine, latestSnapshot, actionSet);
  // Assumes P&M is forward compatible
}
```

## Advantages

### 1. Fixes the failing startingVersion test

DSv2 can implement V1-like behavior:

```java
// SparkMicroBatchStream.java
private CloseableIterator filterDeltaLogs(
    long startVersion, Optional endOffset) {

  CommitRange commitRange = snapshotManager.getTableChanges(...);

  // Try to load the snapshot at startVersion
  Snapshot startSnapshot;
  try {
    startSnapshot = snapshotManager.loadSnapshotAt(startVersion);
  } catch (KernelException e) {
    // Fall back to the latest snapshot (like V1)
    startSnapshot = snapshotManager.loadLatestSnapshot();
    logWarning("Using latest snapshot P&M to read from version " + startVersion);
  }

  // This now works!
  commitRange.getActions(engine, startSnapshot, ACTION_SET);
}
```

### 2. Keeps the API structure

Not needed:
- Changing the API signature (a snapshot parameter is still required)
- Adding complicated Optional logic
- Breaking existing code

Only needed:
- Relaxing one validation check
- Allowing more flexible usage

### 3. Backward compatible

All existing code keeps working:
```java
// Existing code (exact version) is still valid
Snapshot snapshot = loadSnapshotAt(startVersion);
commitRange.getActions(engine, snapshot, actionSet);
// ✅ snapshot.version == startVersion, the check passes
```

New code can use the more flexible form:
```java
// New code (latest version)
Snapshot latest = loadLatestSnapshot();
commitRange.getActions(engine, latest, actionSet);
// ✅ latest.version >= startVersion, the check also passes
```
### 4. Users can choose the safety level

```java
// Config option
if (spark.conf.get("spark.delta.streaming.useExactVersionSnapshot")) {
  // Strict mode: the exact version must be used
  snapshot = loadSnapshotAt(startVersion);
} else {
  // Relaxed mode: the latest version is allowed
  snapshot = loadLatestSnapshot();
}
```

## Risks and limitations

### Risk 1: P&M is not forward compatible

**Problem**: a newer version's P&M may not read an older version's AddFile correctly.

**Example**:
```
Version 1: No column mapping
  AddFile.stats = {"id": {min: 0, max: 99}}

Version 10: Column mapping enabled
  Metadata expects physical names "col-abc123"

// Reading version 1's AddFile with version 10's P&M
// may misinterpret "id" as a physical name instead of a logical name
```

**Mitigations**:
1. Document the risk explicitly
2. Add warning logs
3. Provide a config option so the user can choose
4. Throw an error in known-incompatible cases (e.g., before/after column mapping is enabled)

### Risk 2: Protocol version mismatch

**Problem**: a newer snapshot may carry newer protocol requirements.

**Example**:
```
Version 1: Reader=1, Writer=2
Version 10: Reader=3, Writer=7, features=[deletionVectors]

// Reading version 1's data with version 10's protocol
// Version 1's AddFiles have no deletionVector field
// This should be fine; a newer protocol should be able to read the older format
```

**Mitigations**:
- The protocol is designed to be forward compatible
- If it is not, AddFile parsing fails (fail fast)

### Risk 3: Confusing semantics

**Problem**: users may not understand why the same AddFile is interpreted differently depending on which snapshot is used.

**Mitigations**:
- Document the behavior in detail
- Emit clear logs and warnings
- Provide a config option to control the behavior

## Implementation plan

### Phase 1: Minimal change (relax the check only)

```java
// CommitRangeImpl.java
private void validateParameters(
    Engine engine, Snapshot startSnapshot, Set actionSet) {
  requireNonNull(engine, "engine cannot be null");
  requireNonNull(startSnapshot, "startSnapshot cannot be null");
  requireNonNull(actionSet, "actionSet cannot be null");

  // Change this check
  checkArgument(
      startSnapshot.getVersion() >= startVersion,
      "startSnapshot version must be >= startVersion (got %s, expected >= %s)",
      startSnapshot.getVersion(), startVersion);
}
```

### Phase 2: Add a warning

```java
private void validateParameters(...) {
  // ... basic checks ...

  if (startSnapshot.getVersion() > startVersion) {
    logger.warn(
        "Using snapshot at version {} to read commits starting from version {}. " +
        "This assumes Protocol/Metadata are forward compatible. " +
        "If you encounter issues, try using exactVersionSnapshot=true",
        startSnapshot.getVersion(), startVersion);
  }
}
```

### Phase 3: Add a config option (DSv2 layer)

```java
// SparkMicroBatchStream.java
private CloseableIterator filterDeltaLogs(...) {
  boolean useExactVersion = sqlConf.getConf(
      DeltaSQLConf.DELTA_STREAMING_DSV2_USE_EXACT_VERSION_SNAPSHOT);

  Snapshot startSnapshot;
  if (useExactVersion) {
    // Strict mode: the exact version is required
    startSnapshot = snapshotManager.loadSnapshotAt(startVersion);
  } else {
    // Relaxed mode: try the exact version, fall back to latest on failure
    try {
      startSnapshot = snapshotManager.loadSnapshotAt(startVersion);
    } catch (KernelException e) {
      logger.warn("Cannot load snapshot at version {}, using latest", startVersion);
      startSnapshot = snapshotManager.loadLatestSnapshot();
    }
  }

  commitRange.getActions(engine, startSnapshot, ACTION_SET);
}
```

### Phase 4: Detect known incompatibilities

```java
private void validateProtocolCompatibility(
    Snapshot snapshot, long startVersion, long endVersion) {

  // Check for significant P&M changes
  if (hasColumnMappingChanged(snapshot, startVersion)) {
    throw new DeltaException(
        "Column mapping mode changed between version " + startVersion +
        " and " + snapshot.getVersion() + ". " +
        "Cannot safely use newer snapshot to read older data. " +
        "Please use exactVersionSnapshot=true");
  }

  // Other incompatibility checks...
}
```

## Full solution comparison

### Option A: Current state (strictly require the exact version)
- ✅ Guarantees correctness
- ❌ Cannot handle snapshots that cannot be reconstructed
- ❌ The startingVersion test fails

### Option B: Remove the startSnapshot parameter entirely
- ✅ Most flexible
- ❌ Breaks the API
- ❌ Loses the ability to validate the protocol

### Option C: This proposal (allow >= startVersion)
- ✅ Keeps the API structure
- ✅ Solves the non-reconstructable snapshot problem
- ✅ Backward compatible
- ✅ Users can choose a strategy
- ⚠️ The risks must be documented
- ⚠️ Assumes P&M is forward compatible

### Option D: Option C + incompatibility detection
- ✅ All the benefits of Option C
- ✅ Automatically detects incompatible cases
- ✅ Allows the relaxed path when safe, rejects it when unsafe
- ⚠️ More complex to implement

## Recommendation

**Adopt Option D, rolled out in phases:**

1. **Phase 1 (now)**: Relax the validation check to allow `>= startVersion`
2. **Phase 2 (short term)**: Add warning logs to remind users of the risk
3. **Phase 3 (medium term)**: Add a config option in DSv2 and default to the relaxed mode
4. **Phase 4 (long term)**: Add detection that automatically recognizes incompatible cases

This approach:
- Fixes the failing startingVersion test immediately
- Keeps enough flexibility
- Adds safety checks over time
- Lets users pick the strategy that fits their scenario

## Documentation requirements

The documentation must clearly state:

1. **Default behavior**: Kernel allows using a newer snapshot
2. **Assumption**: Protocol/Metadata are assumed to be forward compatible
3. **Risk**: if P&M changes incompatibly (e.g., column mapping is enabled), results may be incorrect
4. **Config**: an `exactVersionSnapshot` option forces the exact version
5. **Best practice**: use the exact-version mode in production

## Summary

By allowing `snapshot.version >= startVersion` we can:
- ✅ Fix the DSv2 startingVersion test failure
- ✅ Achieve V1-like flexibility
- ✅ Keep the structure of the Kernel API
- ✅ Keep the exact-version option (backward compatible)
- ✅ Let users choose a strategy per scenario

This is a **simple but powerful** change: only one line of code changes, yet it removes a fundamental architectural limitation.

From 6536f15a888349633bea379302b5c996f7c0ccc3 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Mon, 24 Nov 2025 21:57:24 -0800 Subject: [PATCH 3/9] save --- ...icroBatchStreamGetStartingVersionTest.java | 17 +++- .../spark/read/SparkMicroBatchStreamTest.java | 82 +++++++++++++++++-- 2 files changed, 90 insertions(+), 9 deletions(-) diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamGetStartingVersionTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamGetStartingVersionTest.java index 6b378323f74..4ecd11a81af 100644 --- a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamGetStartingVersionTest.java +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamGetStartingVersionTest.java @@ -97,8 +97,9 @@ public void testGetStartingVersion_NoOptions(@TempDir File tempDir) throws Excep // dsv2 PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(testTablePath, new Configuration()); + io.delta.kernel.Snapshot initialSnapshot = snapshotManager.loadLatestSnapshot(); SparkMicroBatchStream dsv2Stream = - new SparkMicroBatchStream(snapshotManager, new Configuration()); + new SparkMicroBatchStream(snapshotManager, initialSnapshot, new Configuration()); Optional dsv2Result = dsv2Stream.getStartingVersion(); compareStartingVersionResults(dsv1Result, dsv2Result, Optional.empty(), "No options provided"); @@ -218,9 +219,14 @@ public void testGetStartingVersion_ProtocolValidationNonFeatureExceptionFallback // dsv2 PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(testTablePath, new Configuration()); + io.delta.kernel.Snapshot initialSnapshot = snapshotManager.loadLatestSnapshot(); SparkMicroBatchStream dsv2Stream = new SparkMicroBatchStream( - snapshotManager, new Configuration(), spark, createDeltaOptions(startingVersion)); + snapshotManager, + initialSnapshot, + new Configuration(), + spark, + createDeltaOptions(startingVersion)); Optional dsv2Result = dsv2Stream.getStartingVersion(); compareStartingVersionResults( @@ -257,9 +263,14 @@ private void testAndCompareStartingVersion( // DSv2: Create SparkMicroBatchStream and get starting version PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(testTablePath, new Configuration()); + io.delta.kernel.Snapshot initialSnapshot =
snapshotManager.loadLatestSnapshot(); SparkMicroBatchStream dsv2Stream = new SparkMicroBatchStream( - snapshotManager, new Configuration(), spark, createDeltaOptions(startingVersion)); + snapshotManager, + initialSnapshot, + new Configuration(), + spark, + createDeltaOptions(startingVersion)); Optional dsv2Result = dsv2Stream.getStartingVersion(); compareStartingVersionResults(dsv1Result, dsv2Result, expectedVersion, testDescription); diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java index dbf4cb763c0..76910756b29 100644 --- a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java @@ -51,10 +51,15 @@ public class SparkMicroBatchStreamTest extends SparkDsv2TestBase { @BeforeEach void setUp(@TempDir File tempDir) { String testPath = tempDir.getAbsolutePath(); + String testTableName = "test_setup_" + System.nanoTime(); + // Create an empty Delta table so we can load a valid snapshot + createEmptyTestTable(testPath, testTableName); PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(testPath, spark.sessionState().newHadoopConf()); + io.delta.kernel.Snapshot initialSnapshot = snapshotManager.loadLatestSnapshot(); microBatchStream = - new SparkMicroBatchStream(snapshotManager, spark.sessionState().newHadoopConf()); + new SparkMicroBatchStream( + snapshotManager, initialSnapshot, spark.sessionState().newHadoopConf()); } @Test @@ -174,7 +179,9 @@ public void testGetFileChanges( // dsv2 SparkMicroBatchStream PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(testTablePath, new Configuration()); - SparkMicroBatchStream stream = new SparkMicroBatchStream(snapshotManager, new Configuration()); + io.delta.kernel.Snapshot initialSnapshot = snapshotManager.loadLatestSnapshot(); + SparkMicroBatchStream stream = + new SparkMicroBatchStream(snapshotManager, initialSnapshot, new Configuration()); Option endOffsetOption = scalaEndOffset; try (CloseableIterator kernelChanges = stream.getFileChanges(fromVersion, fromIndex, isInitialSnapshot, endOffsetOption)) { @@ -285,7 +292,9 @@ public void testGetFileChangesWithRateLimit( // dsv2 SparkMicroBatchStream PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(testTablePath, new Configuration()); - SparkMicroBatchStream stream = new SparkMicroBatchStream(snapshotManager, new Configuration()); + io.delta.kernel.Snapshot initialSnapshot = snapshotManager.loadLatestSnapshot(); + SparkMicroBatchStream stream = + new SparkMicroBatchStream(snapshotManager, initialSnapshot, new Configuration()); // We need a separate AdmissionLimits object for DSv2 because the method is stateful. 
Option dsv2Limits = createAdmissionLimits(deltaSource, maxFiles, maxBytes); @@ -407,10 +416,12 @@ public void testGetFileChanges_EmptyVersions( deltaChanges.close(); // Test DSv2 SparkMicroBatchStream + PathBasedSnapshotManager dsv2SnapshotManager = + new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()); + io.delta.kernel.Snapshot dsv2InitialSnapshot = dsv2SnapshotManager.loadLatestSnapshot(); SparkMicroBatchStream stream = new SparkMicroBatchStream( - new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()), - spark.sessionState().newHadoopConf()); + dsv2SnapshotManager, dsv2InitialSnapshot, spark.sessionState().newHadoopConf()); try (CloseableIterator kernelChanges = stream.getFileChanges(fromVersion, fromIndex, isInitialSnapshot, endOffset)) { List kernelFilesList = new ArrayList<>(); @@ -498,9 +509,12 @@ public void testGetFileChanges_OnRemoveFile_throwError( String.format("DSv1 should throw on REMOVE for scenario: %s", testDescription)); // Test DSv2 SparkMicroBatchStream + PathBasedSnapshotManager snapshotManager = + new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()); SparkMicroBatchStream stream = new SparkMicroBatchStream( - new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()), + snapshotManager, + snapshotManager.loadLatestSnapshot(), spark.sessionState().newHadoopConf()); UnsupportedOperationException dsv2Exception = assertThrows( @@ -601,6 +615,62 @@ private static Stream removeFileScenarios() { "MERGE: Matched (REMOVE+ADD) and not matched (ADD)")); } + @Test + public void testGetFileChanges_StartingVersionAfterCheckpointAndLogCleanup(@TempDir File tempDir) + throws Exception { + String testTablePath = tempDir.getAbsolutePath(); + String testTableName = "test_checkpoint_cleanup_" + System.nanoTime(); + createEmptyTestTable(testTablePath, testTableName); + + // Insert 5 versions + for (int i = 1; i <= 5; i++) { + sql("INSERT INTO %s VALUES (%d, 'User%d')", testTableName, i, i); + } + + // Create checkpoint at version 5 + DeltaLog.forTable(spark, new Path(testTablePath)).checkpoint(); + + // Delete 0.json to simulate log cleanup + Path logPath = new Path(testTablePath, "_delta_log"); + Path logFile0 = new Path(logPath, "00000000000000000000.json"); + File file0 = new File(logFile0.toUri().getPath()); + if (file0.exists()) { + file0.delete(); + } + + // Now test with startingVersion=1 + PathBasedSnapshotManager snapshotManager = + new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()); + SparkMicroBatchStream stream = + new SparkMicroBatchStream( + snapshotManager, + snapshotManager.loadLatestSnapshot(), + spark.sessionState().newHadoopConf()); + + // Get file changes from version 1 onwards + try (CloseableIterator kernelChanges = + stream.getFileChanges( + /* fromVersion= */ 1L, + /* fromIndex= */ DeltaSourceOffset.BASE_INDEX(), + /* isInitialSnapshot= */ false, + /* endOffset= */ Option.empty())) { + + List kernelFilesList = new ArrayList<>(); + while (kernelChanges.hasNext()) { + kernelFilesList.add(kernelChanges.next()); + } + + // Filter to get only actual data files (addFile != null) + long actualFileCount = kernelFilesList.stream().filter(f -> f.getAddFile() != null).count(); + + // Should be able to read 5 data files from versions 1-5 + assertEquals( + 5, + actualFileCount, + "Should read 5 data files from versions 1-5 even though version 0 log is deleted"); + } + } + // 
================================================================================================ // Helper methods // ================================================================================================ From f38a94911cf25069348c7af991c757f693f0ebf7 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Wed, 26 Nov 2025 14:55:04 -0800 Subject: [PATCH 4/9] fix --- .../src/main/java/io/delta/kernel/spark/read/SparkScan.java | 3 ++- .../java/io/delta/kernel/spark/read/SparkScanBuilder.java | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java index dfb6fa6f3b6..a9762e13c83 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java @@ -17,6 +17,7 @@ import static io.delta.kernel.spark.utils.ExpressionUtils.dsv2PredicateToCatalystExpression; +import io.delta.kernel.Snapshot; import io.delta.kernel.data.FilteredColumnarBatch; import io.delta.kernel.data.MapValue; import io.delta.kernel.data.Row; @@ -55,7 +56,7 @@ public class SparkScan implements Scan, SupportsReportStatistics, SupportsRuntimeV2Filtering { private final DeltaSnapshotManager snapshotManager; - private final io.delta.kernel.Snapshot initialSnapshot; + private final Snapshot initialSnapshot; private final StructType readDataSchema; private final StructType dataSchema; private final StructType partitionSchema; diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScanBuilder.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScanBuilder.java index 3a06d13726b..b0b58b786ba 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScanBuilder.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScanBuilder.java @@ -17,6 +17,7 @@ import static java.util.Objects.requireNonNull; +import io.delta.kernel.Snapshot; import io.delta.kernel.expressions.And; import io.delta.kernel.expressions.Predicate; import io.delta.kernel.spark.snapshot.DeltaSnapshotManager; @@ -38,8 +39,8 @@ public class SparkScanBuilder implements ScanBuilder, SupportsPushDownRequiredColumns, SupportsPushDownFilters { - private io.delta.kernel.ScanBuilder kernelScanBuilder; - private final io.delta.kernel.Snapshot initialSnapshot; + private ScanBuilder kernelScanBuilder; + private final Snapshot initialSnapshot; private final DeltaSnapshotManager snapshotManager; private final StructType dataSchema; private final StructType partitionSchema; From 29fd3012cdd125438f632405052f27d4c2d8d65e Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Tue, 2 Dec 2025 13:49:07 -0800 Subject: [PATCH 5/9] address --- .../spark/read/SparkMicroBatchStream.java | 14 ++++++------- .../io/delta/kernel/spark/read/SparkScan.java | 2 +- .../kernel/spark/read/SparkScanBuilder.java | 2 +- .../kernel/spark/utils/StreamingHelper.java | 18 ++++++++++++++++ ...icroBatchStreamGetStartingVersionTest.java | 9 +++----- .../spark/read/SparkMicroBatchStreamTest.java | 21 ++++++------------- 6 files changed, 36 insertions(+), 30 deletions(-) diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java index 6a7261e42ab..14601cdf471 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java +++ 
b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java @@ -60,13 +60,13 @@ public class SparkMicroBatchStream implements MicroBatchStream { private final DeltaSnapshotManager snapshotManager; private final DeltaOptions options; private final SparkSession spark; - private final Snapshot initialSnapshot; + private final String tablePath; public SparkMicroBatchStream( - DeltaSnapshotManager snapshotManager, Snapshot initialSnapshot, Configuration hadoopConf) { + DeltaSnapshotManager snapshotManager, String tablePath, Configuration hadoopConf) { this( snapshotManager, - initialSnapshot, + tablePath, hadoopConf, SparkSession.active(), new DeltaOptions( @@ -76,13 +76,13 @@ public SparkMicroBatchStream( public SparkMicroBatchStream( DeltaSnapshotManager snapshotManager, - Snapshot initialSnapshot, + String tablePath, Configuration hadoopConf, SparkSession spark, DeltaOptions options) { this.spark = spark; this.snapshotManager = snapshotManager; - this.initialSnapshot = initialSnapshot; + this.tablePath = tablePath; this.engine = DefaultEngine.create(hadoopConf); this.options = options; } @@ -303,7 +303,7 @@ private CloseableIterator filterDeltaLogs( StreamingHelper.getActionsFromRangeUnsafe( engine, (io.delta.kernel.internal.commitrange.CommitRangeImpl) commitRange, - initialSnapshot.getPath().toString(), + tablePath, ACTION_SET)) { // Each ColumnarBatch belongs to a single commit version, // but a single version may span multiple ColumnarBatches. @@ -330,7 +330,7 @@ private CloseableIterator filterDeltaLogs( // TODO(#5318): migrate to kernel's commit-level iterator (WIP). // The current one-pass algorithm assumes REMOVE actions proceed ADD actions // in a commit; we should implement a proper two-pass approach once kernel API is ready. 
- validateCommit(batch, version, initialSnapshot.getPath().toString(), endOffset); + validateCommit(batch, version, tablePath, endOffset); currentVersion = version; currentIndex = diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java index a9762e13c83..89d50d070c4 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java @@ -134,7 +134,7 @@ public Batch toBatch() { public MicroBatchStream toMicroBatchStream(String checkpointLocation) { DeltaOptions deltaOptions = new DeltaOptions(scalaOptions, sqlConf); return new SparkMicroBatchStream( - snapshotManager, initialSnapshot, hadoopConf, SparkSession.active(), deltaOptions); + snapshotManager, getTablePath(), hadoopConf, SparkSession.active(), deltaOptions); } @Override diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScanBuilder.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScanBuilder.java index b0b58b786ba..0f0aa56bdb9 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScanBuilder.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScanBuilder.java @@ -39,7 +39,7 @@ public class SparkScanBuilder implements ScanBuilder, SupportsPushDownRequiredColumns, SupportsPushDownFilters { - private ScanBuilder kernelScanBuilder; + private io.delta.kernel.ScanBuilder kernelScanBuilder; private final Snapshot initialSnapshot; private final DeltaSnapshotManager snapshotManager; private final StructType dataSchema; diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/StreamingHelper.java b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/StreamingHelper.java index 97482efb76c..01982a68b47 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/StreamingHelper.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/StreamingHelper.java @@ -97,6 +97,24 @@ public static Optional getDataChangeRemove(ColumnarBatch batch, int return removeFile.getDataChange() ? Optional.of(removeFile) : Optional.empty(); } + /** + * Gets actions from a commit range without requiring a snapshot at the exact start version. + * + *

This method is "unsafe" because it bypasses the standard {@code CommitRange.getActions()} + * API which requires a snapshot at the exact start version for protocol validation. Instead, it + * directly reads commit files and validates protocol per-commit, allowing a newer snapshot to be + * used for reading historical commits. + * + *

This is necessary for streaming scenarios where the start version might not have a + * recreatable snapshot (e.g., after log cleanup) or where {@code startingVersion="latest"} is + * used. + * + * @param engine the Delta engine + * @param commitRange the commit range to read actions from + * @param tablePath the path to the Delta table + * @param actionSet the set of actions to read (e.g., ADD, REMOVE) + * @return an iterator over columnar batches containing the requested actions + */ public static CloseableIterator getActionsFromRangeUnsafe( Engine engine, CommitRangeImpl commitRange, diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamGetStartingVersionTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamGetStartingVersionTest.java index 4ecd11a81af..97170c2a9fd 100644 --- a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamGetStartingVersionTest.java +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamGetStartingVersionTest.java @@ -97,9 +97,8 @@ public void testGetStartingVersion_NoOptions(@TempDir File tempDir) throws Excep // dsv2 PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(testTablePath, new Configuration()); - io.delta.kernel.Snapshot initialSnapshot = snapshotManager.loadLatestSnapshot(); SparkMicroBatchStream dsv2Stream = - new SparkMicroBatchStream(snapshotManager, initialSnapshot, new Configuration()); + new SparkMicroBatchStream(snapshotManager, testTablePath, new Configuration()); Optional dsv2Result = dsv2Stream.getStartingVersion(); compareStartingVersionResults(dsv1Result, dsv2Result, Optional.empty(), "No options provided"); @@ -219,11 +218,10 @@ public void testGetStartingVersion_ProtocolValidationNonFeatureExceptionFallback // dsv2 PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(testTablePath, new Configuration()); - io.delta.kernel.Snapshot initialSnapshot = snapshotManager.loadLatestSnapshot(); SparkMicroBatchStream dsv2Stream = new SparkMicroBatchStream( snapshotManager, - initialSnapshot, + testTablePath, new Configuration(), spark, createDeltaOptions(startingVersion)); @@ -263,11 +261,10 @@ private void testAndCompareStartingVersion( // DSv2: Create SparkMicroBatchStream and get starting version PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(testTablePath, new Configuration()); - io.delta.kernel.Snapshot initialSnapshot = snapshotManager.loadLatestSnapshot(); SparkMicroBatchStream dsv2Stream = new SparkMicroBatchStream( snapshotManager, - initialSnapshot, + testTablePath, new Configuration(), spark, createDeltaOptions(startingVersion)); diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java index 76910756b29..8e641b722a8 100644 --- a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java @@ -56,10 +56,8 @@ void setUp(@TempDir File tempDir) { createEmptyTestTable(testPath, testTableName); PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(testPath, spark.sessionState().newHadoopConf()); - io.delta.kernel.Snapshot initialSnapshot = snapshotManager.loadLatestSnapshot(); microBatchStream = - new SparkMicroBatchStream( - snapshotManager, initialSnapshot, spark.sessionState().newHadoopConf()); + new 
SparkMicroBatchStream(snapshotManager, testPath, spark.sessionState().newHadoopConf()); } @Test @@ -179,9 +177,8 @@ public void testGetFileChanges( // dsv2 SparkMicroBatchStream PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(testTablePath, new Configuration()); - io.delta.kernel.Snapshot initialSnapshot = snapshotManager.loadLatestSnapshot(); SparkMicroBatchStream stream = - new SparkMicroBatchStream(snapshotManager, initialSnapshot, new Configuration()); + new SparkMicroBatchStream(snapshotManager, testTablePath, new Configuration()); Option endOffsetOption = scalaEndOffset; try (CloseableIterator kernelChanges = stream.getFileChanges(fromVersion, fromIndex, isInitialSnapshot, endOffsetOption)) { @@ -292,9 +289,8 @@ public void testGetFileChangesWithRateLimit( // dsv2 SparkMicroBatchStream PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(testTablePath, new Configuration()); - io.delta.kernel.Snapshot initialSnapshot = snapshotManager.loadLatestSnapshot(); SparkMicroBatchStream stream = - new SparkMicroBatchStream(snapshotManager, initialSnapshot, new Configuration()); + new SparkMicroBatchStream(snapshotManager, testTablePath, new Configuration()); // We need a separate AdmissionLimits object for DSv2 because the method is stateful. Option dsv2Limits = createAdmissionLimits(deltaSource, maxFiles, maxBytes); @@ -418,10 +414,9 @@ public void testGetFileChanges_EmptyVersions( // Test DSv2 SparkMicroBatchStream PathBasedSnapshotManager dsv2SnapshotManager = new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()); - io.delta.kernel.Snapshot dsv2InitialSnapshot = dsv2SnapshotManager.loadLatestSnapshot(); SparkMicroBatchStream stream = new SparkMicroBatchStream( - dsv2SnapshotManager, dsv2InitialSnapshot, spark.sessionState().newHadoopConf()); + dsv2SnapshotManager, testTablePath, spark.sessionState().newHadoopConf()); try (CloseableIterator kernelChanges = stream.getFileChanges(fromVersion, fromIndex, isInitialSnapshot, endOffset)) { List kernelFilesList = new ArrayList<>(); @@ -513,9 +508,7 @@ public void testGetFileChanges_OnRemoveFile_throwError( new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()); SparkMicroBatchStream stream = new SparkMicroBatchStream( - snapshotManager, - snapshotManager.loadLatestSnapshot(), - spark.sessionState().newHadoopConf()); + snapshotManager, testTablePath, spark.sessionState().newHadoopConf()); UnsupportedOperationException dsv2Exception = assertThrows( UnsupportedOperationException.class, @@ -643,9 +636,7 @@ public void testGetFileChanges_StartingVersionAfterCheckpointAndLogCleanup(@Temp new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()); SparkMicroBatchStream stream = new SparkMicroBatchStream( - snapshotManager, - snapshotManager.loadLatestSnapshot(), - spark.sessionState().newHadoopConf()); + snapshotManager, testTablePath, spark.sessionState().newHadoopConf()); // Get file changes from version 1 onwards try (CloseableIterator kernelChanges = From c5d9d23cde1a9896be92a595ea354e98a59a4cbb Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Tue, 2 Dec 2025 14:12:45 -0800 Subject: [PATCH 6/9] address --- .../spark/read/SparkMicroBatchStream.java | 20 +++++++------- .../io/delta/kernel/spark/read/SparkScan.java | 2 +- ...icroBatchStreamGetStartingVersionTest.java | 7 ++--- .../spark/read/SparkMicroBatchStreamTest.java | 27 ++++++++++++------- 4 files changed, 32 insertions(+), 24 deletions(-) diff --git 
a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java index a423d36b39a..aaadaca45cb 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java @@ -59,16 +59,18 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio private final Engine engine; private final DeltaSnapshotManager snapshotManager; private final DeltaOptions options; + private final Snapshot snapshotAtSourceInit; private final String tableId; private final boolean shouldValidateOffsets; private final SparkSession spark; - private final String tablePath; public SparkMicroBatchStream( - DeltaSnapshotManager snapshotManager, String tablePath, Configuration hadoopConf) { + DeltaSnapshotManager snapshotManager, + Snapshot snapshotAtSourceInit, + Configuration hadoopConf) { this( snapshotManager, - tablePath, + snapshotAtSourceInit, hadoopConf, SparkSession.active(), new DeltaOptions( @@ -78,20 +80,16 @@ public SparkMicroBatchStream( public SparkMicroBatchStream( DeltaSnapshotManager snapshotManager, - String tablePath, + Snapshot snapshotAtSourceInit, Configuration hadoopConf, SparkSession spark, DeltaOptions options) { this.spark = spark; this.snapshotManager = snapshotManager; - this.tablePath = tablePath; + this.snapshotAtSourceInit = snapshotAtSourceInit; this.engine = DefaultEngine.create(hadoopConf); this.options = options; - - // Initialize snapshot at source init to get table ID, similar to DeltaSource.scala - Snapshot snapshotAtSourceInit = snapshotManager.loadLatestSnapshot(); this.tableId = ((SnapshotImpl) snapshotAtSourceInit).getMetadata().getId(); - this.shouldValidateOffsets = (Boolean) spark.sessionState().conf().getConf(DeltaSQLConf.STREAMING_OFFSET_VALIDATION()); } @@ -394,7 +392,7 @@ private CloseableIterator filterDeltaLogs( StreamingHelper.getActionsFromRangeUnsafe( engine, (io.delta.kernel.internal.commitrange.CommitRangeImpl) commitRange, - tablePath, + snapshotAtSourceInit.getPath(), ACTION_SET)) { // Each ColumnarBatch belongs to a single commit version, // but a single version may span multiple ColumnarBatches. @@ -421,7 +419,7 @@ private CloseableIterator filterDeltaLogs( // TODO(#5318): migrate to kernel's commit-level iterator (WIP). // The current one-pass algorithm assumes REMOVE actions proceed ADD actions // in a commit; we should implement a proper two-pass approach once kernel API is ready. 
- validateCommit(batch, version, tablePath, endOffset); + validateCommit(batch, version, snapshotAtSourceInit.getPath(), endOffset); currentVersion = version; currentIndex = diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java index 89d50d070c4..a9762e13c83 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java @@ -134,7 +134,7 @@ public Batch toBatch() { public MicroBatchStream toMicroBatchStream(String checkpointLocation) { DeltaOptions deltaOptions = new DeltaOptions(scalaOptions, sqlConf); return new SparkMicroBatchStream( - snapshotManager, getTablePath(), hadoopConf, SparkSession.active(), deltaOptions); + snapshotManager, initialSnapshot, hadoopConf, SparkSession.active(), deltaOptions); } @Override diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamGetStartingVersionTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamGetStartingVersionTest.java index 97170c2a9fd..dcf4de9cbe1 100644 --- a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamGetStartingVersionTest.java +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamGetStartingVersionTest.java @@ -98,7 +98,8 @@ public void testGetStartingVersion_NoOptions(@TempDir File tempDir) throws Excep PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(testTablePath, new Configuration()); SparkMicroBatchStream dsv2Stream = - new SparkMicroBatchStream(snapshotManager, testTablePath, new Configuration()); + new SparkMicroBatchStream( + snapshotManager, snapshotManager.loadLatestSnapshot(), new Configuration()); Optional dsv2Result = dsv2Stream.getStartingVersion(); compareStartingVersionResults(dsv1Result, dsv2Result, Optional.empty(), "No options provided"); @@ -221,7 +222,7 @@ public void testGetStartingVersion_ProtocolValidationNonFeatureExceptionFallback SparkMicroBatchStream dsv2Stream = new SparkMicroBatchStream( snapshotManager, - testTablePath, + snapshotManager.loadLatestSnapshot(), new Configuration(), spark, createDeltaOptions(startingVersion)); @@ -264,7 +265,7 @@ private void testAndCompareStartingVersion( SparkMicroBatchStream dsv2Stream = new SparkMicroBatchStream( snapshotManager, - testTablePath, + snapshotManager.loadLatestSnapshot(), new Configuration(), spark, createDeltaOptions(startingVersion)); diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java index d9fb6a1af59..f48f5109974 100644 --- a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java @@ -62,7 +62,7 @@ private SparkMicroBatchStream createTestStream(File tempDir) { PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(tablePath, hadoopConf); return new SparkMicroBatchStream( snapshotManager, - tablePath, + snapshotManager.loadLatestSnapshot(), hadoopConf, spark, new DeltaOptions(Map$.MODULE$.empty(), spark.sessionState().conf())); @@ -197,7 +197,8 @@ public void testGetFileChanges( PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(testTablePath, hadoopConf); SparkMicroBatchStream stream = - new SparkMicroBatchStream(snapshotManager, 
testTablePath, hadoopConf); + new SparkMicroBatchStream( + snapshotManager, snapshotManager.loadLatestSnapshot(), hadoopConf); Optional endOffsetOption = ScalaUtils.toJavaOptional(scalaEndOffset); try (CloseableIterator kernelChanges = stream.getFileChanges(fromVersion, fromIndex, isInitialSnapshot, endOffsetOption)) { @@ -314,7 +315,8 @@ public void testGetFileChangesWithRateLimit( PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(testTablePath, hadoopConf); SparkMicroBatchStream stream = - new SparkMicroBatchStream(snapshotManager, testTablePath, hadoopConf); + new SparkMicroBatchStream( + snapshotManager, snapshotManager.loadLatestSnapshot(), hadoopConf); // We need a separate AdmissionLimits object for DSv2 because the method is stateful. Optional dsv2Limits = createAdmissionLimits(deltaSource, maxFiles, maxBytes); @@ -440,7 +442,8 @@ public void testGetFileChanges_EmptyVersions( PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(testTablePath, hadoopConf); SparkMicroBatchStream stream = - new SparkMicroBatchStream(snapshotManager, testTablePath, hadoopConf); + new SparkMicroBatchStream( + snapshotManager, snapshotManager.loadLatestSnapshot(), hadoopConf); try (CloseableIterator kernelChanges = stream.getFileChanges( fromVersion, fromIndex, isInitialSnapshot, ScalaUtils.toJavaOptional(endOffset))) { @@ -533,7 +536,8 @@ public void testGetFileChanges_OnRemoveFile_throwError( PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(testTablePath, hadoopConf); SparkMicroBatchStream stream = - new SparkMicroBatchStream(snapshotManager, testTablePath, hadoopConf); + new SparkMicroBatchStream( + snapshotManager, snapshotManager.loadLatestSnapshot(), hadoopConf); UnsupportedOperationException dsv2Exception = assertThrows( UnsupportedOperationException.class, @@ -666,7 +670,9 @@ public void testGetFileChanges_StartingVersionAfterCheckpointAndLogCleanup(@Temp new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()); SparkMicroBatchStream stream = new SparkMicroBatchStream( - snapshotManager, testTablePath, spark.sessionState().newHadoopConf()); + snapshotManager, + snapshotManager.loadLatestSnapshot(), + spark.sessionState().newHadoopConf()); // Get file changes from version 1 onwards try (CloseableIterator kernelChanges = @@ -734,7 +740,8 @@ public void testLatestOffset_NotInitialSnapshot( PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(testTablePath, hadoopConf); SparkMicroBatchStream stream = - new SparkMicroBatchStream(snapshotManager, testTablePath, hadoopConf); + new SparkMicroBatchStream( + snapshotManager, snapshotManager.loadLatestSnapshot(), hadoopConf); Offset v2EndOffset = stream.latestOffset(startOffset, readLimit); compareOffsets(v1EndOffset, v2EndOffset, testDescription); @@ -853,7 +860,8 @@ public void testLatestOffset_SequentialBatchAdvancement( PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(testTablePath, hadoopConf); SparkMicroBatchStream stream = - new SparkMicroBatchStream(snapshotManager, testTablePath, hadoopConf); + new SparkMicroBatchStream( + snapshotManager, snapshotManager.loadLatestSnapshot(), hadoopConf); List dsv2Offsets = advanceOffsetSequenceDsv2(stream, startOffset, numIterations, readLimit); @@ -979,7 +987,8 @@ public void testLatestOffset_NoNewDataAtLatestVersion( PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(testTablePath, hadoopConf); SparkMicroBatchStream stream = - new 
SparkMicroBatchStream(snapshotManager, testTablePath, hadoopConf); + new SparkMicroBatchStream( + snapshotManager, snapshotManager.loadLatestSnapshot(), hadoopConf); Offset dsv2Offset = stream.latestOffset(startOffset, readLimit); compareOffsets(dsv1Offset, dsv2Offset, testDescription); From 36ebe0d8ad983f17951b8e02efea7f715f24e15f Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Tue, 2 Dec 2025 14:15:16 -0800 Subject: [PATCH 7/9] address --- .../java/io/delta/kernel/spark/utils/StreamingHelper.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/StreamingHelper.java b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/StreamingHelper.java index 87682bf999b..65f86988c8f 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/StreamingHelper.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/StreamingHelper.java @@ -101,12 +101,10 @@ public static Optional getDataChangeRemove(ColumnarBatch batch, int * Gets actions from a commit range without requiring a snapshot at the exact start version. * *

This method is "unsafe" because it bypasses the standard {@code CommitRange.getActions()} - * API which requires a snapshot at the exact start version for protocol validation. Instead, it - * directly reads commit files and validates protocol per-commit, allowing a newer snapshot to be - * used for reading historical commits. + * API which requires a snapshot at the exact start version for protocol validation. * *

This is necessary for streaming scenarios where the start version might not have a - * recreatable snapshot (e.g., after log cleanup) or where {@code startingVersion="latest"} is + * recreatable snapshot (e.g., after log cleanup) or where {@code startingVersion} is * used. * * @param engine the Delta engine From 853370bd090126746056d880d1e66330b63ba023 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Tue, 2 Dec 2025 14:23:35 -0800 Subject: [PATCH 8/9] address --- .../main/java/io/delta/kernel/spark/utils/StreamingHelper.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/StreamingHelper.java b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/StreamingHelper.java index 65f86988c8f..de495763f80 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/StreamingHelper.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/StreamingHelper.java @@ -104,8 +104,7 @@ public static Optional getDataChangeRemove(ColumnarBatch batch, int * API which requires a snapshot at the exact start version for protocol validation. * *

This is necessary for streaming scenarios where the start version might not have a - * recreatable snapshot (e.g., after log cleanup) or where {@code startingVersion} is - * used. + * recreatable snapshot (e.g., after log cleanup) or where {@code startingVersion} is used. * * @param engine the Delta engine * @param commitRange the commit range to read actions from From 21295878d24d6e906b92cde3e0f9cf2622dd5c42 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Thu, 4 Dec 2025 09:51:21 -0800 Subject: [PATCH 9/9] fix --- .../io/delta/kernel/spark/read/SparkMicroBatchStream.java | 6 ++++++ .../src/main/java/io/delta/kernel/spark/read/SparkScan.java | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java index aaadaca45cb..7cb882a6ff6 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java @@ -388,6 +388,12 @@ private CloseableIterator filterDeltaLogs( return Utils.toCloseableIterator(allIndexedFiles.iterator()); } + // Use getActionsFromRangeUnsafe instead of CommitRange.getActions() because: + // 1. CommitRange.getActions() requires a snapshot at exactly the startVersion, but when + // startingVersion option is used, we may not be able to recreate that exact snapshot + // (e.g., if log files have been cleaned up after checkpointing). + // 2. This matches DSv1 behavior which uses snapshotAtSourceInit's P&M to interpret all + // AddFile actions and performs per-commit protocol validation. try (CloseableIterator actionsIter = StreamingHelper.getActionsFromRangeUnsafe( engine, diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java index a9762e13c83..c53ecd9736e 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java @@ -76,7 +76,7 @@ public class SparkScan implements Scan, SupportsReportStatistics, SupportsRuntim public SparkScan( DeltaSnapshotManager snapshotManager, - io.delta.kernel.Snapshot initialSnapshot, + Snapshot initialSnapshot, StructType dataSchema, StructType partitionSchema, StructType readDataSchema,