Skip to content

Commit e193f0a

Browse files
authored
Merge pull request JanKaul#261 from JanKaul/improve-unpartitioned-table-scan
Improve unpartitioned table scan
2 parents a42207c + 73ce5b5 commit e193f0a

File tree

3 files changed

+60
-36
lines changed

3 files changed

+60
-36
lines changed

datafusion_iceberg/src/table.rs

Lines changed: 56 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use itertools::Itertools;
2121
use lru::LruCache;
2222
use object_store::path::Path;
2323
use object_store::ObjectMeta;
24+
use std::collections::BTreeMap;
2425
use std::thread::available_parallelism;
2526
use std::{
2627
any::Any,
@@ -434,7 +435,9 @@ async fn table_scan(
434435
HashMap::new();
435436

436437
// Prune data & delete file and insert them into the according map
437-
let statistics = if let Some(physical_predicate) = physical_predicate.clone() {
438+
let (content_file_iter, statistics) = if let Some(physical_predicate) =
439+
physical_predicate.clone()
440+
{
438441
let partition_schema = Arc::new(ArrowSchema::new(table_partition_cols.clone()));
439442
let partition_column_names = partition_fields
440443
.iter()
@@ -502,31 +505,11 @@ async fn table_scan(
502505

503506
let statistics = statistics_from_datafiles(&schema, &data_files);
504507

505-
data_files
508+
let iter = data_files
506509
.into_iter()
507510
.zip(files_to_prune.into_iter())
508-
.for_each(|(manifest, prune_file)| {
509-
if prune_file && *manifest.1.status() != Status::Deleted {
510-
match manifest.1.data_file().content() {
511-
Content::Data => {
512-
data_file_groups
513-
.entry(manifest.1.data_file().partition().clone())
514-
.or_default()
515-
.push(manifest);
516-
}
517-
Content::EqualityDeletes => {
518-
equality_delete_file_groups
519-
.entry(manifest.1.data_file().partition().clone())
520-
.or_default()
521-
.push(manifest);
522-
}
523-
Content::PositionDeletes => {
524-
panic!("Position deletes not supported.")
525-
}
526-
}
527-
};
528-
});
529-
statistics
511+
.filter_map(|(manifest, prune_file)| if prune_file { Some(manifest) } else { None });
512+
(itertools::Either::Left(iter), statistics)
530513
} else {
531514
let manifests = table
532515
.manifests(snapshot_range.0, snapshot_range.1)
@@ -541,7 +524,38 @@ async fn table_scan(
541524

542525
let statistics = statistics_from_datafiles(&schema, &data_files);
543526

544-
data_files.into_iter().for_each(|manifest| {
527+
let iter = data_files.into_iter();
528+
(itertools::Either::Right(iter), statistics)
529+
};
530+
531+
if partition_fields.is_empty() {
532+
let (data_files, equality_delete_files): (Vec<_>, Vec<_>) = content_file_iter
533+
.filter(|manifest| *manifest.1.status() != Status::Deleted)
534+
.partition(|manifest| match manifest.1.data_file().content() {
535+
Content::Data => true,
536+
Content::EqualityDeletes => false,
537+
Content::PositionDeletes => panic!("Position deletes not supported."),
538+
});
539+
if !data_files.is_empty() {
540+
data_file_groups.insert(
541+
Struct {
542+
fields: Vec::new(),
543+
lookup: BTreeMap::new(),
544+
},
545+
data_files,
546+
);
547+
}
548+
if !equality_delete_files.is_empty() {
549+
equality_delete_file_groups.insert(
550+
Struct {
551+
fields: Vec::new(),
552+
lookup: BTreeMap::new(),
553+
},
554+
equality_delete_files,
555+
);
556+
}
557+
} else {
558+
content_file_iter.for_each(|manifest| {
545559
if *manifest.1.status() != Status::Deleted {
546560
match manifest.1.data_file().content() {
547561
Content::Data => {
@@ -562,18 +576,24 @@ async fn table_scan(
562576
}
563577
}
564578
});
565-
statistics
566-
};
579+
}
567580

568-
let file_source = Arc::new(
569-
if let Some(physical_predicate) = physical_predicate.clone() {
570-
ParquetSource::default()
571-
.with_predicate(physical_predicate)
572-
.with_pushdown_filters(true)
573-
} else {
574-
ParquetSource::default()
575-
},
576-
);
581+
let file_source = {
582+
let physical_predicate = physical_predicate.clone();
583+
async move {
584+
Arc::new(
585+
if let Some(physical_predicate) = physical_predicate.clone() {
586+
ParquetSource::default()
587+
.with_predicate(physical_predicate)
588+
.with_pushdown_filters(true)
589+
} else {
590+
ParquetSource::default()
591+
},
592+
)
593+
}
594+
.instrument(tracing::debug_span!("datafusion_iceberg::file_source"))
595+
.await
596+
};
577597

578598
// Create plan for every partition with delete files
579599
let mut plans = stream::iter(equality_delete_file_groups.into_iter())

iceberg-rust-spec/src/spec/types.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,7 @@ impl StructType {
292292
/// # Returns
293293
/// * `Some(&StructField)` if a field exists at that index
294294
/// * `None` if no field exists at that index
295+
#[inline]
295296
pub fn get(&self, index: usize) -> Option<&StructField> {
296297
self.lookup
297298
.get(&(index as i32))

iceberg-rust-spec/src/spec/values.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,9 @@ impl Struct {
222222
schema: &StructType,
223223
partition_spec: &[PartitionField],
224224
) -> Result<Self, Error> {
225+
if self.fields.is_empty() {
226+
return Ok(self);
227+
}
225228
// Returns a HashMap mapping partition field names to transformed types.
226229
let map = partition_spec
227230
.iter()

0 commit comments

Comments (0)