Skip to content

Commit 11958f1

Browse files
committed
Merge pull request JanKaul#239 from gruuya/eq-del-projection-conflict
fix: conflict when projecting a field not present in equality deletes
2 parents b5cfe6f + ab32f99 commit 11958f1

File tree

2 files changed

+78
-58
lines changed

2 files changed

+78
-58
lines changed

datafusion_iceberg/src/table.rs

Lines changed: 48 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -421,30 +421,22 @@ async fn table_scan(
421421

422422
let file_schema: SchemaRef = Arc::new((schema.fields()).try_into().unwrap());
423423

424-
let projection = projection.cloned().or_else(|| {
425-
Some(
426-
arrow_schema
427-
.fields()
428-
.iter()
429-
.enumerate()
430-
.map(|(i, _)| i)
431-
.collect(),
432-
)
433-
});
424+
// If no projection was specified default to projecting all the fields
425+
let projection = projection
426+
.cloned()
427+
.unwrap_or((0..arrow_schema.fields().len()).collect_vec());
434428

435-
let projection_expr: Option<Vec<_>> = projection.as_ref().map(|projection| {
436-
projection
437-
.iter()
438-
.enumerate()
439-
.map(|(i, id)| {
440-
let name = arrow_schema.fields[*id].name();
441-
(
442-
Arc::new(Column::new(name, i)) as Arc<dyn PhysicalExpr>,
443-
name.to_owned(),
444-
)
445-
})
446-
.collect()
447-
});
429+
let projection_expr: Vec<_> = projection
430+
.iter()
431+
.enumerate()
432+
.map(|(i, id)| {
433+
let name = arrow_schema.fields[*id].name();
434+
(
435+
Arc::new(Column::new(name, i)) as Arc<dyn PhysicalExpr>,
436+
name.to_owned(),
437+
)
438+
})
439+
.collect();
448440

449441
if enable_data_file_path_column {
450442
table_partition_cols.push(Field::new(DATA_FILE_PATH_COLUMN, DataType::Utf8, false));
@@ -629,6 +621,31 @@ async fn table_scan(
629621

630622
let mut data_file_iter = data_files.into_iter().peekable();
631623

624+
// Gather the complete equality projection up-front, since in general the requested
625+
// projection may differ from the equality delete columns. Moreover, in principle
626+
// each equality delete file may have different deletion columns.
627+
// And since we need to reconcile them all with data files using joins and unions,
628+
// we need to make sure their schemas are fully compatible in all intermediate nodes.
629+
let mut equality_projection = projection.clone();
630+
delete_files
631+
.iter()
632+
.flat_map(|delete_manifest| delete_manifest.1.data_file().equality_ids())
633+
.flatten()
634+
.unique()
635+
.for_each(|eq_id| {
636+
// Look up the zero-based index of the column based on its equality id
637+
if let Some((id, _)) = schema
638+
.fields()
639+
.iter()
640+
.enumerate()
641+
.find(|(_, f)| f.id == *eq_id)
642+
{
643+
if !equality_projection.contains(&id) {
644+
equality_projection.push(id);
645+
}
646+
}
647+
});
648+
632649
let mut plan = stream::iter(delete_files.iter())
633650
.map(Ok::<_, DataFusionError>)
634651
.try_fold(None, |acc, delete_manifest| {
@@ -640,6 +657,8 @@ async fn table_scan(
640657
let file_schema: Arc<ArrowSchema> = file_schema.clone();
641658
let file_source = file_source.clone();
642659
let mut data_files = Vec::new();
660+
let equality_projection = equality_projection.clone();
661+
643662
while let Some(data_manifest) = data_file_iter.next_if(|x| {
644663
x.1.sequence_number().unwrap()
645664
< delete_manifest.1.sequence_number().unwrap()
@@ -671,26 +690,6 @@ async fn table_scan(
671690
);
672691
let delete_file_schema: SchemaRef =
673692
Arc::new((delete_schema.fields()).try_into().unwrap());
674-
let equality_projection: Option<Vec<usize>> =
675-
match (&projection, delete_manifest.1.data_file().equality_ids()) {
676-
(Some(projection), Some(equality_ids)) => {
677-
let collect: Vec<usize> = schema
678-
.iter()
679-
.enumerate()
680-
.filter_map(|(id, x)| {
681-
if equality_ids.contains(&x.id)
682-
&& !projection.contains(&id)
683-
{
684-
Some(id)
685-
} else {
686-
None
687-
}
688-
})
689-
.collect();
690-
Some([projection.as_slice(), &collect].concat())
691-
}
692-
_ => None,
693-
};
694693

695694
let last_updated_ms = table.metadata().last_updated_ms;
696695
let manifest_path = if enable_manifest_file_path_column {
@@ -738,7 +737,7 @@ async fn table_scan(
738737
)
739738
.with_file_group(FileGroup::new(data_files))
740739
.with_statistics(statistics)
741-
.with_projection(equality_projection)
740+
.with_projection(Some(equality_projection))
742741
.with_limit(limit)
743742
.with_table_partition_cols(table_partition_cols)
744743
.build();
@@ -818,7 +817,7 @@ async fn table_scan(
818817
)
819818
.with_file_group(FileGroup::new(additional_data_files))
820819
.with_statistics(statistics)
821-
.with_projection(projection.as_ref().cloned())
820+
.with_projection(Some(equality_projection))
822821
.with_limit(limit)
823822
.with_table_partition_cols(table_partition_cols)
824823
.build();
@@ -830,14 +829,8 @@ async fn table_scan(
830829
plan = Arc::new(UnionExec::new(vec![plan, data_files_scan]));
831830
}
832831

833-
if let Some(projection_expr) = projection_expr {
834-
Ok::<_, DataFusionError>(Arc::new(ProjectionExec::try_new(
835-
projection_expr,
836-
plan,
837-
)?) as Arc<dyn ExecutionPlan>)
838-
} else {
839-
Ok(plan)
840-
}
832+
Ok::<_, DataFusionError>(Arc::new(ProjectionExec::try_new(projection_expr, plan)?)
833+
as Arc<dyn ExecutionPlan>)
841834
}
842835
})
843836
.try_collect::<Vec<_>>()
@@ -873,7 +866,7 @@ async fn table_scan(
873866
FileScanConfigBuilder::new(object_store_url, file_schema, file_source)
874867
.with_file_groups(file_groups)
875868
.with_statistics(statistics)
876-
.with_projection(projection.clone())
869+
.with_projection(Some(projection.clone()))
877870
.with_limit(limit)
878871
.with_table_partition_cols(table_partition_cols)
879872
.build();
@@ -887,10 +880,7 @@ async fn table_scan(
887880

888881
match plans.len() {
889882
0 => {
890-
let projected_schema = projection
891-
.map(|p| arrow_schema.project(&p))
892-
.transpose()?
893-
.unwrap_or(arrow_schema.as_ref().clone());
883+
let projected_schema = arrow_schema.project(&projection)?;
894884
Ok(Arc::new(EmptyExec::new(Arc::new(projected_schema))))
895885
}
896886
1 => Ok(plans.remove(0)),

datafusion_iceberg/tests/equality_delete.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,4 +215,34 @@ pub async fn test_equality_delete() {
215215
.unwrap()
216216
.collect();
217217
assert_batches_eq!(expected, &duckdb_batches);
218+
219+
// Test that projecting a column that is not included in equality deletes works
220+
ctx.sql(
221+
"INSERT INTO warehouse.test.orders (id, customer_id, product_id, date, amount) VALUES
222+
(7, 3, 2, '2020-01-01', 2),
223+
(8, 2, 1, '2020-02-02', 3),
224+
(9, 1, 3, '2020-01-01', 1);",
225+
)
226+
.await
227+
.expect("Failed to create query plan for insert")
228+
.collect()
229+
.await
230+
.expect("Failed to insert values into table");
231+
232+
let batches = ctx
233+
.sql("select sum(amount) from warehouse.test.orders")
234+
.await
235+
.expect("Failed to create plan for select")
236+
.collect()
237+
.await
238+
.expect("Failed to execute select query");
239+
240+
let expected = [
241+
"+-----------------------------------+",
242+
"| sum(warehouse.test.orders.amount) |",
243+
"+-----------------------------------+",
244+
"| 13 |",
245+
"+-----------------------------------+",
246+
];
247+
assert_batches_eq!(expected, &batches);
218248
}

0 commit comments

Comments
 (0)