[Kernel][Metrics][PR#7] Support ScanReport to log metrics for a Scan operation #4068

Merged (10 commits), Feb 3, 2025
Respond to comments
allisonport-db committed Jan 24, 2025
commit 1b4fd86a04b581a2a6cc8fb60eb74a1e3057e7ca
ScanImpl.java
@@ -134,7 +134,7 @@ public CloseableIterator<FilteredColumnarBatch> getScanFiles(
// `reportError` or `reportSuccess` to stop the planning duration timer and push a report to
// the engine
ScanReportReporter reportReporter =
exceptionOpt -> {
(exceptionOpt, isFullyConsumed) -> {
planningDuration.stop();
ScanReport scanReport =
new ScanReportImpl(
@@ -146,6 +146,7 @@ public CloseableIterator<FilteredColumnarBatch> getScanFiles(
readSchema,
getPartitionsFilters() /* partitionPredicate */,
dataSkippingFilter.map(p -> p),
isFullyConsumed,
scanMetrics,
exceptionOpt);
engine.getMetricsReporters().forEach(reporter -> reporter.report(scanReport));
@@ -359,27 +360,32 @@ private CloseableIterator<FilteredColumnarBatch> applyDataSkipping(
* etc. This means we cannot report the successful scan within `getScanFiles` but rather must
* report after the iterator has been consumed.
*
* <p>This method takes a scan file iterator. It wraps the methods `next` and `hasNext` such that
* in the case of an exception originating from there, we will first report a failed ScanReport
* before propagating the exception. It also reports a success ScanReport when the iterator is
* closed, if and only if, all the elements have been consumed.
* <p>This method wraps an inner scan file iterator with an outer iterator wrapper that reports
* {@link ScanReport}s as needed. It reports a failed {@link ScanReport} in the case of any
* exceptions originating from the inner iterator `next` and `hasNext` impl. It reports a complete
* or incomplete {@link ScanReport} when the iterator is closed.
*/
private CloseableIterator<FilteredColumnarBatch> wrapWithMetricsReporting(
CloseableIterator<FilteredColumnarBatch> scanIter, ScanReportReporter reporter) {
return new CloseableIterator<FilteredColumnarBatch>() {

/* Whether this iterator has already reported a failed ScanReport */
private boolean errorReported = false;

@Override
public void close() throws IOException {
try {
if (!scanIter.hasNext()) {
// The entire scan file iterator has been successfully consumed, report successful Scan
reporter.reportSuccess();
} else {
// TODO what should we do if the iterator is not fully consumed?
// - Do nothing (we don't know why the iterator is being closed early or what is going
// on in the connector)
// - Report a failure report with some sort of placeholder exception
// - Something else?
// If a ScanReport has already been pushed in the case of an exception, don't double-report
if (!errorReported) {
if (!scanIter.hasNext()) {
// The entire scan file iterator has been successfully consumed; report a complete Scan
reporter.reportCompleteScan();
} else {
// The scan file iterator has NOT been fully consumed before being closed
// We have no way of knowing why; this could be due to an exception in the
// connector code, or intentional early termination such as for a LIMIT query
reporter.reportIncompleteScan();
}
}
} finally {
scanIter.close();
@@ -401,6 +407,7 @@ private <T> T wrapWithErrorReporting(Supplier<T> s) {
return s.get();
} catch (Exception e) {
reporter.reportError(e);
errorReported = true;
throw e;
}
}
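
Taken together, the wrapper reports exactly one terminal ScanReport per iterator: a failed report if `next` or `hasNext` throws, a complete report if the iterator is exhausted when closed, and an incomplete report otherwise. A minimal, self-contained sketch of this pattern in plain Java (a simplified `Reporter` stands in for Kernel's `ScanReportReporter`; this is an illustration, not the Kernel code itself):

import java.util.Iterator;
import java.util.function.Supplier;

final class ReportingIterator<T> implements Iterator<T>, AutoCloseable {
  interface Reporter { // simplified stand-in for ScanReportReporter
    void reportError(Exception e);
    void reportCompleteScan();
    void reportIncompleteScan();
  }

  private final Iterator<T> inner;
  private final Reporter reporter;
  private boolean errorReported = false; // avoid double-reporting on close

  ReportingIterator(Iterator<T> inner, Reporter reporter) {
    this.inner = inner;
    this.reporter = reporter;
  }

  @Override public boolean hasNext() { return wrap(inner::hasNext); }
  @Override public T next() { return wrap(inner::next); }

  @Override public void close() {
    if (!errorReported) {
      if (!inner.hasNext()) {
        reporter.reportCompleteScan();   // fully consumed
      } else {
        reporter.reportIncompleteScan(); // closed early (e.g. a LIMIT query or a connector error)
      }
    }
    // the real implementation also closes the inner CloseableIterator in a finally block
  }

  private <R> R wrap(Supplier<R> s) {
    try {
      return s.get();
    } catch (Exception e) {
      reporter.reportError(e); // push a failed report before propagating
      errorReported = true;
      throw e;
    }
  }
}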
@@ -414,14 +421,18 @@ private <T> T wrapWithErrorReporting(Supplier<T> s) {
private interface ScanReportReporter {

default void reportError(Exception e) {
report(Optional.of(e));
report(Optional.of(e), false /* isFullyConsumed */);
}

default void reportCompleteScan() {
report(Optional.empty(), true /* isFullyConsumed */);
}

default void reportSuccess() {
report(Optional.empty());
default void reportIncompleteScan() {
report(Optional.empty(), false /* isFullyConsumed */);
}

/** Given an optional exception, reports a {@link ScanReport} to the engine */
void report(Optional<Exception> exceptionOpt);
void report(Optional<Exception> exceptionOpt, boolean isFullyConsumed);
}
}
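
All three default methods funnel into the single two-argument `report`, so the lambda in `getScanFiles` above is the only implementation needed. For illustration, with a no-op body (the exception here is just a placeholder):

ScanReportReporter reporter =
    (exceptionOpt, isFullyConsumed) -> { /* build a ScanReportImpl and push it to the engine's reporters */ };

reporter.reportError(new RuntimeException("boom")); // -> report(Optional.of(e), false)
reporter.reportCompleteScan();                      // -> report(Optional.empty(), true)
reporter.reportIncompleteScan();                    // -> report(Optional.empty(), false)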
ScanReportImpl.java
@@ -34,6 +34,7 @@ public class ScanReportImpl extends DeltaOperationReportImpl implements ScanRepo
private final StructType readSchema;
private final Optional<Predicate> partitionPredicate;
private final Optional<Predicate> dataSkippingFilter;
private final boolean isFullyConsumed;
private final ScanMetricsResult scanMetricsResult;

public ScanReportImpl(
@@ -45,6 +46,7 @@ public ScanReportImpl(
StructType readSchema,
Optional<Predicate> partitionPredicate,
Optional<Predicate> dataSkippingFilter,
boolean isFullyConsumed,
ScanMetrics scanMetrics,
Optional<Exception> exception) {
super(tablePath, exception);
@@ -55,6 +57,7 @@ public ScanReportImpl(
this.readSchema = requireNonNull(readSchema);
this.partitionPredicate = requireNonNull(partitionPredicate);
this.dataSkippingFilter = requireNonNull(dataSkippingFilter);
this.isFullyConsumed = isFullyConsumed;
this.scanMetricsResult = requireNonNull(scanMetrics).captureScanMetricsResult();
}

@@ -93,6 +96,11 @@ public Optional<Predicate> getDataSkippingFilter() {
return dataSkippingFilter;
}

@Override
public boolean getIsFullyConsumed() {
return isFullyConsumed;
}

@Override
public ScanMetricsResult getScanMetrics() {
return scanMetricsResult;
ScanMetricsResult.java
@@ -40,19 +40,19 @@ public interface ScanMetricsResult {

/**
* @return the number of AddFile actions seen during log replay (from both checkpoint and delta
* files). For a failed scan this metric may be incomplete.
* files). For a failed or incomplete scan this metric may be incomplete.
*/
long getNumAddFilesSeen();

/**
* @return the number of AddFile actions seen during log replay from delta files only. For a
* failed scan this metric may be incomplete.
* failed or incomplete scan this metric may be incomplete.
*/
long getNumAddFilesSeenFromDeltaFiles();

/**
* @return the number of active AddFile actions that survived log replay (i.e. belong to the table
* state). For a failed scan this metric may be incomplete.
* state). For a failed or incomplete scan this metric may be incomplete.
*/
long getNumActiveAddFiles();

@@ -62,13 +62,13 @@ public interface ScanMetricsResult {
* In this case, the same AddFile will be added with stats without removing the original.
*
* @return the number of AddFile actions seen during log replay that are duplicates. For a failed
* scan this metric may be incomplete.
* or incomplete scan this metric may be incomplete.
*/
long getNumDuplicateAddFiles();

/**
* @return the number of RemoveFiles seen in log replay (only from delta files). For a failed scan
* this metric may be incomplete.
* @return the number of RemoveFiles seen in log replay (only from delta files). For a failed or
* incomplete scan this metric may be incomplete.
*/
long getNumRemoveFilesSeenFromDeltaFiles();
}
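
Because every counter above carries the "may be incomplete" caveat, consumers should treat the values as lower bounds for failed or incomplete scans. A small illustrative sketch of consistency checks implied by the definitions above (not part of the Kernel API; only the getters shown are assumed):

import io.delta.kernel.metrics.ScanMetricsResult;

class ScanMetricsChecks {
  static void check(ScanMetricsResult m) {
    // AddFiles seen in delta files are a subset of all AddFiles seen
    assert m.getNumAddFilesSeenFromDeltaFiles() <= m.getNumAddFilesSeen();
    // Files that survive log replay cannot outnumber the files seen during replay
    assert m.getNumActiveAddFiles() <= m.getNumAddFilesSeen();
  }
}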
ScanReport.java
@@ -36,6 +36,7 @@
"readSchema",
"partitionPredicate",
"dataSkippingFilter",
"isFullyConsumed",
"scanMetrics"
})
public interface ScanReport extends DeltaOperationReport {
@@ -63,6 +64,15 @@ public interface ScanReport extends DeltaOperationReport {
/** @return the filter used for data skipping using the file statistics */
Optional<Predicate> getDataSkippingFilter();

/**
* Whether the scan file iterator had been fully consumed when it was closed. The iterator may be
* closed early (before being fully consumed) either due to an exception originating within
* connector code or intentionally (such as for a LIMIT query).
*
* @return whether the scan file iterator had been fully consumed when it was closed
*/
boolean getIsFullyConsumed();

/** @return the metrics for this scan */
ScanMetricsResult getScanMetrics();
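
On the receiving end, each reporter registered with the engine gets the report, as pushed by `engine.getMetricsReporters().forEach(...)` in `getScanFiles` above. A hedged sketch of a consumer that branches on the new flag (this assumes the `MetricsReporter` interface used there and a `getException()` accessor inherited from `DeltaOperationReport`):

import io.delta.kernel.metrics.MetricsReport;
import io.delta.kernel.metrics.MetricsReporter;
import io.delta.kernel.metrics.ScanReport;

public class LoggingScanReporter implements MetricsReporter {
  @Override
  public void report(MetricsReport report) {
    if (report instanceof ScanReport) {
      ScanReport scan = (ScanReport) report;
      if (scan.getException().isPresent()) {
        System.err.println("scan failed: " + scan.getException().get());
      } else if (!scan.getIsFullyConsumed()) {
        // closed early; counters below are lower bounds
        System.out.println("incomplete scan, active AddFiles so far: "
            + scan.getScanMetrics().getNumActiveAddFiles());
      } else {
        System.out.println("complete scan, active AddFiles: "
            + scan.getScanMetrics().getNumActiveAddFiles());
      }
    }
  }
}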

MetricsReportSerializerSuite.scala
@@ -210,6 +210,7 @@ class MetricsReportSerializerSuite extends AnyFunSuite {
|"readSchema":"${scanReport.getReadSchema}",
|"partitionPredicate":${optionToString(partitionPredicate)},
|"dataSkippingFilter":${optionToString(dataSkippingFilter)},
|"isFullyConsumed":${scanReport.getIsFullyConsumed},
|"scanMetrics":{
|"totalPlanningDurationNs":${scanMetrics.getTotalPlanningDurationNs},
|"numAddFilesSeen":${scanMetrics.getNumAddFilesSeen},
@@ -248,6 +249,7 @@ class MetricsReportSerializerSuite extends AnyFunSuite {
new StructType().add("id", IntegerType.INTEGER),
Optional.of(partitionPredicate),
Optional.empty(),
true,
scanMetrics,
Optional.of(exception)
)
@@ -267,6 +269,7 @@ class MetricsReportSerializerSuite extends AnyFunSuite {
|"readSchema":"struct(StructField(name=id,type=integer,nullable=true,metadata={}))",
|"partitionPredicate":"(column(`part`) > 1)",
|"dataSkippingFilter":null,
|"isFullyConsumed":true,
|"scanMetrics":{
|"totalPlanningDurationNs":200,
|"numAddFilesSeen":100,
@@ -292,6 +295,7 @@ class MetricsReportSerializerSuite extends AnyFunSuite {
tableSchema,
Optional.empty(),
Optional.empty(),
false, // isFullyConsumed
new ScanMetrics(),
Optional.empty()
)
ScanReportSuite.scala
@@ -18,6 +18,7 @@ package io.delta.kernel.defaults.metrics
import java.util.Collections

import io.delta.kernel._
import io.delta.kernel.data.FilteredColumnarBatch
import io.delta.kernel.engine._
import io.delta.kernel.expressions.{Column, Literal, Predicate}
import io.delta.kernel.internal.data.GenericRow
@@ -26,6 +27,7 @@ import io.delta.kernel.internal.metrics.Timer
import io.delta.kernel.internal.util.{FileNames, Utils}
import io.delta.kernel.metrics.{ScanReport, SnapshotReport}
import io.delta.kernel.types.{IntegerType, LongType, StructType}
import io.delta.kernel.utils.CloseableIterator

import org.apache.spark.sql.functions.col
import org.scalatest.funsuite.AnyFunSuite
@@ -48,15 +50,16 @@ class ScanReportSuite extends AnyFunSuite with MetricsReportTestUtils {
*/
def getScanAndSnapshotReport(
getScan: Engine => Scan,
expectException: Boolean
expectException: Boolean,
consumeScanFiles: CloseableIterator[FilteredColumnarBatch] => Unit
): (ScanReport, Long, SnapshotReport, Option[Exception]) = {
val timer = new Timer()

val (metricsReports, exception) = collectMetricsReports(
engine => {
val scan = getScan(engine)
// toSeq triggers log replay, consumes the actions and closes the iterator
timer.time(() => scan.getScanFiles(engine).toSeq) // Time the actual operation
// Time the actual operation
timer.timeCallable(() => consumeScanFiles(scan.getScanFiles(engine)))
},
expectException
)
@@ -85,6 +88,7 @@ class ScanReportSuite extends AnyFunSuite with MetricsReportTestUtils {
* @param expectedDataSkippingFilter expected data skipping filter
* @param filter filter to build the scan with
* @param readSchema read schema to build the scan with
* @param consumeScanFiles function to consume scan file iterator
*/
// scalastyle:off
def checkScanReport(
@@ -97,8 +101,11 @@ class ScanReportSuite extends AnyFunSuite with MetricsReportTestUtils {
expectedNumRemoveFilesSeenFromDeltaFiles: Long = 0,
expectedPartitionPredicate: Option[Predicate] = None,
expectedDataSkippingFilter: Option[Predicate] = None,
expectedIsFullyConsumed: Boolean = true,
filter: Option[Predicate] = None,
readSchema: Option[StructType] = None
readSchema: Option[StructType] = None,
// toSeq triggers log replay, consumes the actions and closes the iterator
consumeScanFiles: CloseableIterator[FilteredColumnarBatch] => Unit = iter => iter.toSeq
): Unit = {
// scalastyle:on
// We need to save the snapshotSchema to check against the generated scan report
@@ -119,7 +126,8 @@ class ScanReportSuite extends AnyFunSuite with MetricsReportTestUtils {
}
scanBuilder.build()
},
expectException
expectException,
consumeScanFiles
)

// Verify contents
@@ -142,6 +150,7 @@ class ScanReportSuite extends AnyFunSuite with MetricsReportTestUtils {
assert(scanReport.getFilter.toScala == filter)
assert(scanReport.getReadSchema == readSchema.getOrElse(snapshotSchema))
assert(scanReport.getPartitionPredicate.toScala == expectedPartitionPredicate)
assert(scanReport.getIsFullyConsumed == expectedIsFullyConsumed)

(scanReport.getDataSkippingFilter.toScala, expectedDataSkippingFilter) match {
case (Some(found), Some(expected)) =>
@@ -257,6 +266,26 @@ class ScanReportSuite extends AnyFunSuite with MetricsReportTestUtils {
}
}

test("ScanReport: close scan file iterator early") {
withTempDir { tempDir =>
val path = tempDir.getCanonicalPath

// Set up delta table with 2 add files
spark.range(10).write.format("delta").mode("append").save(path)
spark.range(10).write.format("delta").mode("append").save(path)

checkScanReport(
path,
expectException = false,
expectedNumAddFiles = 1,
expectedNumAddFilesFromDeltaFiles = 1,
expectedNumActiveAddFiles = 1,
expectedIsFullyConsumed = false,
consumeScanFiles = iter => iter.close() // Close iterator before consuming any scan files
)
}
}
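
The same incomplete-report path fires for any connector that closes the iterator before exhausting it, e.g. a LIMIT-style early exit. A hedged Java sketch of that connector-side pattern (the helper method and its processing step are hypothetical; `Scan.getScanFiles` is the API shown in this diff):

import java.io.IOException;

import io.delta.kernel.Scan;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.utils.CloseableIterator;

class LimitedScan {
  // Hypothetical helper: read at most `limit` batches, then close early.
  static void readAtMost(Scan scan, Engine engine, int limit) throws IOException {
    try (CloseableIterator<FilteredColumnarBatch> scanFiles = scan.getScanFiles(engine)) {
      int batchesRead = 0;
      while (batchesRead < limit && scanFiles.hasNext()) {
        FilteredColumnarBatch batch = scanFiles.next();
        // ... connector-specific processing of `batch` goes here ...
        batchesRead++;
      }
    } // close() runs here; if batches remain, an incomplete ScanReport is pushed
  }
}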

//////////////////
// Error cases ///
//////////////////
@@ -280,6 +309,7 @@ class ScanReportSuite extends AnyFunSuite with MetricsReportTestUtils {
expectedNumAddFiles = 0,
expectedNumAddFilesFromDeltaFiles = 0,
expectedNumActiveAddFiles = 0,
expectedIsFullyConsumed = false,
filter = Some(partFilter),
expectedPartitionPredicate = Some(partFilter)
)
@@ -316,7 +346,8 @@ class ScanReportSuite extends AnyFunSuite with MetricsReportTestUtils {
expectException = true,
expectedNumAddFiles = 0,
expectedNumAddFilesFromDeltaFiles = 0,
expectedNumActiveAddFiles = 0
expectedNumActiveAddFiles = 0,
expectedIsFullyConsumed = false
)
}
}