Skip to content

Commit f2108e9

Browse files
authored
Merge pull request JanKaul#239 from gruuya/eq-del-projection-conflict
fix: conflict when projecting a field not present in equality deletes
2 parents ba78a1f + a479432 commit f2108e9

File tree

2 files changed

+78
-58
lines changed

2 files changed

+78
-58
lines changed

datafusion_iceberg/src/table.rs

Lines changed: 48 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -408,30 +408,22 @@ async fn table_scan(
408408

409409
let file_schema: SchemaRef = Arc::new((schema.fields()).try_into().unwrap());
410410

411-
let projection = projection.cloned().or_else(|| {
412-
Some(
413-
arrow_schema
414-
.fields()
415-
.iter()
416-
.enumerate()
417-
.map(|(i, _)| i)
418-
.collect(),
419-
)
420-
});
411+
// If no projection was specified, default to projecting all the fields
412+
let projection = projection
413+
.cloned()
414+
.unwrap_or((0..arrow_schema.fields().len()).collect_vec());
421415

422-
let projection_expr: Option<Vec<_>> = projection.as_ref().map(|projection| {
423-
projection
424-
.iter()
425-
.enumerate()
426-
.map(|(i, id)| {
427-
let name = arrow_schema.fields[*id].name();
428-
(
429-
Arc::new(Column::new(name, i)) as Arc<dyn PhysicalExpr>,
430-
name.to_owned(),
431-
)
432-
})
433-
.collect()
434-
});
416+
let projection_expr: Vec<_> = projection
417+
.iter()
418+
.enumerate()
419+
.map(|(i, id)| {
420+
let name = arrow_schema.fields[*id].name();
421+
(
422+
Arc::new(Column::new(name, i)) as Arc<dyn PhysicalExpr>,
423+
name.to_owned(),
424+
)
425+
})
426+
.collect();
435427

436428
if enable_data_file_path_column {
437429
table_partition_cols.push(Field::new(DATA_FILE_PATH_COLUMN, DataType::Utf8, false));
@@ -616,6 +608,31 @@ async fn table_scan(
616608

617609
let mut data_file_iter = data_files.into_iter().peekable();
618610

611+
// Gather the complete equality projection up-front, since in general the requested
612+
// projection may differ from the equality delete columns. Moreover, in principle
613+
// each equality delete file may have different deletion columns.
614+
// Because we need to reconcile them all with data files using joins and unions,
615+
// we need to make sure their schemas are fully compatible in all intermediate nodes.
616+
let mut equality_projection = projection.clone();
617+
delete_files
618+
.iter()
619+
.flat_map(|delete_manifest| delete_manifest.1.data_file().equality_ids())
620+
.flatten()
621+
.unique()
622+
.for_each(|eq_id| {
623+
// Look up the zero-based index of the column based on its equality id
624+
if let Some((id, _)) = schema
625+
.fields()
626+
.iter()
627+
.enumerate()
628+
.find(|(_, f)| f.id == *eq_id)
629+
{
630+
if !equality_projection.contains(&id) {
631+
equality_projection.push(id);
632+
}
633+
}
634+
});
635+
619636
let mut plan = stream::iter(delete_files.iter())
620637
.map(Ok::<_, DataFusionError>)
621638
.try_fold(None, |acc, delete_manifest| {
@@ -627,6 +644,8 @@ async fn table_scan(
627644
let file_schema: Arc<ArrowSchema> = file_schema.clone();
628645
let file_source = file_source.clone();
629646
let mut data_files = Vec::new();
647+
let equality_projection = equality_projection.clone();
648+
630649
while let Some(data_manifest) = data_file_iter.next_if(|x| {
631650
x.1.sequence_number().unwrap()
632651
< delete_manifest.1.sequence_number().unwrap()
@@ -658,26 +677,6 @@ async fn table_scan(
658677
);
659678
let delete_file_schema: SchemaRef =
660679
Arc::new((delete_schema.fields()).try_into().unwrap());
661-
let equality_projection: Option<Vec<usize>> =
662-
match (&projection, delete_manifest.1.data_file().equality_ids()) {
663-
(Some(projection), Some(equality_ids)) => {
664-
let collect: Vec<usize> = schema
665-
.iter()
666-
.enumerate()
667-
.filter_map(|(id, x)| {
668-
if equality_ids.contains(&x.id)
669-
&& !projection.contains(&id)
670-
{
671-
Some(id)
672-
} else {
673-
None
674-
}
675-
})
676-
.collect();
677-
Some([projection.as_slice(), &collect].concat())
678-
}
679-
_ => None,
680-
};
681680

682681
let last_updated_ms = table.metadata().last_updated_ms;
683682
let manifest_path = if enable_manifest_file_path_column {
@@ -725,7 +724,7 @@ async fn table_scan(
725724
)
726725
.with_file_group(FileGroup::new(data_files))
727726
.with_statistics(statistics)
728-
.with_projection(equality_projection)
727+
.with_projection(Some(equality_projection))
729728
.with_limit(limit)
730729
.with_table_partition_cols(table_partition_cols)
731730
.build();
@@ -805,7 +804,7 @@ async fn table_scan(
805804
)
806805
.with_file_group(FileGroup::new(additional_data_files))
807806
.with_statistics(statistics)
808-
.with_projection(projection.as_ref().cloned())
807+
.with_projection(Some(equality_projection))
809808
.with_limit(limit)
810809
.with_table_partition_cols(table_partition_cols)
811810
.build();
@@ -817,14 +816,8 @@ async fn table_scan(
817816
plan = Arc::new(UnionExec::new(vec![plan, data_files_scan]));
818817
}
819818

820-
if let Some(projection_expr) = projection_expr {
821-
Ok::<_, DataFusionError>(Arc::new(ProjectionExec::try_new(
822-
projection_expr,
823-
plan,
824-
)?) as Arc<dyn ExecutionPlan>)
825-
} else {
826-
Ok(plan)
827-
}
819+
Ok::<_, DataFusionError>(Arc::new(ProjectionExec::try_new(projection_expr, plan)?)
820+
as Arc<dyn ExecutionPlan>)
828821
}
829822
})
830823
.try_collect::<Vec<_>>()
@@ -860,7 +853,7 @@ async fn table_scan(
860853
FileScanConfigBuilder::new(object_store_url, file_schema, file_source)
861854
.with_file_groups(file_groups)
862855
.with_statistics(statistics)
863-
.with_projection(projection.clone())
856+
.with_projection(Some(projection.clone()))
864857
.with_limit(limit)
865858
.with_table_partition_cols(table_partition_cols)
866859
.build();
@@ -874,10 +867,7 @@ async fn table_scan(
874867

875868
match plans.len() {
876869
0 => {
877-
let projected_schema = projection
878-
.map(|p| arrow_schema.project(&p))
879-
.transpose()?
880-
.unwrap_or(arrow_schema.as_ref().clone());
870+
let projected_schema = arrow_schema.project(&projection)?;
881871
Ok(Arc::new(EmptyExec::new(Arc::new(projected_schema))))
882872
}
883873
1 => Ok(plans.remove(0)),

datafusion_iceberg/tests/equality_delete.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,4 +215,34 @@ pub async fn test_equality_delete() {
215215
.unwrap()
216216
.collect();
217217
assert_batches_eq!(expected, &duckdb_batches);
218+
219+
// Test that projecting a column that is not included in equality deletes works
220+
ctx.sql(
221+
"INSERT INTO warehouse.test.orders (id, customer_id, product_id, date, amount) VALUES
222+
(7, 3, 2, '2020-01-01', 2),
223+
(8, 2, 1, '2020-02-02', 3),
224+
(9, 1, 3, '2020-01-01', 1);",
225+
)
226+
.await
227+
.expect("Failed to create query plan for insert")
228+
.collect()
229+
.await
230+
.expect("Failed to insert values into table");
231+
232+
let batches = ctx
233+
.sql("select sum(amount) from warehouse.test.orders")
234+
.await
235+
.expect("Failed to create plan for select")
236+
.collect()
237+
.await
238+
.expect("Failed to execute select query");
239+
240+
let expected = [
241+
"+-----------------------------------+",
242+
"| sum(warehouse.test.orders.amount) |",
243+
"+-----------------------------------+",
244+
"| 13 |",
245+
"+-----------------------------------+",
246+
];
247+
assert_batches_eq!(expected, &batches);
218248
}

0 commit comments

Comments
 (0)