@@ -414,30 +414,22 @@ async fn table_scan(
414414
415415 let file_schema: SchemaRef = Arc :: new ( ( schema. fields ( ) ) . try_into ( ) . unwrap ( ) ) ;
416416
417- let projection = projection. cloned ( ) . or_else ( || {
418- Some (
419- arrow_schema
420- . fields ( )
421- . iter ( )
422- . enumerate ( )
423- . map ( |( i, _) | i)
424- . collect ( ) ,
425- )
426- } ) ;
417+ // If no projection was specified default to projecting all the fields
418+ let projection = projection
419+ . cloned ( )
420+ . unwrap_or ( ( 0 ..arrow_schema. fields ( ) . len ( ) ) . collect_vec ( ) ) ;
427421
428- let projection_expr: Option < Vec < _ > > = projection. as_ref ( ) . map ( |projection| {
429- projection
430- . iter ( )
431- . enumerate ( )
432- . map ( |( i, id) | {
433- let name = arrow_schema. fields [ * id] . name ( ) ;
434- (
435- Arc :: new ( Column :: new ( name, i) ) as Arc < dyn PhysicalExpr > ,
436- name. to_owned ( ) ,
437- )
438- } )
439- . collect ( )
440- } ) ;
422+ let projection_expr: Vec < _ > = projection
423+ . iter ( )
424+ . enumerate ( )
425+ . map ( |( i, id) | {
426+ let name = arrow_schema. fields [ * id] . name ( ) ;
427+ (
428+ Arc :: new ( Column :: new ( name, i) ) as Arc < dyn PhysicalExpr > ,
429+ name. to_owned ( ) ,
430+ )
431+ } )
432+ . collect ( ) ;
441433
442434 if enable_data_file_path_column {
443435 table_partition_cols. push ( Field :: new ( DATA_FILE_PATH_COLUMN , DataType :: Utf8 , false ) ) ;
@@ -622,6 +614,31 @@ async fn table_scan(
622614
623615 let mut data_file_iter = data_files. into_iter ( ) . peekable ( ) ;
624616
617+ // Gather the complete equality projection up-front, since in general the requested
618+ // projection may differ from the equality delete columns. Moreover, in principle
619+ // each equality delete file may have different deletion columns.
620+ // And since we need to reconcile them all with data files using joins and unions,
621+ // we need to make sure their schemas are fully compatible in all intermediate nodes.
622+ let mut equality_projection = projection. clone ( ) ;
623+ delete_files
624+ . iter ( )
625+ . flat_map ( |delete_manifest| delete_manifest. 1 . data_file ( ) . equality_ids ( ) )
626+ . flatten ( )
627+ . unique ( )
628+ . for_each ( |eq_id| {
629+ // Look up the zero-based index of the column based on its equality id
630+ if let Some ( ( id, _) ) = schema
631+ . fields ( )
632+ . iter ( )
633+ . enumerate ( )
634+ . find ( |( _, f) | f. id == * eq_id)
635+ {
636+ if !equality_projection. contains ( & id) {
637+ equality_projection. push ( id) ;
638+ }
639+ }
640+ } ) ;
641+
625642 let mut plan = stream:: iter ( delete_files. iter ( ) )
626643 . map ( Ok :: < _ , DataFusionError > )
627644 . try_fold ( None , |acc, delete_manifest| {
@@ -633,6 +650,8 @@ async fn table_scan(
633650 let file_schema: Arc < ArrowSchema > = file_schema. clone ( ) ;
634651 let file_source = file_source. clone ( ) ;
635652 let mut data_files = Vec :: new ( ) ;
653+ let equality_projection = equality_projection. clone ( ) ;
654+
636655 while let Some ( data_manifest) = data_file_iter. next_if ( |x| {
637656 x. 1 . sequence_number ( ) . unwrap ( )
638657 < delete_manifest. 1 . sequence_number ( ) . unwrap ( )
@@ -664,26 +683,6 @@ async fn table_scan(
664683 ) ;
665684 let delete_file_schema: SchemaRef =
666685 Arc :: new ( ( delete_schema. fields ( ) ) . try_into ( ) . unwrap ( ) ) ;
667- let equality_projection: Option < Vec < usize > > =
668- match ( & projection, delete_manifest. 1 . data_file ( ) . equality_ids ( ) ) {
669- ( Some ( projection) , Some ( equality_ids) ) => {
670- let collect: Vec < usize > = schema
671- . iter ( )
672- . enumerate ( )
673- . filter_map ( |( id, x) | {
674- if equality_ids. contains ( & x. id )
675- && !projection. contains ( & id)
676- {
677- Some ( id)
678- } else {
679- None
680- }
681- } )
682- . collect ( ) ;
683- Some ( [ projection. as_slice ( ) , & collect] . concat ( ) )
684- }
685- _ => None ,
686- } ;
687686
688687 let last_updated_ms = table. metadata ( ) . last_updated_ms ;
689688 let manifest_path = if enable_manifest_file_path_column {
@@ -731,7 +730,7 @@ async fn table_scan(
731730 )
732731 . with_file_group ( FileGroup :: new ( data_files) )
733732 . with_statistics ( statistics)
734- . with_projection ( equality_projection)
733+ . with_projection ( Some ( equality_projection) )
735734 . with_limit ( limit)
736735 . with_table_partition_cols ( table_partition_cols)
737736 . build ( ) ;
@@ -811,7 +810,7 @@ async fn table_scan(
811810 )
812811 . with_file_group ( FileGroup :: new ( additional_data_files) )
813812 . with_statistics ( statistics)
814- . with_projection ( projection . as_ref ( ) . cloned ( ) )
813+ . with_projection ( Some ( equality_projection ) )
815814 . with_limit ( limit)
816815 . with_table_partition_cols ( table_partition_cols)
817816 . build ( ) ;
@@ -823,14 +822,8 @@ async fn table_scan(
823822 plan = Arc :: new ( UnionExec :: new ( vec ! [ plan, data_files_scan] ) ) ;
824823 }
825824
826- if let Some ( projection_expr) = projection_expr {
827- Ok :: < _ , DataFusionError > ( Arc :: new ( ProjectionExec :: try_new (
828- projection_expr,
829- plan,
830- ) ?) as Arc < dyn ExecutionPlan > )
831- } else {
832- Ok ( plan)
833- }
825+ Ok :: < _ , DataFusionError > ( Arc :: new ( ProjectionExec :: try_new ( projection_expr, plan) ?)
826+ as Arc < dyn ExecutionPlan > )
834827 }
835828 } )
836829 . try_collect :: < Vec < _ > > ( )
@@ -866,7 +859,7 @@ async fn table_scan(
866859 FileScanConfigBuilder :: new ( object_store_url, file_schema, file_source)
867860 . with_file_groups ( file_groups)
868861 . with_statistics ( statistics)
869- . with_projection ( projection. clone ( ) )
862+ . with_projection ( Some ( projection. clone ( ) ) )
870863 . with_limit ( limit)
871864 . with_table_partition_cols ( table_partition_cols)
872865 . build ( ) ;
@@ -880,10 +873,7 @@ async fn table_scan(
880873
881874 match plans. len ( ) {
882875 0 => {
883- let projected_schema = projection
884- . map ( |p| arrow_schema. project ( & p) )
885- . transpose ( ) ?
886- . unwrap_or ( arrow_schema. as_ref ( ) . clone ( ) ) ;
876+ let projected_schema = arrow_schema. project ( & projection) ?;
887877 Ok ( Arc :: new ( EmptyExec :: new ( Arc :: new ( projected_schema) ) ) )
888878 }
889879 1 => Ok ( plans. remove ( 0 ) ) ,
0 commit comments