Skip to content

Commit add7d17

Browse files
committed
Merge pull request JanKaul#239 from gruuya/eq-del-projection-conflict
fix: conflict when projecting a field not present in equality deletes
2 parents bf33d91 + a3ebe16 commit add7d17

File tree

2 files changed

+78
-58
lines changed

2 files changed

+78
-58
lines changed

datafusion_iceberg/src/table.rs

Lines changed: 48 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -414,30 +414,22 @@ async fn table_scan(
414414

415415
let file_schema: SchemaRef = Arc::new((schema.fields()).try_into().unwrap());
416416

417-
let projection = projection.cloned().or_else(|| {
418-
Some(
419-
arrow_schema
420-
.fields()
421-
.iter()
422-
.enumerate()
423-
.map(|(i, _)| i)
424-
.collect(),
425-
)
426-
});
417+
// If no projection was specified default to projecting all the fields
418+
let projection = projection
419+
.cloned()
420+
.unwrap_or((0..arrow_schema.fields().len()).collect_vec());
427421

428-
let projection_expr: Option<Vec<_>> = projection.as_ref().map(|projection| {
429-
projection
430-
.iter()
431-
.enumerate()
432-
.map(|(i, id)| {
433-
let name = arrow_schema.fields[*id].name();
434-
(
435-
Arc::new(Column::new(name, i)) as Arc<dyn PhysicalExpr>,
436-
name.to_owned(),
437-
)
438-
})
439-
.collect()
440-
});
422+
let projection_expr: Vec<_> = projection
423+
.iter()
424+
.enumerate()
425+
.map(|(i, id)| {
426+
let name = arrow_schema.fields[*id].name();
427+
(
428+
Arc::new(Column::new(name, i)) as Arc<dyn PhysicalExpr>,
429+
name.to_owned(),
430+
)
431+
})
432+
.collect();
441433

442434
if enable_data_file_path_column {
443435
table_partition_cols.push(Field::new(DATA_FILE_PATH_COLUMN, DataType::Utf8, false));
@@ -622,6 +614,31 @@ async fn table_scan(
622614

623615
let mut data_file_iter = data_files.into_iter().peekable();
624616

617+
// Gather the complete equality projection up-front, since in general the requested
618+
// projection may differ from the equality delete columns. Moreover, in principle
619+
// each equality delete file may have different deletion columns.
620+
// And since we need to reconcile them all with data files using joins and unions,
621+
// we need to make sure their schemas are fully compatible in all intermediate nodes.
622+
let mut equality_projection = projection.clone();
623+
delete_files
624+
.iter()
625+
.flat_map(|delete_manifest| delete_manifest.1.data_file().equality_ids())
626+
.flatten()
627+
.unique()
628+
.for_each(|eq_id| {
629+
// Look up the zero-based index of the column based on its equality id
630+
if let Some((id, _)) = schema
631+
.fields()
632+
.iter()
633+
.enumerate()
634+
.find(|(_, f)| f.id == *eq_id)
635+
{
636+
if !equality_projection.contains(&id) {
637+
equality_projection.push(id);
638+
}
639+
}
640+
});
641+
625642
let mut plan = stream::iter(delete_files.iter())
626643
.map(Ok::<_, DataFusionError>)
627644
.try_fold(None, |acc, delete_manifest| {
@@ -633,6 +650,8 @@ async fn table_scan(
633650
let file_schema: Arc<ArrowSchema> = file_schema.clone();
634651
let file_source = file_source.clone();
635652
let mut data_files = Vec::new();
653+
let equality_projection = equality_projection.clone();
654+
636655
while let Some(data_manifest) = data_file_iter.next_if(|x| {
637656
x.1.sequence_number().unwrap()
638657
< delete_manifest.1.sequence_number().unwrap()
@@ -664,26 +683,6 @@ async fn table_scan(
664683
);
665684
let delete_file_schema: SchemaRef =
666685
Arc::new((delete_schema.fields()).try_into().unwrap());
667-
let equality_projection: Option<Vec<usize>> =
668-
match (&projection, delete_manifest.1.data_file().equality_ids()) {
669-
(Some(projection), Some(equality_ids)) => {
670-
let collect: Vec<usize> = schema
671-
.iter()
672-
.enumerate()
673-
.filter_map(|(id, x)| {
674-
if equality_ids.contains(&x.id)
675-
&& !projection.contains(&id)
676-
{
677-
Some(id)
678-
} else {
679-
None
680-
}
681-
})
682-
.collect();
683-
Some([projection.as_slice(), &collect].concat())
684-
}
685-
_ => None,
686-
};
687686

688687
let last_updated_ms = table.metadata().last_updated_ms;
689688
let manifest_path = if enable_manifest_file_path_column {
@@ -731,7 +730,7 @@ async fn table_scan(
731730
)
732731
.with_file_group(FileGroup::new(data_files))
733732
.with_statistics(statistics)
734-
.with_projection(equality_projection)
733+
.with_projection(Some(equality_projection))
735734
.with_limit(limit)
736735
.with_table_partition_cols(table_partition_cols)
737736
.build();
@@ -811,7 +810,7 @@ async fn table_scan(
811810
)
812811
.with_file_group(FileGroup::new(additional_data_files))
813812
.with_statistics(statistics)
814-
.with_projection(projection.as_ref().cloned())
813+
.with_projection(Some(equality_projection))
815814
.with_limit(limit)
816815
.with_table_partition_cols(table_partition_cols)
817816
.build();
@@ -823,14 +822,8 @@ async fn table_scan(
823822
plan = Arc::new(UnionExec::new(vec![plan, data_files_scan]));
824823
}
825824

826-
if let Some(projection_expr) = projection_expr {
827-
Ok::<_, DataFusionError>(Arc::new(ProjectionExec::try_new(
828-
projection_expr,
829-
plan,
830-
)?) as Arc<dyn ExecutionPlan>)
831-
} else {
832-
Ok(plan)
833-
}
825+
Ok::<_, DataFusionError>(Arc::new(ProjectionExec::try_new(projection_expr, plan)?)
826+
as Arc<dyn ExecutionPlan>)
834827
}
835828
})
836829
.try_collect::<Vec<_>>()
@@ -866,7 +859,7 @@ async fn table_scan(
866859
FileScanConfigBuilder::new(object_store_url, file_schema, file_source)
867860
.with_file_groups(file_groups)
868861
.with_statistics(statistics)
869-
.with_projection(projection.clone())
862+
.with_projection(Some(projection.clone()))
870863
.with_limit(limit)
871864
.with_table_partition_cols(table_partition_cols)
872865
.build();
@@ -880,10 +873,7 @@ async fn table_scan(
880873

881874
match plans.len() {
882875
0 => {
883-
let projected_schema = projection
884-
.map(|p| arrow_schema.project(&p))
885-
.transpose()?
886-
.unwrap_or(arrow_schema.as_ref().clone());
876+
let projected_schema = arrow_schema.project(&projection)?;
887877
Ok(Arc::new(EmptyExec::new(Arc::new(projected_schema))))
888878
}
889879
1 => Ok(plans.remove(0)),

datafusion_iceberg/tests/equality_delete.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,4 +215,34 @@ pub async fn test_equality_delete() {
215215
.unwrap()
216216
.collect();
217217
assert_batches_eq!(expected, &duckdb_batches);
218+
219+
// Test that projecting a column that is not included in equality deletes works
220+
ctx.sql(
221+
"INSERT INTO warehouse.test.orders (id, customer_id, product_id, date, amount) VALUES
222+
(7, 3, 2, '2020-01-01', 2),
223+
(8, 2, 1, '2020-02-02', 3),
224+
(9, 1, 3, '2020-01-01', 1);",
225+
)
226+
.await
227+
.expect("Failed to create query plan for insert")
228+
.collect()
229+
.await
230+
.expect("Failed to insert values into table");
231+
232+
let batches = ctx
233+
.sql("select sum(amount) from warehouse.test.orders")
234+
.await
235+
.expect("Failed to create plan for select")
236+
.collect()
237+
.await
238+
.expect("Failed to execute select query");
239+
240+
let expected = [
241+
"+-----------------------------------+",
242+
"| sum(warehouse.test.orders.amount) |",
243+
"+-----------------------------------+",
244+
"| 13 |",
245+
"+-----------------------------------+",
246+
];
247+
assert_batches_eq!(expected, &batches);
218248
}

0 commit comments

Comments
 (0)