@@ -59,13 +59,18 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
private final Engine engine;
private final DeltaSnapshotManager snapshotManager;
private final DeltaOptions options;
private final Snapshot snapshotAtSourceInit;
private final String tableId;
private final boolean shouldValidateOffsets;
private final SparkSession spark;

public SparkMicroBatchStream(DeltaSnapshotManager snapshotManager, Configuration hadoopConf) {
public SparkMicroBatchStream(
DeltaSnapshotManager snapshotManager,
Snapshot snapshotAtSourceInit,
Configuration hadoopConf) {
this(
snapshotManager,
snapshotAtSourceInit,
hadoopConf,
SparkSession.active(),
new DeltaOptions(
@@ -75,18 +80,16 @@ public SparkMicroBatchStream(DeltaSnapshotManager snapshotManager, Configuration

public SparkMicroBatchStream(
DeltaSnapshotManager snapshotManager,
Snapshot snapshotAtSourceInit,
Configuration hadoopConf,
SparkSession spark,
DeltaOptions options) {
this.spark = spark;
this.snapshotManager = snapshotManager;
this.snapshotAtSourceInit = snapshotAtSourceInit;
this.engine = DefaultEngine.create(hadoopConf);
this.options = options;

// Initialize snapshot at source init to get table ID, similar to DeltaSource.scala
Snapshot snapshotAtSourceInit = snapshotManager.loadLatestSnapshot();
this.tableId = ((SnapshotImpl) snapshotAtSourceInit).getMetadata().getId();

this.shouldValidateOffsets =
(Boolean) spark.sessionState().conf().getConf(DeltaSQLConf.STREAMING_OFFSET_VALIDATION());
}
@@ -385,11 +388,12 @@ private CloseableIterator<IndexedFile> filterDeltaLogs(
return Utils.toCloseableIterator(allIndexedFiles.iterator());
}

// Required by kernel: perform protocol validation by creating a snapshot at startVersion.
Snapshot startSnapshot = snapshotManager.loadSnapshotAt(startVersion);
String tablePath = startSnapshot.getPath();
try (CloseableIterator<ColumnarBatch> actionsIter =
commitRange.getActions(engine, startSnapshot, ACTION_SET)) {
StreamingHelper.getActionsFromRangeUnsafe(
Contributor: Could you add a comment here documenting the exact use case for why we are calling getActionsFromRangeUnsafe?
engine,
(io.delta.kernel.internal.commitrange.CommitRangeImpl) commitRange,
snapshotAtSourceInit.getPath(),
ACTION_SET)) {
// Each ColumnarBatch belongs to a single commit version,
// but a single version may span multiple ColumnarBatches.
long currentVersion = -1;
@@ -415,7 +419,7 @@ private CloseableIterator<IndexedFile> filterDeltaLogs(
// TODO(#5318): migrate to kernel's commit-level iterator (WIP).
// The current one-pass algorithm assumes REMOVE actions precede ADD actions
// in a commit; we should implement a proper two-pass approach once kernel API is ready.
validateCommit(batch, version, tablePath, endOffset);
validateCommit(batch, version, snapshotAtSourceInit.getPath(), endOffset);

currentVersion = version;
currentIndex =
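The TODO above in filterDeltaLogs points at replacing the one-pass scan with a two-pass check per commit. A rough sketch of what that could look like, assuming the batches of a single commit version can be buffered first; bufferBatchesForVersion and emitIndexedFiles are hypothetical helpers, not APIs introduced by this PR:

// Pass 1: validate the entire commit before emitting anything, so correctness
// no longer depends on REMOVE actions preceding ADD actions within the commit.
List<ColumnarBatch> commitBatches = bufferBatchesForVersion(actionsIter, version);
for (ColumnarBatch batch : commitBatches) {
  for (int i = 0; i < batch.getSize(); i++) {
    if (StreamingHelper.getDataChangeRemove(batch, i).isPresent()) {
      // Same failure mode the existing tests expect for data-changing removes.
      throw new UnsupportedOperationException(
          "Data-changing REMOVE found in commit version " + version);
    }
  }
}
// Pass 2: it is now safe to turn the commit's ADD actions into IndexedFiles.
for (ColumnarBatch batch : commitBatches) {
  emitIndexedFiles(batch, version);
}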
@@ -17,6 +17,7 @@

import static io.delta.kernel.spark.utils.ExpressionUtils.dsv2PredicateToCatalystExpression;

import io.delta.kernel.Snapshot;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.MapValue;
import io.delta.kernel.data.Row;
@@ -55,6 +56,7 @@
public class SparkScan implements Scan, SupportsReportStatistics, SupportsRuntimeV2Filtering {

private final DeltaSnapshotManager snapshotManager;
private final Snapshot initialSnapshot;
private final StructType readDataSchema;
private final StructType dataSchema;
private final StructType partitionSchema;
@@ -74,6 +76,7 @@ public class SparkScan implements Scan, SupportsReportStatistics, SupportsRuntim

public SparkScan(
DeltaSnapshotManager snapshotManager,
io.delta.kernel.Snapshot initialSnapshot,
Contributor: do we need to fully qualify here?
Collaborator: +1, let's avoid this
StructType dataSchema,
StructType partitionSchema,
StructType readDataSchema,
@@ -83,6 +86,7 @@ public SparkScan(
CaseInsensitiveStringMap options) {

this.snapshotManager = Objects.requireNonNull(snapshotManager, "snapshotManager is null");
this.initialSnapshot = Objects.requireNonNull(initialSnapshot, "initialSnapshot is null");
this.dataSchema = Objects.requireNonNull(dataSchema, "dataSchema is null");
this.partitionSchema = Objects.requireNonNull(partitionSchema, "partitionSchema is null");
this.readDataSchema = Objects.requireNonNull(readDataSchema, "readDataSchema is null");
@@ -130,7 +134,7 @@ public Batch toBatch() {
public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
DeltaOptions deltaOptions = new DeltaOptions(scalaOptions, sqlConf);
return new SparkMicroBatchStream(
snapshotManager, hadoopConf, SparkSession.active(), deltaOptions);
snapshotManager, initialSnapshot, hadoopConf, SparkSession.active(), deltaOptions);
}

@Override
@@ -17,6 +17,7 @@

import static java.util.Objects.requireNonNull;

import io.delta.kernel.Snapshot;
import io.delta.kernel.expressions.And;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.spark.snapshot.DeltaSnapshotManager;
@@ -39,6 +40,7 @@ public class SparkScanBuilder
implements ScanBuilder, SupportsPushDownRequiredColumns, SupportsPushDownFilters {

private io.delta.kernel.ScanBuilder kernelScanBuilder;
private final Snapshot initialSnapshot;
private final DeltaSnapshotManager snapshotManager;
private final StructType dataSchema;
private final StructType partitionSchema;
@@ -70,8 +72,8 @@ public SparkScanBuilder(
StructType dataSchema,
StructType partitionSchema,
CaseInsensitiveStringMap options) {
this.kernelScanBuilder =
requireNonNull(initialSnapshot, "initialSnapshot is null").getScanBuilder();
this.initialSnapshot = requireNonNull(initialSnapshot, "initialSnapshot is null");
this.kernelScanBuilder = initialSnapshot.getScanBuilder();
this.snapshotManager = requireNonNull(snapshotManager, "snapshotManager is null");
this.dataSchema = requireNonNull(dataSchema, "dataSchema is null");
this.partitionSchema = requireNonNull(partitionSchema, "partitionSchema is null");
@@ -158,6 +160,7 @@ public Filter[] pushedFilters() {
public org.apache.spark.sql.connector.read.Scan build() {
return new SparkScan(
snapshotManager,
initialSnapshot,
dataSchema,
partitionSchema,
requiredDataSchema,
@@ -21,10 +21,16 @@
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.DeltaLogActionUtils;
import io.delta.kernel.internal.TableChangesUtils;
import io.delta.kernel.internal.actions.AddFile;
import io.delta.kernel.internal.actions.RemoveFile;
import io.delta.kernel.internal.commitrange.CommitRangeImpl;
import io.delta.kernel.internal.data.StructRow;
import io.delta.kernel.utils.CloseableIterator;
import java.util.Optional;
import java.util.Set;
import org.apache.spark.annotation.Experimental;

/**
@@ -91,6 +97,32 @@ public static Optional<RemoveFile> getDataChangeRemove(ColumnarBatch batch, int
return removeFile.getDataChange() ? Optional.of(removeFile) : Optional.empty();
}

/**
* Gets actions from a commit range without requiring a snapshot at the exact start version.
*
* <p>This method is "unsafe" because it bypasses the standard {@code CommitRange.getActions()}
* API which requires a snapshot at the exact start version for protocol validation.
*
* <p>This is necessary for streaming scenarios where the start version might not have a
* recreatable snapshot (e.g., after log cleanup) or where {@code startingVersion} is used.
*
* @param engine the Delta engine
* @param commitRange the commit range to read actions from
* @param tablePath the path to the Delta table
* @param actionSet the set of actions to read (e.g., ADD, REMOVE)
* @return an iterator over columnar batches containing the requested actions
*/
public static CloseableIterator<ColumnarBatch> getActionsFromRangeUnsafe(
Contributor: Could you add a comment explaining in what way this is unsafe?
Collaborator (Author): Done
Engine engine,
CommitRangeImpl commitRange,
String tablePath,
Set<DeltaLogActionUtils.DeltaAction> actionSet) {
return TableChangesUtils.flattenCommitsAndAddMetadata(
engine,
DeltaLogActionUtils.getActionsFromCommitFilesWithProtocolValidation(
engine, tablePath, commitRange.getDeltaFiles(), actionSet));
}

/** Private constructor to prevent instantiation of this utility class. */
private StreamingHelper() {}
}
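For context on how SparkMicroBatchStream uses this helper (see the filterDeltaLogs change above): the standard path needs a snapshot at startVersion, while the unsafe path only needs the table path captured at source initialization. A minimal side-by-side sketch using the identifiers from this diff; the surrounding variables (engine, commitRange, startVersion, ACTION_SET, snapshotAtSourceInit) are assumed to be in scope as they are in filterDeltaLogs:

// Standard path: loading a snapshot at startVersion can fail once log cleanup
// has removed the early commit files needed to reconstruct that snapshot.
Snapshot startSnapshot = snapshotManager.loadSnapshotAt(startVersion);
try (CloseableIterator<ColumnarBatch> actionsIter =
    commitRange.getActions(engine, startSnapshot, ACTION_SET)) {
  // iterate commits
}

// Unsafe path: bypasses CommitRange.getActions() but still routes through
// getActionsFromCommitFilesWithProtocolValidation; only the table path is needed.
try (CloseableIterator<ColumnarBatch> actionsIter =
    StreamingHelper.getActionsFromRangeUnsafe(
        engine,
        (io.delta.kernel.internal.commitrange.CommitRangeImpl) commitRange,
        snapshotAtSourceInit.getPath(),
        ACTION_SET)) {
  // iterate commits
}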
@@ -98,7 +98,8 @@ public void testGetStartingVersion_NoOptions(@TempDir File tempDir) throws Excep
PathBasedSnapshotManager snapshotManager =
new PathBasedSnapshotManager(testTablePath, new Configuration());
SparkMicroBatchStream dsv2Stream =
new SparkMicroBatchStream(snapshotManager, new Configuration());
new SparkMicroBatchStream(
snapshotManager, snapshotManager.loadLatestSnapshot(), new Configuration());
Optional<Long> dsv2Result = dsv2Stream.getStartingVersion();

compareStartingVersionResults(dsv1Result, dsv2Result, Optional.empty(), "No options provided");
@@ -220,7 +221,11 @@ public void testGetStartingVersion_ProtocolValidationNonFeatureExceptionFallback
new PathBasedSnapshotManager(testTablePath, new Configuration());
SparkMicroBatchStream dsv2Stream =
new SparkMicroBatchStream(
snapshotManager, new Configuration(), spark, createDeltaOptions(startingVersion));
snapshotManager,
snapshotManager.loadLatestSnapshot(),
new Configuration(),
spark,
createDeltaOptions(startingVersion));
Optional<Long> dsv2Result = dsv2Stream.getStartingVersion();

compareStartingVersionResults(
@@ -259,7 +264,11 @@ private void testAndCompareStartingVersion(
new PathBasedSnapshotManager(testTablePath, new Configuration());
SparkMicroBatchStream dsv2Stream =
new SparkMicroBatchStream(
snapshotManager, new Configuration(), spark, createDeltaOptions(startingVersion));
snapshotManager,
snapshotManager.loadLatestSnapshot(),
new Configuration(),
spark,
createDeltaOptions(startingVersion));
Optional<Long> dsv2Result = dsv2Stream.getStartingVersion();

compareStartingVersionResults(dsv1Result, dsv2Result, expectedVersion, testDescription);
@@ -62,6 +62,7 @@ private SparkMicroBatchStream createTestStream(File tempDir) {
PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(tablePath, hadoopConf);
return new SparkMicroBatchStream(
snapshotManager,
snapshotManager.loadLatestSnapshot(),
hadoopConf,
spark,
new DeltaOptions(Map$.MODULE$.empty(), spark.sessionState().conf()));
@@ -195,7 +196,9 @@ public void testGetFileChanges(
Configuration hadoopConf = new Configuration();
PathBasedSnapshotManager snapshotManager =
new PathBasedSnapshotManager(testTablePath, hadoopConf);
SparkMicroBatchStream stream = new SparkMicroBatchStream(snapshotManager, hadoopConf);
SparkMicroBatchStream stream =
new SparkMicroBatchStream(
snapshotManager, snapshotManager.loadLatestSnapshot(), hadoopConf);
Optional<DeltaSourceOffset> endOffsetOption = ScalaUtils.toJavaOptional(scalaEndOffset);
try (CloseableIterator<IndexedFile> kernelChanges =
stream.getFileChanges(fromVersion, fromIndex, isInitialSnapshot, endOffsetOption)) {
@@ -311,7 +314,9 @@ public void testGetFileChangesWithRateLimit(
Configuration hadoopConf = new Configuration();
PathBasedSnapshotManager snapshotManager =
new PathBasedSnapshotManager(testTablePath, hadoopConf);
SparkMicroBatchStream stream = new SparkMicroBatchStream(snapshotManager, hadoopConf);
SparkMicroBatchStream stream =
new SparkMicroBatchStream(
snapshotManager, snapshotManager.loadLatestSnapshot(), hadoopConf);
// We need a separate AdmissionLimits object for DSv2 because the method is stateful.
Optional<DeltaSource.AdmissionLimits> dsv2Limits =
createAdmissionLimits(deltaSource, maxFiles, maxBytes);
@@ -436,7 +441,9 @@ public void testGetFileChanges_EmptyVersions(
Configuration hadoopConf = new Configuration();
PathBasedSnapshotManager snapshotManager =
new PathBasedSnapshotManager(testTablePath, hadoopConf);
SparkMicroBatchStream stream = new SparkMicroBatchStream(snapshotManager, hadoopConf);
SparkMicroBatchStream stream =
new SparkMicroBatchStream(
snapshotManager, snapshotManager.loadLatestSnapshot(), hadoopConf);
try (CloseableIterator<IndexedFile> kernelChanges =
stream.getFileChanges(
fromVersion, fromIndex, isInitialSnapshot, ScalaUtils.toJavaOptional(endOffset))) {
@@ -528,7 +535,9 @@ public void testGetFileChanges_OnRemoveFile_throwError(
Configuration hadoopConf = new Configuration();
PathBasedSnapshotManager snapshotManager =
new PathBasedSnapshotManager(testTablePath, hadoopConf);
SparkMicroBatchStream stream = new SparkMicroBatchStream(snapshotManager, hadoopConf);
SparkMicroBatchStream stream =
new SparkMicroBatchStream(
snapshotManager, snapshotManager.loadLatestSnapshot(), hadoopConf);
UnsupportedOperationException dsv2Exception =
assertThrows(
UnsupportedOperationException.class,
@@ -633,6 +642,62 @@ private static Stream<Arguments> removeFileScenarios() {
"MERGE: Matched (REMOVE+ADD) and not matched (ADD)"));
}

@Test
public void testGetFileChanges_StartingVersionAfterCheckpointAndLogCleanup(@TempDir File tempDir)
throws Exception {
String testTablePath = tempDir.getAbsolutePath();
String testTableName = "test_checkpoint_cleanup_" + System.nanoTime();
createEmptyTestTable(testTablePath, testTableName);

// Insert 5 versions
for (int i = 1; i <= 5; i++) {
sql("INSERT INTO %s VALUES (%d, 'User%d')", testTableName, i, i);
}

// Create checkpoint at version 5
DeltaLog.forTable(spark, new Path(testTablePath)).checkpoint();

// Delete 0.json to simulate log cleanup
Path logPath = new Path(testTablePath, "_delta_log");
Path logFile0 = new Path(logPath, "00000000000000000000.json");
File file0 = new File(logFile0.toUri().getPath());
if (file0.exists()) {
file0.delete();
}

// Now test with startingVersion=1
PathBasedSnapshotManager snapshotManager =
new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf());
SparkMicroBatchStream stream =
new SparkMicroBatchStream(
snapshotManager,
snapshotManager.loadLatestSnapshot(),
spark.sessionState().newHadoopConf());

// Get file changes from version 1 onwards
try (CloseableIterator<IndexedFile> kernelChanges =
stream.getFileChanges(
/* fromVersion= */ 1L,
/* fromIndex= */ DeltaSourceOffset.BASE_INDEX(),
/* isInitialSnapshot= */ false,
/* endOffset= */ Optional.empty())) {

List<IndexedFile> kernelFilesList = new ArrayList<>();
while (kernelChanges.hasNext()) {
kernelFilesList.add(kernelChanges.next());
}

// Filter to get only actual data files (addFile != null)
long actualFileCount = kernelFilesList.stream().filter(f -> f.getAddFile() != null).count();

// Should be able to read 5 data files from versions 1-5
assertEquals(
5,
actualFileCount,
"Should read 5 data files from versions 1-5 even though version 0 log is deleted");
}
}

// ================================================================================================
// Tests for latestOffset parity between DSv1 and DSv2
// ================================================================================================
@@ -674,7 +739,9 @@ public void testLatestOffset_NotInitialSnapshot(
Configuration hadoopConf = new Configuration();
PathBasedSnapshotManager snapshotManager =
new PathBasedSnapshotManager(testTablePath, hadoopConf);
SparkMicroBatchStream stream = new SparkMicroBatchStream(snapshotManager, hadoopConf);
SparkMicroBatchStream stream =
new SparkMicroBatchStream(
snapshotManager, snapshotManager.loadLatestSnapshot(), hadoopConf);
Offset v2EndOffset = stream.latestOffset(startOffset, readLimit);

compareOffsets(v1EndOffset, v2EndOffset, testDescription);
@@ -792,7 +859,9 @@ public void testLatestOffset_SequentialBatchAdvancement(
Configuration hadoopConf = new Configuration();
PathBasedSnapshotManager snapshotManager =
new PathBasedSnapshotManager(testTablePath, hadoopConf);
SparkMicroBatchStream stream = new SparkMicroBatchStream(snapshotManager, hadoopConf);
SparkMicroBatchStream stream =
new SparkMicroBatchStream(
snapshotManager, snapshotManager.loadLatestSnapshot(), hadoopConf);
List<Offset> dsv2Offsets =
advanceOffsetSequenceDsv2(stream, startOffset, numIterations, readLimit);

@@ -917,7 +986,9 @@ public void testLatestOffset_NoNewDataAtLatestVersion(
Configuration hadoopConf = new Configuration();
PathBasedSnapshotManager snapshotManager =
new PathBasedSnapshotManager(testTablePath, hadoopConf);
SparkMicroBatchStream stream = new SparkMicroBatchStream(snapshotManager, hadoopConf);
SparkMicroBatchStream stream =
new SparkMicroBatchStream(
snapshotManager, snapshotManager.loadLatestSnapshot(), hadoopConf);
Offset dsv2Offset = stream.latestOffset(startOffset, readLimit);

compareOffsets(dsv1Offset, dsv2Offset, testDescription);