Skip to content

Conversation

@huan233usc
Copy link
Collaborator

@huan233usc huan233usc commented Nov 25, 2025

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

This PR fixes the startingVersion support in DSv2 streaming by skipping start snapshot validation when calling getActions.

Problem

The Kernel API's CommitRange.getActions() strictly required snapshot.version == startVersion, which failed when:

  • startingVersion="latest" and no new data exists (effective version = latestVersion + 1)
  • Historical snapshots are no longer recreatable due to log cleanup

Solution

Use low-level Kernel API: Call DeltaLogActionUtils.getActionsFromCommitFilesWithProtocolValidation() directly instead of CommitRange.getActions(), bypassing the strict snapshot version check

Whether we want to update kernel API to allow us bypass validation worth longer discussion. We just made this change in Dsv2 connector to make sure v1 behavior parity.

How was this patch tested?

Unit

Does this PR introduce any user-facing changes?

No

…ad time

This change fixes the startingVersion support in DSv2 streaming by:

1. Removing the requirement for initialSnapshot parameter
2. Using getActionsFromCommitFilesWithProtocolValidation to validate
   protocol for each commit at read time (similar to V1 behavior)
3. Extracting getActionsUnsafe utility method to StreamingHelper

The key insight is that we don't need a pre-loaded snapshot for protocol
validation - we can validate each commit's protocol as we read it, which
matches V1's behavior and avoids the issue where initialSnapshot.version
< startVersion.

Test: DeltaSourceDSv2Suite startingVersion tests now pass
@huan233usc huan233usc changed the title Dsv2 fix startingversion [DSv2] Fix startingVersion support by skipping start snapshot validation Nov 26, 2025
@huan233usc huan233usc changed the title [DSv2] Fix startingVersion support by skipping start snapshot validation [DSv2] Fix startingVersion support on non re-creatable version by skipping start snapshot validation Nov 26, 2025
@huan233usc huan233usc changed the title [DSv2] Fix startingVersion support on non re-creatable version by skipping start snapshot validation [Kernel-Spark] Fix startingVersion support on non re-creatable version by skipping start snapshot validation Nov 26, 2025
return removeFile.getDataChange() ? Optional.of(removeFile) : Optional.empty();
}

public static CloseableIterator<ColumnarBatch> getActionsFromRangeUnsafe(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a comment explain in what way this is unsafe?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


public SparkMicroBatchStream(
DeltaSnapshotManager snapshotManager,
Snapshot initialSnapshot,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't necessarily need to plumb in an initialSnapshot, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to get the table path and pass to ValidateCommit method. Let me just plumb the path instead

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually after rebasing I found that we are trying to load the latest snapshot again. So we may want to plumb it so that we could get snapshot, table id and path without getting snapshot again

- Resolved conflicts in SparkMicroBatchStream.java and SparkMicroBatchStreamTest.java
- Updated constructor calls to use tablePath parameter instead of initialSnapshot
- Applied master's improvements (try-catch for CommitRangeNotFoundException, Java Optional usage)
@huan233usc huan233usc requested review from gengliangwang and zikangh and removed request for zikangh December 2, 2025 22:34
Copy link
Contributor

@zikangh zikangh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, Xin!


public SparkScan(
DeltaSnapshotManager snapshotManager,
io.delta.kernel.Snapshot initialSnapshot,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to fully quality here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, let's avoid this

String tablePath = startSnapshot.getPath();
try (CloseableIterator<ColumnarBatch> actionsIter =
commitRange.getActions(engine, startSnapshot, ACTION_SET)) {
StreamingHelper.getActionsFromRangeUnsafe(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a comment here documenting the exact use case for why we are calling getActionsFromRangeUnsafe?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants