Skip to content

Commit 0e0638a

Browse files
committed
Merge pull request JanKaul#239 from gruuya/eq-del-projection-conflict
fix: conflict when projecting a field not present in equality deletes
2 parents 7800cf4 + 4baa1b9 commit 0e0638a

File tree

2 files changed

+78
-58
lines changed

2 files changed

+78
-58
lines changed

datafusion_iceberg/src/table.rs

Lines changed: 48 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -413,30 +413,22 @@ async fn table_scan(
413413

414414
let file_schema: SchemaRef = Arc::new((schema.fields()).try_into().unwrap());
415415

416-
let projection = projection.cloned().or_else(|| {
417-
Some(
418-
arrow_schema
419-
.fields()
420-
.iter()
421-
.enumerate()
422-
.map(|(i, _)| i)
423-
.collect(),
424-
)
425-
});
416+
// If no projection was specified, default to projecting all the fields
417+
let projection = projection
418+
.cloned()
419+
.unwrap_or((0..arrow_schema.fields().len()).collect_vec());
426420

427-
let projection_expr: Option<Vec<_>> = projection.as_ref().map(|projection| {
428-
projection
429-
.iter()
430-
.enumerate()
431-
.map(|(i, id)| {
432-
let name = arrow_schema.fields[*id].name();
433-
(
434-
Arc::new(Column::new(name, i)) as Arc<dyn PhysicalExpr>,
435-
name.to_owned(),
436-
)
437-
})
438-
.collect()
439-
});
421+
let projection_expr: Vec<_> = projection
422+
.iter()
423+
.enumerate()
424+
.map(|(i, id)| {
425+
let name = arrow_schema.fields[*id].name();
426+
(
427+
Arc::new(Column::new(name, i)) as Arc<dyn PhysicalExpr>,
428+
name.to_owned(),
429+
)
430+
})
431+
.collect();
440432

441433
if enable_data_file_path_column {
442434
table_partition_cols.push(Field::new(DATA_FILE_PATH_COLUMN, DataType::Utf8, false));
@@ -621,6 +613,31 @@ async fn table_scan(
621613

622614
let mut data_file_iter = data_files.into_iter().peekable();
623615

616+
// Gather the complete equality projection up-front, since in general the requested
617+
// projection may differ from the equality delete columns. Moreover, in principle
618+
// each equality delete file may have different deletion columns.
619+
// And since we need to reconcile them all with data files using joins and unions,
620+
// we need to make sure their schemas are fully compatible in all intermediate nodes.
621+
let mut equality_projection = projection.clone();
622+
delete_files
623+
.iter()
624+
.flat_map(|delete_manifest| delete_manifest.1.data_file().equality_ids())
625+
.flatten()
626+
.unique()
627+
.for_each(|eq_id| {
628+
// Look up the zero-based index of the column based on its equality id
629+
if let Some((id, _)) = schema
630+
.fields()
631+
.iter()
632+
.enumerate()
633+
.find(|(_, f)| f.id == *eq_id)
634+
{
635+
if !equality_projection.contains(&id) {
636+
equality_projection.push(id);
637+
}
638+
}
639+
});
640+
624641
let mut plan = stream::iter(delete_files.iter())
625642
.map(Ok::<_, DataFusionError>)
626643
.try_fold(None, |acc, delete_manifest| {
@@ -632,6 +649,8 @@ async fn table_scan(
632649
let file_schema: Arc<ArrowSchema> = file_schema.clone();
633650
let file_source = file_source.clone();
634651
let mut data_files = Vec::new();
652+
let equality_projection = equality_projection.clone();
653+
635654
while let Some(data_manifest) = data_file_iter.next_if(|x| {
636655
x.1.sequence_number().unwrap()
637656
< delete_manifest.1.sequence_number().unwrap()
@@ -663,26 +682,6 @@ async fn table_scan(
663682
);
664683
let delete_file_schema: SchemaRef =
665684
Arc::new((delete_schema.fields()).try_into().unwrap());
666-
let equality_projection: Option<Vec<usize>> =
667-
match (&projection, delete_manifest.1.data_file().equality_ids()) {
668-
(Some(projection), Some(equality_ids)) => {
669-
let collect: Vec<usize> = schema
670-
.iter()
671-
.enumerate()
672-
.filter_map(|(id, x)| {
673-
if equality_ids.contains(&x.id)
674-
&& !projection.contains(&id)
675-
{
676-
Some(id)
677-
} else {
678-
None
679-
}
680-
})
681-
.collect();
682-
Some([projection.as_slice(), &collect].concat())
683-
}
684-
_ => None,
685-
};
686685

687686
let last_updated_ms = table.metadata().last_updated_ms;
688687
let manifest_path = if enable_manifest_file_path_column {
@@ -730,7 +729,7 @@ async fn table_scan(
730729
)
731730
.with_file_group(FileGroup::new(data_files))
732731
.with_statistics(statistics)
733-
.with_projection(equality_projection)
732+
.with_projection(Some(equality_projection))
734733
.with_limit(limit)
735734
.with_table_partition_cols(table_partition_cols)
736735
.build();
@@ -810,7 +809,7 @@ async fn table_scan(
810809
)
811810
.with_file_group(FileGroup::new(additional_data_files))
812811
.with_statistics(statistics)
813-
.with_projection(projection.as_ref().cloned())
812+
.with_projection(Some(equality_projection))
814813
.with_limit(limit)
815814
.with_table_partition_cols(table_partition_cols)
816815
.build();
@@ -822,14 +821,8 @@ async fn table_scan(
822821
plan = Arc::new(UnionExec::new(vec![plan, data_files_scan]));
823822
}
824823

825-
if let Some(projection_expr) = projection_expr {
826-
Ok::<_, DataFusionError>(Arc::new(ProjectionExec::try_new(
827-
projection_expr,
828-
plan,
829-
)?) as Arc<dyn ExecutionPlan>)
830-
} else {
831-
Ok(plan)
832-
}
824+
Ok::<_, DataFusionError>(Arc::new(ProjectionExec::try_new(projection_expr, plan)?)
825+
as Arc<dyn ExecutionPlan>)
833826
}
834827
})
835828
.try_collect::<Vec<_>>()
@@ -865,7 +858,7 @@ async fn table_scan(
865858
FileScanConfigBuilder::new(object_store_url, file_schema, file_source)
866859
.with_file_groups(file_groups)
867860
.with_statistics(statistics)
868-
.with_projection(projection.clone())
861+
.with_projection(Some(projection.clone()))
869862
.with_limit(limit)
870863
.with_table_partition_cols(table_partition_cols)
871864
.build();
@@ -879,10 +872,7 @@ async fn table_scan(
879872

880873
match plans.len() {
881874
0 => {
882-
let projected_schema = projection
883-
.map(|p| arrow_schema.project(&p))
884-
.transpose()?
885-
.unwrap_or(arrow_schema.as_ref().clone());
875+
let projected_schema = arrow_schema.project(&projection)?;
886876
Ok(Arc::new(EmptyExec::new(Arc::new(projected_schema))))
887877
}
888878
1 => Ok(plans.remove(0)),

datafusion_iceberg/tests/equality_delete.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,4 +215,34 @@ pub async fn test_equality_delete() {
215215
.unwrap()
216216
.collect();
217217
assert_batches_eq!(expected, &duckdb_batches);
218+
219+
// Test that projecting a column that is not included in equality deletes works
220+
ctx.sql(
221+
"INSERT INTO warehouse.test.orders (id, customer_id, product_id, date, amount) VALUES
222+
(7, 3, 2, '2020-01-01', 2),
223+
(8, 2, 1, '2020-02-02', 3),
224+
(9, 1, 3, '2020-01-01', 1);",
225+
)
226+
.await
227+
.expect("Failed to create query plan for insert")
228+
.collect()
229+
.await
230+
.expect("Failed to insert values into table");
231+
232+
let batches = ctx
233+
.sql("select sum(amount) from warehouse.test.orders")
234+
.await
235+
.expect("Failed to create plan for select")
236+
.collect()
237+
.await
238+
.expect("Failed to execute select query");
239+
240+
let expected = [
241+
"+-----------------------------------+",
242+
"| sum(warehouse.test.orders.amount) |",
243+
"+-----------------------------------+",
244+
"| 13 |",
245+
"+-----------------------------------+",
246+
];
247+
assert_batches_eq!(expected, &batches);
218248
}

0 commit comments

Comments
 (0)