Commit e872fa2

feat: kernel snapshot abstractions
Signed-off-by: Robert Pack <[email protected]>
1 parent: cc3e348

File tree: 14 files changed (+1483, −15 lines)

.pre-commit-config.yaml

Lines changed: 24 additions & 0 deletions
```diff
@@ -1,5 +1,29 @@
+ci:
+  skip: [cargo-fmt]
+
 repos:
   - repo: https://github.com/crate-ci/typos
     rev: v1.32.0
     hooks:
       - id: typos
+
+  - repo: https://github.com/pre-commit/pre-commit-hooks
+    rev: v5.0.0
+    hooks:
+      - id: check-merge-conflict
+      - id: trailing-whitespace
+        args: [--markdown-linebreak-ext=md]
+
+  - repo: https://github.com/gitleaks/gitleaks
+    rev: v8.26.0
+    hooks:
+      - id: gitleaks
+
+  - repo: local
+    hooks:
+      - id: cargo-fmt
+        name: cargo fmt
+        entry: cargo fmt --
+        language: system
+        types: [rust]
+        pass_filenames: false # This makes it a lot faster
```
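The `ci.skip` entry presumably exists because pre-commit.ci cannot run `language: system` hooks, so the local `cargo fmt` wrapper is excluded there and only runs on developer machines.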

crates/core/src/kernel/mod.rs

Lines changed: 2 additions & 0 deletions
```diff
@@ -10,6 +10,8 @@ pub mod error;
 pub mod models;
 pub mod scalars;
 mod snapshot;
+#[allow(unused)]
+mod snapshot_next;
 pub mod transaction;
 
 pub use error::*;
```
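The new module is declared under `#[allow(unused)]`, presumably to keep warnings quiet while the `snapshot_next` API is still being wired up.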

crates/core/src/kernel/snapshot/mod.rs

Lines changed: 10 additions & 6 deletions
```diff
@@ -21,6 +21,7 @@ use std::io::{BufRead, BufReader, Cursor};
 use ::serde::{Deserialize, Serialize};
 use arrow_array::RecordBatch;
 use delta_kernel::path::{LogPathFileType, ParsedLogPath};
+use delta_kernel::Version;
 use futures::stream::BoxStream;
 use futures::{StreamExt, TryStreamExt};
 use object_store::path::Path;
@@ -241,7 +242,7 @@ impl Snapshot {
         &self,
         log_store: &dyn LogStore,
         limit: Option<usize>,
-    ) -> DeltaResult<BoxStream<'_, DeltaResult<Option<CommitInfo>>>> {
+    ) -> DeltaResult<BoxStream<'static, DeltaResult<Option<(Version, CommitInfo)>>>> {
         let store = log_store.root_object_store(None);
 
         let log_root = self.table_root_path()?.child("_delta_log");
@@ -266,21 +267,24 @@
             let dummy_path = dummy_url.join(meta.location.as_ref()).unwrap();
             if let Some(parsed_path) = ParsedLogPath::try_from(dummy_path)? {
                 if matches!(parsed_path.file_type, LogPathFileType::Commit) {
-                    commit_files.push(meta);
+                    commit_files.push((meta, parsed_path));
                 }
             }
         }
-        commit_files.sort_unstable_by(|a, b| b.location.cmp(&a.location));
+        commit_files.sort_unstable_by(|a, b| b.0.location.cmp(&a.0.location));
         Ok(futures::stream::iter(commit_files)
-            .map(move |meta| {
+            .map(move |(meta, parsed_path)| {
                 let store = store.clone();
                 async move {
                     let commit_log_bytes = store.get(&meta.location).await?.bytes().await?;
                     let reader = BufReader::new(Cursor::new(commit_log_bytes));
                     for line in reader.lines() {
                         let action: Action = serde_json::from_str(line?.as_str())?;
                         if let Action::CommitInfo(commit_info) = action {
-                            return Ok::<_, DeltaTableError>(Some(commit_info));
+                            return Ok::<_, DeltaTableError>(Some((
+                                parsed_path.version,
+                                commit_info,
+                            )));
                         }
                     }
                     Ok(None)
@@ -336,7 +340,7 @@ impl Snapshot {
 /// A snapshot of a Delta table that has been eagerly loaded into memory.
 #[derive(Debug, Clone, PartialEq)]
 pub struct EagerSnapshot {
-    snapshot: Snapshot,
+    pub(crate) snapshot: Snapshot,
     // additional actions that should be tracked during log replay.
     tracked_actions: HashSet<ActionType>,
 
```
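Callers of this stream now receive the commit version paired with each `CommitInfo`. A minimal sketch of draining such a stream, not part of the commit: only the item type `DeltaResult<Option<(Version, CommitInfo)>>` is taken from the hunk above, and `CommitInfo` is assumed to implement `Debug`.

```rust
use futures::stream::BoxStream;
use futures::TryStreamExt;

async fn print_commit_infos(
    mut commits: BoxStream<'static, DeltaResult<Option<(Version, CommitInfo)>>>,
) -> DeltaResult<()> {
    while let Some(entry) = commits.try_next().await? {
        // Commits whose log file contained no CommitInfo action yield None.
        if let Some((version, info)) = entry {
            println!("commit {version}: {info:?}");
        }
    }
    Ok(())
}
```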

crates/core/src/kernel/snapshot_next/arrow_ext.rs (file name missing in this capture; path inferred from `use super::arrow_ext::ScanExt` in the file below)

Lines changed: 141 additions & 0 deletions
```rust
//! Utilities for interacting with Kernel APIs using Arrow data structures.
//!
use delta_kernel::arrow::array::BooleanArray;
use delta_kernel::arrow::compute::filter_record_batch;
use delta_kernel::arrow::record_batch::RecordBatch;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::scan::{Scan, ScanMetadata};
use delta_kernel::{
    DeltaResult, Engine, EngineData, ExpressionEvaluator, ExpressionRef, PredicateRef, Version,
};
use itertools::Itertools;

/// [`ScanMetadata`] contains (1) a [`RecordBatch`] specifying data files to be scanned
/// and (2) a vector of transforms (one transform per scan file) that must be applied to the data read
/// from those files.
pub struct ScanMetadataArrow {
    /// Record batch with one row per file to scan
    pub scan_files: RecordBatch,

    /// Row-level transformations to apply to data read from files.
    ///
    /// Each entry in this vector corresponds to a row in the `scan_files` data. The entry is an
    /// expression that must be applied to convert the file's data into the logical schema
    /// expected by the scan:
    ///
    /// - `Some(expr)`: Apply this expression to transform the data to match [`Scan::schema()`].
    /// - `None`: No transformation is needed; the data is already in the correct logical form.
    ///
    /// Note: This vector can be indexed by row number.
    pub scan_file_transforms: Vec<Option<ExpressionRef>>,
}

pub trait ScanExt {
    /// Get the metadata for a table scan.
    ///
    /// This method handles translation between `EngineData` and `RecordBatch`
    /// and will already apply any selection vectors to the data.
    /// See [`Scan::scan_metadata`] for details.
    fn scan_metadata_arrow(
        &self,
        engine: &dyn Engine,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>>;

    fn scan_metadata_from_arrow(
        &self,
        engine: &dyn Engine,
        existing_version: Version,
        existing_data: Box<dyn Iterator<Item = RecordBatch>>,
        existing_predicate: Option<PredicateRef>,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>>;
}

impl ScanExt for Scan {
    fn scan_metadata_arrow(
        &self,
        engine: &dyn Engine,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>> {
        Ok(self
            .scan_metadata(engine)?
            .map_ok(kernel_to_arrow)
            .flatten())
    }

    fn scan_metadata_from_arrow(
        &self,
        engine: &dyn Engine,
        existing_version: Version,
        existing_data: Box<dyn Iterator<Item = RecordBatch>>,
        existing_predicate: Option<PredicateRef>,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>> {
        let engine_iter =
            existing_data.map(|batch| Box::new(ArrowEngineData::new(batch)) as Box<dyn EngineData>);
        Ok(self
            .scan_metadata_from(engine, existing_version, engine_iter, existing_predicate)?
            .map_ok(kernel_to_arrow)
            .flatten())
    }
}

fn kernel_to_arrow(metadata: ScanMetadata) -> DeltaResult<ScanMetadataArrow> {
    let scan_file_transforms = metadata
        .scan_file_transforms
        .into_iter()
        .enumerate()
        .filter_map(|(i, v)| metadata.scan_files.selection_vector[i].then_some(v))
        .collect();
    let batch = ArrowEngineData::try_from_engine_data(metadata.scan_files.data)?.into();
    let scan_files = filter_record_batch(
        &batch,
        &BooleanArray::from(metadata.scan_files.selection_vector),
    )?;
    Ok(ScanMetadataArrow {
        scan_files,
        scan_file_transforms,
    })
}

pub trait ExpressionEvaluatorExt {
    fn evaluate_arrow(&self, batch: RecordBatch) -> DeltaResult<RecordBatch>;
}

impl<T: ExpressionEvaluator + ?Sized> ExpressionEvaluatorExt for T {
    fn evaluate_arrow(&self, batch: RecordBatch) -> DeltaResult<RecordBatch> {
        let engine_data = ArrowEngineData::new(batch);
        Ok(ArrowEngineData::try_from_engine_data(T::evaluate(self, &engine_data)?)?.into())
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::ExpressionEvaluatorExt;

    use delta_kernel::arrow::array::Int32Array;
    use delta_kernel::arrow::datatypes::{DataType, Field, Schema};
    use delta_kernel::arrow::record_batch::RecordBatch;
    use delta_kernel::engine::arrow_conversion::TryIntoKernel;
    use delta_kernel::engine::arrow_expression::ArrowEvaluationHandler;
    use delta_kernel::expressions::*;
    use delta_kernel::EvaluationHandler;

    #[test]
    fn test_evaluate_arrow() {
        let handler = ArrowEvaluationHandler;

        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let values = Int32Array::from(vec![1, 2, 3]);
        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(values)]).unwrap();

        let expression = column_expr!("a");
        let expr = handler.new_expression_evaluator(
            Arc::new((&schema).try_into_kernel().unwrap()),
            expression,
            delta_kernel::schema::DataType::INTEGER,
        );

        let result = expr.evaluate_arrow(batch);
        assert!(result.is_ok());
    }
}
```
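A minimal usage sketch, not part of the commit: it assumes a `Scan` and an `Engine` were constructed elsewhere and that `ScanExt` is in scope. Because `kernel_to_arrow` applies the selection vector before returning, rows in `scan_files` and entries in `scan_file_transforms` correspond one-to-one.

```rust
use delta_kernel::scan::Scan;
use delta_kernel::{DeltaResult, Engine};

fn count_scan_files(scan: &Scan, engine: &dyn Engine) -> DeltaResult<usize> {
    let mut total = 0;
    for metadata in scan.scan_metadata_arrow(engine)? {
        let metadata = metadata?;
        // Selection vector already applied: one transform entry per file row.
        debug_assert_eq!(
            metadata.scan_files.num_rows(),
            metadata.scan_file_transforms.len()
        );
        total += metadata.scan_files.num_rows();
    }
    Ok(total)
}
```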
crates/core/src/kernel/snapshot_next/eager.rs (file name missing in this capture; path inferred from the `EagerSnapshot` type and its `super::lazy`/`super::stream` imports)

Lines changed: 152 additions & 0 deletions
```rust
use std::sync::Arc;

use arrow::compute::concat_batches;
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef as ArrowSchemaRef;
use delta_kernel::actions::{Metadata, Protocol};
use delta_kernel::engine::arrow_conversion::TryIntoArrow;
use delta_kernel::scan::scan_row_schema;
use delta_kernel::schema::Schema;
use delta_kernel::table_properties::TableProperties;
use delta_kernel::{PredicateRef, Table, Version};
use futures::TryStreamExt;
use itertools::Itertools;
use object_store::ObjectStore;
use url::Url;
use uuid::Uuid;

use super::arrow_ext::ScanExt;
use super::lazy::LazySnapshot;
use super::stream::SendableRBStream;
use super::{LogStoreHandler, Snapshot};
use crate::kernel::CommitInfo;
use crate::logstore::LogStore;
use crate::{DeltaResult, DeltaTableConfig};

/// An eager snapshot of a Delta Table at a specific version.
///
/// This snapshot loads some log data eagerly and keeps it in memory.
#[derive(Clone)]
pub struct EagerSnapshot {
    snapshot: LazySnapshot,
    files: RecordBatch,
    predicate: Option<PredicateRef>,
}

#[async_trait::async_trait]
impl Snapshot for EagerSnapshot {
    fn version(&self) -> Version {
        self.snapshot.version()
    }

    fn schema(&self) -> ArrowSchemaRef {
        self.snapshot.schema()
    }

    fn table_properties(&self) -> &TableProperties {
        self.snapshot.table_properties()
    }

    fn logical_files(
        &self,
        log_store: &LogStoreHandler,
        predicate: Option<PredicateRef>,
    ) -> SendableRBStream {
        if let Some(predicate) = predicate {
            self.snapshot.logical_files(log_store, Some(predicate))
        } else {
            let batch = self.files.clone();
            return Box::pin(futures::stream::once(async move { Ok(batch) }));
        }
    }

    async fn application_transaction_version(
        &self,
        log_store: &LogStoreHandler,
        app_id: String,
    ) -> DeltaResult<Option<i64>> {
        self.snapshot
            .application_transaction_version(log_store, app_id)
            .await
    }

    async fn commit_infos(
        &self,
        log_store: &LogStoreHandler,
        start_version: Option<Version>,
        limit: Option<usize>,
    ) -> DeltaResult<Vec<(Version, CommitInfo)>> {
        self.snapshot
            .commit_infos(log_store, start_version, limit)
            .await
    }

    async fn update(
        &mut self,
        log_store: &LogStoreHandler,
        target_version: Option<Version>,
    ) -> DeltaResult<bool> {
        self.update_impl(log_store, target_version).await
    }
}

impl EagerSnapshot {
    /// Create a new [`EagerSnapshot`] instance
    pub(crate) async fn try_new(
        log_store: &LogStoreHandler,
        config: DeltaTableConfig,
        version: impl Into<Option<Version>>,
        predicate: Option<PredicateRef>,
    ) -> DeltaResult<Self> {
        let snapshot = LazySnapshot::try_new(log_store, version).await?;
        let all: Vec<_> = snapshot
            .logical_files(log_store, predicate.clone())
            .try_collect()
            .await?;
        let files = if all.is_empty() {
            RecordBatch::new_empty(Arc::new((scan_row_schema().as_ref()).try_into_arrow()?))
        } else {
            concat_batches(&all[0].schema(), &all)?
        };
        Ok(Self {
            snapshot,
            files,
            predicate,
        })
    }

    /// Get the number of files in the current snapshot
    pub fn files_count(&self) -> usize {
        self.files.num_rows()
    }

    async fn update_impl(
        &mut self,
        log_store: &LogStoreHandler,
        target_version: Option<Version>,
    ) -> DeltaResult<bool> {
        let mut snapshot = self.snapshot.clone();
        let current = snapshot.version();
        if !snapshot.update(log_store, target_version.clone()).await? {
            return Ok(false);
        }

        let scan = snapshot.inner.clone().scan_builder().build()?;
        let engine = log_store.engine();
        // TODO: process blocking iterator
        let files: Vec<_> = scan
            .scan_metadata_from_arrow(
                engine.as_ref(),
                current,
                Box::new(std::iter::once(self.files.clone())),
                self.predicate.clone(),
            )?
            .map_ok(|s| s.scan_files)
            .try_collect()?;

        self.files = concat_batches(&files[0].schema(), &files)?;
        self.snapshot = snapshot;

        Ok(true)
    }
}
```
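A minimal sketch, not part of the commit, of how the eager snapshot is meant to be used inside the crate: `try_new` materializes the file list once, and `update` replays the cached batch through `scan_metadata_from_arrow` rather than rebuilding it. The `log_store` handle and function name are assumptions; `try_new` is `pub(crate)`, so code like this would live within the crate.

```rust
async fn refresh_example(log_store: &LogStoreHandler) -> DeltaResult<()> {
    let mut snapshot =
        EagerSnapshot::try_new(log_store, DeltaTableConfig::default(), None::<Version>, None)
            .await?;
    println!(
        "version {} with {} files",
        snapshot.version(),
        snapshot.files_count()
    );

    // Incremental refresh: the cached file batch is fed back through
    // `scan_metadata_from_arrow`, so unchanged state is not re-read.
    if snapshot.update(log_store, None).await? {
        println!("advanced to version {}", snapshot.version());
    }
    Ok(())
}
```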
