Skip to content

Commit d59867e

Browse files
committed
test: more snapshot tests
Signed-off-by: Robert Pack <[email protected]>
1 parent fe62dc6 commit d59867e

File tree

6 files changed

+171
-29
lines changed

6 files changed

+171
-29
lines changed

Cargo.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@ debug = "line-tables-only"
2727

2828
[workspace.dependencies]
2929
# delta_kernel = { version = "=0.6.0", features = ["default-engine"] }
30-
delta_kernel = { path = "../delta-kernel-rs/kernel", features = [
31-
"default-engine",
32-
"developer-visibility",
33-
] }
34-
# delta_kernel = { git = "https://github.com/roeap/delta-kernel-rs", rev = "2e09bfcc0447283a3acc320ad2350f4075dba83e", features = [
30+
# delta_kernel = { path = "../delta-kernel-rs/kernel", features = [
3531
# "default-engine",
3632
# "developer-visibility",
3733
# ] }
34+
delta_kernel = { git = "https://github.com/roeap/delta-kernel-rs", rev = "50c1c023b7e9d60df69f6e592b91e4cc06a5a0b1", features = [
35+
"default-engine",
36+
"developer-visibility",
37+
] }
3838

3939
# arrow
4040
arrow = { version = "53" }

crates/core/src/kernel/snapshot_next/eager.rs

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
use std::sync::Arc;
22

3-
use arrow_array::RecordBatch;
3+
use arrow_array::{BooleanArray, RecordBatch};
4+
use arrow_select::filter::filter_record_batch;
45
use delta_kernel::actions::set_transaction::SetTransactionMap;
56
use delta_kernel::actions::{get_log_add_schema, get_log_schema, ADD_NAME, REMOVE_NAME};
67
use delta_kernel::actions::{Add, Metadata, Protocol, SetTransaction};
78
use delta_kernel::engine::arrow_data::ArrowEngineData;
89
use delta_kernel::log_segment::LogSegment;
10+
use delta_kernel::scan::log_replay::scan_action_iter;
911
use delta_kernel::schema::Schema;
1012
use delta_kernel::table_properties::TableProperties;
11-
use delta_kernel::{ExpressionRef, Table, Version};
13+
use delta_kernel::{EngineData, ExpressionRef, Table, Version};
1214
use itertools::Itertools;
1315
use object_store::ObjectStore;
1416
use url::Url;
@@ -55,9 +57,36 @@ impl Snapshot for EagerSnapshot {
5557

5658
fn logical_files(
5759
&self,
58-
_predicate: Option<ExpressionRef>,
60+
predicate: Option<ExpressionRef>,
5961
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<RecordBatch>>>> {
60-
todo!()
62+
let scan = self
63+
.snapshot
64+
.inner
65+
.as_ref()
66+
.clone()
67+
.into_scan_builder()
68+
.with_predicate(predicate)
69+
.build()?;
70+
71+
let iter = scan_action_iter(
72+
self.snapshot.engine_ref().as_ref(),
73+
vec![Ok((
74+
Box::new(ArrowEngineData::new(self.file_data()?.clone())) as Box<dyn EngineData>,
75+
false,
76+
))]
77+
.into_iter(),
78+
scan.physical_predicate()
79+
.map(|p| (p, scan.schema().clone())),
80+
)
81+
.map(|res| {
82+
res.and_then(|(data, predicate)| {
83+
let batch: RecordBatch = ArrowEngineData::try_from_engine_data(data)?.into();
84+
Ok(filter_record_batch(&batch, &BooleanArray::from(predicate))?)
85+
})
86+
})
87+
.map(|batch| batch.map_err(|e| e.into()));
88+
89+
Ok(Box::new(iter))
6190
}
6291

6392
fn files(

crates/core/src/kernel/snapshot_next/iterators.rs

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::collections::HashSet;
2-
use std::sync::Arc;
2+
use std::sync::{Arc, LazyLock};
33

44
use arrow_array::cast::AsArray;
55
use arrow_array::types::Int64Type;
@@ -14,6 +14,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
1414
use delta_kernel::engine::arrow_expression::ProvidesColumnByName;
1515
use delta_kernel::engine_data::{GetData, RowVisitor};
1616
use delta_kernel::expressions::{Scalar, StructData};
17+
use delta_kernel::scan::scan_row_schema;
1718

1819
use crate::kernel::scalars::ScalarExt;
1920
use crate::{DeltaResult, DeltaTableError};
@@ -218,6 +219,69 @@ impl Iterator for LogicalFileView {
218219
}
219220
}
220221

222+
pub struct LogicalFileViewIterator<I>
223+
where
224+
I: IntoIterator<Item = Result<RecordBatch, DeltaTableError>>,
225+
{
226+
inner: I::IntoIter,
227+
batch: Option<RecordBatch>,
228+
current: usize,
229+
}
230+
231+
impl<I> LogicalFileViewIterator<I>
232+
where
233+
I: IntoIterator<Item = Result<RecordBatch, DeltaTableError>>,
234+
{
235+
/// Create a new [LogicalFileViewIterator].
236+
///
237+
/// If `iter` is an infallible iterator, use `.map(Ok)`.
238+
pub fn new(iter: I) -> Self {
239+
Self {
240+
inner: iter.into_iter(),
241+
batch: None,
242+
current: 0,
243+
}
244+
}
245+
}
246+
247+
// impl<I> Iterator for LogicalFileViewIterator<I>
248+
// where
249+
// I: IntoIterator<Item = DeltaResult<RecordBatch>>,
250+
// {
251+
// type Item = DeltaResult<LogicalFileView>;
252+
//
253+
// fn next(&mut self) -> Option<Self::Item> {
254+
// if let Some(batch) = &self.batch {
255+
// if self.current < batch.num_rows() {
256+
// let item = LogicalFileView {
257+
// files: batch.clone(),
258+
// index: self.current,
259+
// };
260+
// self.current += 1;
261+
// return Some(Ok(item));
262+
// }
263+
// }
264+
// match self.inner.next() {
265+
// Some(Ok(batch)) => {
266+
// if validate_logical_file(&batch).is_err() {
267+
// return Some(Err(DeltaTableError::generic(
268+
// "Invalid logical file data encountered.",
269+
// )));
270+
// }
271+
// self.batch = Some(batch);
272+
// self.current = 0;
273+
// self.next()
274+
// }
275+
// Some(Err(e)) => Some(Err(e)),
276+
// None => None,
277+
// }
278+
// }
279+
//
280+
// fn size_hint(&self) -> (usize, Option<usize>) {
281+
// self.inner.size_hint()
282+
// }
283+
// }
284+
221285
pub struct AddViewIterator<I>
222286
where
223287
I: IntoIterator<Item = Result<RecordBatch, DeltaTableError>>,

crates/core/src/kernel/snapshot_next/lazy.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
use std::io::{BufRead, BufReader, Cursor};
44
use std::sync::{Arc, LazyLock};
55

6-
use arrow::compute::filter_record_batch;
76
use arrow_array::{BooleanArray, RecordBatch};
7+
use arrow_select::filter::filter_record_batch;
88
use delta_kernel::actions::set_transaction::{SetTransactionMap, SetTransactionScanner};
99
use delta_kernel::actions::{get_log_schema, REMOVE_NAME};
1010
use delta_kernel::actions::{Metadata, Protocol, SetTransaction};

crates/core/src/kernel/snapshot_next/mod.rs

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
1313
use delta_kernel::engine::arrow_expression::apply_schema;
1414
use delta_kernel::expressions::{Scalar, StructData};
1515
use delta_kernel::scan::log_replay::scan_action_iter;
16+
use delta_kernel::scan::scan_row_schema;
1617
use delta_kernel::schema::{DataType, Schema};
1718
use delta_kernel::table_properties::TableProperties;
1819
use delta_kernel::{EngineData, ExpressionRef, Version};
@@ -92,19 +93,34 @@ pub trait Snapshot {
9293
/// Get the [`TableProperties`] for this [`Snapshot`].
9394
fn table_properties(&self) -> &TableProperties;
9495

95-
/// Get all currently active files in the table.
96+
fn logical_file_schema(&self) -> &'static Schema {
97+
scan_row_schema()
98+
}
99+
100+
/// Get all logical files present in the current snapshot.
96101
///
97102
/// # Parameters
98103
/// - `predicate`: An optional predicate to filter the files based on file statistics.
99104
///
100105
/// # Returns
101-
/// An iterator of [`RecordBatch`]es, where each batch contains add action data.
102-
fn files(
106+
/// An iterator of [`RecordBatch`]es, where each batch contains logical file data.
107+
fn logical_files(
103108
&self,
104109
predicate: Option<ExpressionRef>,
105110
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<RecordBatch>>>>;
106111

107-
fn logical_files(
112+
/// Get all currently active files in the table.
113+
///
114+
/// # Parameters
115+
/// - `predicate`: An optional predicate to filter the files based on file statistics.
116+
///
117+
/// # Returns
118+
/// An iterator of [`RecordBatch`]es, where each batch contains add action data.
119+
#[deprecated(
120+
since = "0.25.0",
121+
note = "Use `logical_files` instead, which returns a more focussed dataset and avoids computational overhead."
122+
)]
123+
fn files(
108124
&self,
109125
predicate: Option<ExpressionRef>,
110126
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<RecordBatch>>>>;
@@ -113,6 +129,7 @@ pub trait Snapshot {
113129
&self,
114130
predicate: Option<ExpressionRef>,
115131
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<AddView>>>> {
132+
#[allow(deprecated)]
116133
Ok(Box::new(AddViewIterator::new(self.files(predicate)?)))
117134
}
118135

@@ -216,6 +233,7 @@ impl<T: Snapshot> Snapshot for Box<T> {
216233
&self,
217234
predicate: Option<ExpressionRef>,
218235
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<RecordBatch>>>> {
236+
#[allow(deprecated)]
219237
self.as_ref().files(predicate)
220238
}
221239

@@ -404,6 +422,7 @@ mod tests {
404422
test_files(snapshot.as_ref())?;
405423
test_files_view(snapshot.as_ref())?;
406424
test_commit_infos(snapshot.as_ref())?;
425+
test_logical_files(snapshot.as_ref())?;
407426
}
408427

409428
let mut snapshot = get_snapshot(ctx, TestTables::Checkpoints, Some(0))?.await?;
@@ -414,22 +433,29 @@ mod tests {
414433
test_files(snapshot.as_ref())?;
415434
test_files_view(snapshot.as_ref())?;
416435
test_commit_infos(snapshot.as_ref())?;
436+
test_logical_files(snapshot.as_ref())?;
417437
}
418438

419439
Ok(())
420440
}
421441

422-
fn test_files(snapshot: &dyn Snapshot) -> TestResult<()> {
423-
let batches = snapshot.files(None)?.collect::<Result<Vec<_>, _>>()?;
424-
let num_files = batches.iter().map(|b| b.num_rows() as i64).sum::<i64>();
442+
fn test_logical_files(snapshot: &dyn Snapshot) -> TestResult<()> {
443+
let logical_files = snapshot
444+
.logical_files(None)?
445+
.collect::<Result<Vec<_>, _>>()?;
446+
let num_files = logical_files
447+
.iter()
448+
.map(|b| b.num_rows() as i64)
449+
.sum::<i64>();
425450
assert_eq!((num_files as u64), snapshot.version());
426451
Ok(())
427452
}
428453

429-
fn test_commit_infos(snapshot: &dyn Snapshot) -> TestResult<()> {
430-
let commit_infos = snapshot.commit_infos(None, Some(100))?.collect::<Vec<_>>();
431-
assert_eq!((commit_infos.len() as u64), snapshot.version() + 1);
432-
assert_eq!(commit_infos.first().unwrap().0, snapshot.version());
454+
fn test_files(snapshot: &dyn Snapshot) -> TestResult<()> {
455+
#[allow(deprecated)]
456+
let batches = snapshot.files(None)?.collect::<Result<Vec<_>, _>>()?;
457+
let num_files = batches.iter().map(|b| b.num_rows() as i64).sum::<i64>();
458+
assert_eq!((num_files as u64), snapshot.version());
433459
Ok(())
434460
}
435461

@@ -441,4 +467,11 @@ mod tests {
441467
assert_eq!(num_files_view, snapshot.version());
442468
Ok(())
443469
}
470+
471+
fn test_commit_infos(snapshot: &dyn Snapshot) -> TestResult<()> {
472+
let commit_infos = snapshot.commit_infos(None, Some(100))?.collect::<Vec<_>>();
473+
assert_eq!((commit_infos.len() as u64), snapshot.version() + 1);
474+
assert_eq!(commit_infos.first().unwrap().0, snapshot.version());
475+
Ok(())
476+
}
444477
}

crates/core/src/operations/transaction/mod.rs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ use bytes::Bytes;
8080
use chrono::Utc;
8181
use conflict_checker::ConflictChecker;
8282
use futures::future::BoxFuture;
83+
use itertools::Itertools;
8384
use object_store::path::Path;
8485
use object_store::Error as ObjectStoreError;
8586
use serde_json::Value;
@@ -268,22 +269,37 @@ pub struct CommitData {
268269
impl CommitData {
269270
/// Create new data to be committed
270271
pub fn new(
271-
mut actions: Vec<Action>,
272+
actions: Vec<Action>,
272273
operation: DeltaOperation,
273274
mut app_metadata: HashMap<String, Value>,
274275
app_transactions: Vec<Transaction>,
275276
) -> Self {
276-
if !actions.iter().any(|a| matches!(a, Action::CommitInfo(..))) {
277-
let mut commit_info = operation.get_commit_info();
278-
commit_info.timestamp = Some(Utc::now().timestamp_millis());
277+
// When in-commit-timestamps are enabled, we need to ensure that the commit info is the first action
278+
// in the commit log. If it is not present, we need to add it.
279+
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#writer-requirements-for-in-commit-timestamps
280+
let mut commit_info = None::<Action>;
281+
let mut actions = actions
282+
.into_iter()
283+
.inspect(|a| {
284+
if matches!(a, Action::CommitInfo(..)) {
285+
commit_info = Some(a.clone())
286+
}
287+
})
288+
.filter(|a| matches!(a, Action::CommitInfo(..)))
289+
.collect_vec();
290+
if !commit_info.is_some() {
291+
let mut cm = operation.get_commit_info();
292+
cm.timestamp = Some(Utc::now().timestamp_millis());
279293
app_metadata.insert(
280294
"clientVersion".to_string(),
281295
Value::String(format!("delta-rs.{}", crate_version())),
282296
);
283-
app_metadata.extend(commit_info.info);
284-
commit_info.info = app_metadata.clone();
285-
actions.push(Action::CommitInfo(commit_info))
297+
app_metadata.extend(cm.info);
298+
cm.info = app_metadata.clone();
299+
commit_info = Some(Action::CommitInfo(cm));
286300
}
301+
// safety: we assured commit_info is Some just above.
302+
actions.insert(0, commit_info.unwrap());
287303

288304
for txn in &app_transactions {
289305
actions.push(Action::Txn(txn.clone()))

0 commit comments

Comments
 (0)