
Commit fffab44

feat: kernel snapshot abstractions
Signed-off-by: Robert Pack <[email protected]>
1 parent ecf6ebc · commit fffab44

9 files changed: +1023, -1 lines


crates/core/src/kernel/mod.rs

Lines changed: 2 additions & 0 deletions
@@ -10,6 +10,8 @@ pub mod error;
 pub mod models;
 pub mod scalars;
 mod snapshot;
+#[allow(unused)]
+mod snapshot_next;
 pub mod transaction;

 pub use error::*;

crates/core/src/kernel/snapshot/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -336,7 +336,7 @@ impl Snapshot {
 /// A snapshot of a Delta table that has been eagerly loaded into memory.
 #[derive(Debug, Clone, PartialEq)]
 pub struct EagerSnapshot {
-    snapshot: Snapshot,
+    pub(crate) snapshot: Snapshot,
     // additional actions that should be tracked during log replay.
     tracked_actions: HashSet<ActionType>,

Lines changed: 134 additions & 0 deletions
@@ -0,0 +1,134 @@
use delta_kernel::arrow::array::BooleanArray;
use delta_kernel::arrow::compute::filter_record_batch;
use delta_kernel::arrow::record_batch::RecordBatch;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::scan::{Scan, ScanMetadata};
use delta_kernel::{
    DeltaResult, Engine, EngineData, ExpressionEvaluator, ExpressionRef, PredicateRef, Version,
};
use itertools::Itertools;

/// [`ScanMetadata`] contains (1) a [`RecordBatch`] specifying data files to be scanned
/// and (2) a vector of transforms (one transform per scan file) that must be applied to the
/// data read from those files.
pub struct ScanMetadataArrow {
    /// Record batch with one row per file to scan.
    pub scan_files: RecordBatch,

    /// Row-level transformations to apply to data read from files.
    ///
    /// Each entry in this vector corresponds to a row in the `scan_files` data. The entry is
    /// an expression that must be applied to convert the file's data into the logical schema
    /// expected by the scan:
    ///
    /// - `Some(expr)`: Apply this expression to transform the data to match [`Scan::schema()`].
    /// - `None`: No transformation is needed; the data is already in the correct logical form.
    ///
    /// Note: This vector can be indexed by row number.
    pub scan_file_transforms: Vec<Option<ExpressionRef>>,
}

pub trait ScanExt {
    fn scan_metadata_arrow(
        &self,
        engine: &dyn Engine,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>>;

    fn scan_metadata_from_arrow(
        &self,
        engine: &dyn Engine,
        existing_version: Version,
        existing_data: Box<dyn Iterator<Item = RecordBatch>>,
        existing_predicate: Option<PredicateRef>,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>>;
}

impl ScanExt for Scan {
    fn scan_metadata_arrow(
        &self,
        engine: &dyn Engine,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>> {
        Ok(self
            .scan_metadata(engine)?
            .map_ok(kernel_to_arrow)
            .flatten())
    }

    fn scan_metadata_from_arrow(
        &self,
        engine: &dyn Engine,
        existing_version: Version,
        existing_data: Box<dyn Iterator<Item = RecordBatch>>,
        existing_predicate: Option<PredicateRef>,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>> {
        let engine_iter = existing_data
            .map(|batch| Box::new(ArrowEngineData::new(batch)) as Box<dyn EngineData>);
        Ok(self
            .scan_metadata_from(engine, existing_version, engine_iter, existing_predicate)?
            .map_ok(kernel_to_arrow)
            .flatten())
    }
}

fn kernel_to_arrow(metadata: ScanMetadata) -> DeltaResult<ScanMetadataArrow> {
    let scan_file_transforms = metadata
        .scan_file_transforms
        .into_iter()
        .enumerate()
        .filter_map(|(i, v)| metadata.scan_files.selection_vector[i].then_some(v))
        .collect();
    let batch = ArrowEngineData::try_from_engine_data(metadata.scan_files.data)?.into();
    let scan_files = filter_record_batch(
        &batch,
        &BooleanArray::from(metadata.scan_files.selection_vector),
    )?;
    Ok(ScanMetadataArrow {
        scan_files,
        scan_file_transforms,
    })
}

pub trait ExpressionEvaluatorExt {
    fn evaluate_arrow(&self, batch: RecordBatch) -> DeltaResult<RecordBatch>;
}

impl<T: ExpressionEvaluator + ?Sized> ExpressionEvaluatorExt for T {
    fn evaluate_arrow(&self, batch: RecordBatch) -> DeltaResult<RecordBatch> {
        let engine_data = ArrowEngineData::new(batch);
        Ok(ArrowEngineData::try_from_engine_data(T::evaluate(self, &engine_data)?)?.into())
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::ExpressionEvaluatorExt;

    use delta_kernel::arrow::array::Int32Array;
    use delta_kernel::arrow::datatypes::{DataType, Field, Schema};
    use delta_kernel::arrow::record_batch::RecordBatch;
    use delta_kernel::engine::arrow_conversion::TryIntoKernel;
    use delta_kernel::engine::arrow_expression::ArrowEvaluationHandler;
    use delta_kernel::expressions::*;
    use delta_kernel::EvaluationHandler;

    #[test]
    fn test_evaluate_arrow() {
        let handler = ArrowEvaluationHandler;

        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let values = Int32Array::from(vec![1, 2, 3]);
        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(values)]).unwrap();

        let expression = column_expr!("a");
        let expr = handler.new_expression_evaluator(
            Arc::new((&schema).try_into_kernel().unwrap()),
            expression,
            delta_kernel::schema::DataType::INTEGER,
        );

        let result = expr.evaluate_arrow(batch);
        assert!(result.is_ok());
    }
}
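
Taken together, `ScanExt` and `kernel_to_arrow` expose the kernel's scan metadata as plain Arrow batches with the selection vector already applied. A minimal consumer might look like the sketch below; it is illustrative only and assumes the trait lands at `crate::kernel::snapshot_next::arrow_ext` (inferred from the `use super::arrow_ext::ScanExt` import in the next file) and that the kernel's `Snapshot::scan_builder` consumes an `Arc<Snapshot>`, as the `EagerSnapshot` code below does.

use std::sync::Arc;

use delta_kernel::arrow::record_batch::RecordBatch;
use delta_kernel::snapshot::Snapshot;
use delta_kernel::{DeltaResult, Engine};
use itertools::Itertools;

// Assumed module path for the extension trait introduced above.
use crate::kernel::snapshot_next::arrow_ext::ScanExt;

/// Collect every surviving scan file as Arrow data (usage sketch, not part of this commit).
fn collect_scan_files(
    snapshot: Arc<Snapshot>,
    engine: &dyn Engine,
) -> DeltaResult<Vec<RecordBatch>> {
    let scan = snapshot.scan_builder().build()?;
    // Each ScanMetadataArrow has its selection vector already applied inside
    // kernel_to_arrow, so every row in `scan_files` survived log replay.
    scan.scan_metadata_arrow(engine)?
        .map_ok(|metadata| metadata.scan_files)
        .try_collect()
}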
Lines changed: 141 additions & 0 deletions
@@ -0,0 +1,141 @@
use std::sync::Arc;

use arrow::compute::concat_batches;
use arrow_array::RecordBatch;
use delta_kernel::actions::{Metadata, Protocol};
use delta_kernel::engine::arrow_conversion::TryIntoArrow;
use delta_kernel::scan::scan_row_schema;
use delta_kernel::schema::Schema;
use delta_kernel::table_properties::TableProperties;
use delta_kernel::{PredicateRef, Table, Version};
use itertools::Itertools;
use object_store::ObjectStore;
use url::Url;
use uuid::Uuid;

use super::arrow_ext::ScanExt;
use super::lazy::LazySnapshot;
use super::Snapshot;
use crate::kernel::CommitInfo;
use crate::logstore::LogStore;
use crate::{DeltaResult, DeltaTableConfig};

/// An eager snapshot of a Delta Table at a specific version.
///
/// This snapshot loads some log data eagerly and keeps it in memory.
#[derive(Clone)]
pub struct EagerSnapshot {
    snapshot: LazySnapshot,
    files: RecordBatch,
    predicate: Option<PredicateRef>,
}

#[async_trait::async_trait]
impl Snapshot for EagerSnapshot {
    fn table_root(&self) -> &Url {
        self.snapshot.table_root()
    }

    fn version(&self) -> Version {
        self.snapshot.version()
    }

    fn schema(&self) -> Arc<Schema> {
        self.snapshot.schema()
    }

    fn table_properties(&self) -> &TableProperties {
        self.snapshot.table_properties()
    }

    async fn logical_files(
        &self,
        predicate: Option<PredicateRef>,
    ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<RecordBatch>> + '_>> {
        let snapshot = self.snapshot.inner.clone();
        let scan = snapshot.scan_builder().with_predicate(predicate).build()?;
        let iter: Vec<_> = scan
            .scan_metadata_from_arrow(
                self.snapshot.engine_ref().as_ref(),
                self.version(),
                Box::new(std::iter::once(self.files.clone())),
                self.predicate.clone(),
            )?
            .collect();
        Ok(Box::new(iter.into_iter().map(|sc| Ok(sc?.scan_files))))
    }

    async fn application_transaction_version(&self, app_id: String) -> DeltaResult<Option<i64>> {
        self.snapshot.application_transaction_version(app_id).await
    }

    fn commit_infos(
        &self,
        start_version: Option<Version>,
        limit: Option<usize>,
    ) -> DeltaResult<Box<dyn Iterator<Item = (Version, CommitInfo)>>> {
        self.snapshot.commit_infos(start_version, limit)
    }

    async fn update(&mut self, target_version: Option<Version>) -> DeltaResult<bool> {
        self.update_impl(target_version).await
    }
}

impl EagerSnapshot {
    /// Create a new [`EagerSnapshot`] instance
    pub(crate) async fn try_new(
        log_store: Arc<dyn LogStore>,
        config: DeltaTableConfig,
        version: impl Into<Option<Version>>,
        predicate: Option<PredicateRef>,
        operation_id: impl Into<Option<Uuid>>,
    ) -> DeltaResult<Self> {
        let snapshot = LazySnapshot::try_new(log_store, version, operation_id).await?;
        let all: Vec<_> = snapshot
            .logical_files(predicate.clone())
            .await?
            .try_collect()?;
        let files = if all.is_empty() {
            RecordBatch::new_empty(Arc::new((scan_row_schema().as_ref()).try_into_arrow()?))
        } else {
            concat_batches(&all[0].schema(), &all)?
        };
        Ok(Self {
            snapshot,
            files,
            predicate,
        })
    }

    /// Get the number of files in the current snapshot
    pub fn files_count(&self) -> usize {
        self.files.num_rows()
    }

    async fn update_impl(&mut self, target_version: Option<Version>) -> DeltaResult<bool> {
        let mut snapshot = self.snapshot.clone();
        let current = snapshot.version();
        if !snapshot.update(target_version.clone()).await? {
            return Ok(false);
        }

        let scan = snapshot.inner.clone().scan_builder().build()?;
        let engine = snapshot.engine_ref().clone();
        // TODO: process blocking iterator
        let files: Vec<_> = scan
            .scan_metadata_from_arrow(
                engine.as_ref(),
                current,
                Box::new(std::iter::once(self.files.clone())),
                self.predicate.clone(),
            )?
            .map_ok(|s| s.scan_files)
            .try_collect()?;

        self.files = concat_batches(&files[0].schema(), &files)?;
        self.snapshot = snapshot;

        Ok(true)
    }
}
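
A hedged usage sketch for `EagerSnapshot`: `try_new` is `pub(crate)`, so this only compiles from inside the crate; the `crate::kernel::snapshot_next` paths and a `Default` impl for `DeltaTableConfig` are assumptions, and `version`/`update` come from the `Snapshot` trait above.

use std::sync::Arc;

use delta_kernel::Version;
use uuid::Uuid;

// Assumed re-export paths for the types introduced in this commit.
use crate::kernel::snapshot_next::{EagerSnapshot, Snapshot};
use crate::logstore::LogStore;
use crate::{DeltaResult, DeltaTableConfig};

/// Load the latest version eagerly, then try to fast-forward in place (sketch).
async fn load_and_refresh(log_store: Arc<dyn LogStore>) -> DeltaResult<()> {
    // `None` for version means "latest"; no predicate or operation id is passed.
    let mut snapshot = EagerSnapshot::try_new(
        log_store,
        DeltaTableConfig::default(), // assumes DeltaTableConfig: Default
        None::<Version>,
        None,
        None::<Uuid>,
    )
    .await?;
    println!("v{} with {} files", snapshot.version(), snapshot.files_count());

    // `update` replays only the commits after the current version: the cached
    // `files` batch is fed back to the kernel via scan_metadata_from_arrow.
    if snapshot.update(None).await? {
        println!("advanced to v{}", snapshot.version());
    }
    Ok(())
}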
Lines changed: 67 additions & 0 deletions
@@ -0,0 +1,67 @@
use arrow_array::cast::AsArray;
use arrow_array::types::Int64Type;
use arrow_array::{Array, RecordBatch};
use chrono::{DateTime, Utc};
use delta_kernel::expressions::{Scalar, StructData};

use crate::kernel::scalars::ScalarExt;
use crate::{DeltaResult, DeltaTableError};

#[derive(Clone)]
pub struct LogicalFileView {
    pub(super) files: RecordBatch,
    pub(super) index: usize,
}

impl LogicalFileView {
    /// Path of the file.
    pub fn path(&self) -> &str {
        self.files.column(0).as_string::<i32>().value(self.index)
    }

    /// Size of the file in bytes.
    pub fn size(&self) -> i64 {
        self.files
            .column(1)
            .as_primitive::<Int64Type>()
            .value(self.index)
    }

    /// Modification time of the file in milliseconds since epoch.
    pub fn modification_time(&self) -> i64 {
        self.files
            .column(2)
            .as_primitive::<Int64Type>()
            .value(self.index)
    }

    /// Datetime of the last modification time of the file.
    pub fn modification_datetime(&self) -> DeltaResult<chrono::DateTime<Utc>> {
        DateTime::from_timestamp_millis(self.modification_time()).ok_or(
            DeltaTableError::MetadataError(format!(
                "invalid modification_time: {:?}",
                self.modification_time()
            )),
        )
    }

    pub fn stats(&self) -> Option<&str> {
        let col = self.files.column(3).as_string::<i32>();
        col.is_valid(self.index).then(|| col.value(self.index))
    }

    pub fn partition_values(&self) -> Option<StructData> {
        self.files
            .column_by_name("fileConstantValues")
            .and_then(|col| col.as_struct_opt())
            .and_then(|s| s.column_by_name("partitionValues"))
            .and_then(|arr| {
                arr.is_valid(self.index)
                    .then(|| match Scalar::from_array(arr, self.index) {
                        Some(Scalar::Struct(s)) => Some(s),
                        _ => None,
                    })
                    .flatten()
            })
    }
}
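
`LogicalFileView` is a cheap row cursor over the scan-files batch: each accessor reads a fixed column at `self.index`, and cloning a `RecordBatch` only bumps `Arc` reference counts on its columns. Because `files` and `index` are `pub(super)`, only the parent module can construct views; a hypothetical helper (not part of this commit, placed in that parent module) could iterate them like this:

use arrow_array::RecordBatch;

// Assumed to live in the parent `snapshot_next` module, where the
// pub(super) fields of LogicalFileView are visible.
fn iter_file_views(files: &RecordBatch) -> impl Iterator<Item = LogicalFileView> + '_ {
    // RecordBatch clones are cheap: the underlying columns are Arc-backed.
    (0..files.num_rows()).map(move |index| LogicalFileView {
        files: files.clone(),
        index,
    })
}

// Usage sketch: print path and size for every live file.
// for view in iter_file_views(&batch) {
//     println!("{} ({} bytes)", view.path(), view.size());
// }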
