Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions datafusion/datasource/src/file_scan_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,18 @@ pub struct FileScanConfig {
/// If the number of file partitions > target_partitions, the file partitions will be grouped
/// in a round-robin fashion such that number of file partitions = target_partitions.
pub partitioned_by_file_group: bool,
/// Desired-but-not-required ordering for the shared work queue that
/// feeds [`SharedWorkSource`].
///
/// Populated by Inexact sort pushdown so that workers pull files in
/// globally best-first order (e.g., highest-max for DESC TopK), letting
/// dynamic filters tighten fastest across the whole scan.
///
/// Distinct from [`Self::output_ordering`]: `output_ordering` is a
/// contract the scan guarantees, while this is only a scheduling hint.
/// Has no effect when `SharedWorkSource` is not used (i.e. when
/// `preserve_order` or `partitioned_by_file_group` is true).
pub(crate) work_order_hint: Option<LexOrdering>,
}

/// A builder for [`FileScanConfig`]'s.
Expand Down Expand Up @@ -280,6 +292,7 @@ pub struct FileScanConfigBuilder {
batch_size: Option<usize>,
expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
partitioned_by_file_group: bool,
work_order_hint: Option<LexOrdering>,
}

impl FileScanConfigBuilder {
Expand All @@ -306,6 +319,7 @@ impl FileScanConfigBuilder {
batch_size: None,
expr_adapter_factory: None,
partitioned_by_file_group: false,
work_order_hint: None,
}
}

Expand Down Expand Up @@ -527,6 +541,7 @@ impl FileScanConfigBuilder {
batch_size,
expr_adapter_factory: expr_adapter,
partitioned_by_file_group,
work_order_hint,
} = self;

let constraints = constraints.unwrap_or_default();
Expand All @@ -552,6 +567,7 @@ impl FileScanConfigBuilder {
expr_adapter_factory: expr_adapter,
statistics,
partitioned_by_file_group,
work_order_hint,
}
}
}
Expand All @@ -571,6 +587,7 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
batch_size: config.batch_size,
expr_adapter_factory: config.expr_adapter_factory,
partitioned_by_file_group: config.partitioned_by_file_group,
work_order_hint: config.work_order_hint,
}
}
}
Expand Down Expand Up @@ -2667,6 +2684,97 @@ mod tests {
Ok(())
}

#[test]
fn sort_pushdown_inexact_populates_work_order_hint() -> Result<()> {
    // Scenario: sort pushdown that ends up Inexact (the source reports
    // Unsupported and `try_sort_file_groups_by_statistics` takes over).
    // The resulting config must carry a `work_order_hint` equal to the
    // requested ordering so the shared work queue can be seeded from it.

    // Requested ordering: default sort options on the single column `a`.
    let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));

    // Two groups with interleaving min/max ranges. Inside each group the
    // files are listed out of natural order so that
    // `try_sort_file_groups_by_statistics` has something to reorder and
    // therefore reports Inexact.
    let group_zero = FileGroup::new(vec![
        make_file_with_stats("g0_hi", 50.0, 60.0),
        make_file_with_stats("g0_mid", 30.0, 40.0),
    ]);
    let group_one = FileGroup::new(vec![
        make_file_with_stats("g1_top", 70.0, 80.0),
        make_file_with_stats("g1_lo", 10.0, 20.0),
    ]);

    // Single non-nullable Float64 column backing a mock file source.
    let fields = vec![Field::new("a", DataType::Float64, false)];
    let file_schema = Arc::new(Schema::new(fields));
    let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
    let file_source = Arc::new(MockSource::new(table_schema));

    let builder =
        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
            .with_file_groups(vec![group_zero, group_one]);
    let config = builder.build();

    let inner = match config.try_pushdown_sort(std::slice::from_ref(&sort_expr))? {
        SortOrderPushdownResult::Inexact { inner } => inner,
        result => panic!("Expected Inexact result, got {result:?}"),
    };
    let pushed = inner
        .downcast_ref::<FileScanConfig>()
        .expect("Expected FileScanConfig");

    // The group count is asserted to stay at the two groups supplied above.
    assert_eq!(pushed.file_groups.len(), 2);

    // The hint is present and is exactly the one requested sort expression.
    let hint = pushed
        .work_order_hint
        .as_ref()
        .expect("work_order_hint must be set on Inexact");
    assert_eq!(hint.len(), 1);
    assert_eq!(hint[0], sort_expr);

    Ok(())
}

#[test]
fn sort_pushdown_rebuild_reverse_leaves_hint_none() -> Result<()> {
    // Scenario: the requested order is the exact reverse of the ordering the
    // scan declares. `rebuild_with_source` is then expected to reverse files
    // per group rather than stats-sort them, and a work-order hint would
    // work against that reversal, so the hint has to remain `None`.

    // `reverse()` is used (rather than hand-building options) so that both
    // `descending` and `nulls_first` flip, as `LexOrdering::is_reverse`
    // requires.
    let asc = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
    let desc = asc.reverse();
    let declared = LexOrdering::from([asc.clone()]);

    // One group of three files with increasing, non-overlapping ranges.
    let only_group = FileGroup::new(vec![
        make_file_with_stats("f1", 0.0, 9.0),
        make_file_with_stats("f2", 10.0, 19.0),
        make_file_with_stats("f3", 20.0, 30.0),
    ]);

    let fields = vec![Field::new("a", DataType::Float64, false)];
    let file_schema = Arc::new(Schema::new(fields));
    let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
    let file_source: Arc<dyn FileSource> = Arc::new(MockSource::new(table_schema));

    let builder =
        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
            .with_file_groups(vec![only_group])
            .with_output_ordering(vec![declared]);
    let config = builder.build();

    // `is_exact = false` selects the Inexact path, which is the only path
    // that could have populated the hint.
    let is_exact = false;
    let source = Arc::clone(&config.file_source);
    let rebuilt = config.rebuild_with_source(source, is_exact, &[desc])?;

    assert!(rebuilt.work_order_hint.is_none());
    Ok(())
}

#[test]
fn sort_pushdown_unsupported_source_already_sorted() -> Result<()> {
let file_schema =
Expand Down
79 changes: 77 additions & 2 deletions datafusion/datasource/src/file_scan_config/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
//! core configuration and data-source plumbing.

use super::FileScanConfig;
use crate::PartitionedFile;
use crate::file::FileSource;
use crate::file_groups::FileGroup;
use crate::source::DataSource;
Expand Down Expand Up @@ -110,9 +111,11 @@ impl FileScanConfig {

new_config.file_source = new_file_source;

let sort_order = LexOrdering::new(order.iter().cloned());

// Sort files within groups by statistics when not reversing
let all_non_overlapping = if !reverse_file_groups {
if let Some(sort_order) = LexOrdering::new(order.iter().cloned()) {
if let Some(sort_order) = sort_order.as_ref() {
let projected_schema = new_config.projected_schema()?;
let projection_indices = new_config
.file_source
Expand All @@ -121,7 +124,7 @@ impl FileScanConfig {
.and_then(|p| ordered_column_indices_from_projection(p));
let result = sort_files_within_groups_by_statistics(
&new_config.file_groups,
&sort_order,
sort_order,
&projected_schema,
projection_indices.as_deref(),
);
Expand Down Expand Up @@ -164,6 +167,19 @@ impl FileScanConfig {
// sits idle the entire time, losing the parallelism benefit.
} else {
new_config.output_ordering = vec![];
// Inexact: SortExec remains above and a SharedWorkSource may be
// active for this scan. Stash the requested sort order so the
// shared queue can be seeded in globally best-first order —
// dynamic filters (TopK, etc.) then tighten fastest across the
// whole scan. Skip on the reverse path: the within-group reversal
// must not be overridden by a global ascending sort (a globally
// descending hint is a possible follow-up).
//
// TODO: populate a reversed hint in the `reverse_file_groups` case for
// symmetry (see how `reverse_file_groups` is handled earlier in this
// function).
if !reverse_file_groups {
new_config.work_order_hint = sort_order;
}
}

Ok(new_config)
Expand Down Expand Up @@ -249,6 +265,11 @@ impl FileScanConfig {
}

new_config.output_ordering = vec![];
// Inexact: see `rebuild_with_source` for rationale. This branch is
// always reached via the non-reversing path (`try_sort_file_groups_by_statistics`
// only sorts in the natural direction), so no reverse-scan gating
// is needed here.
new_config.work_order_hint = Some(sort_order);
Ok(SortOrderPushdownResult::Inexact {
inner: Arc::new(new_config),
})
Expand Down Expand Up @@ -343,6 +364,60 @@ pub(crate) fn sort_files_within_groups_by_statistics(
}
}

/// Produce one globally ordered file list from all `file_groups`, ordered by
/// the min value of the sort key taken from file statistics.
///
/// Intended for seeding the
/// [`SharedWorkSource`](crate::file_stream::work_source::SharedWorkSource)
/// queue in the Inexact sort-pushdown case, so that workers pull files in
/// globally best-first order and dynamic filters (e.g. TopK) can tighten
/// sooner across the whole scan.
///
/// Files are first flattened in group order; the returned vector contains
/// clones of those files rearranged according to
/// `MinMaxStatistics::min_values_sorted`.
///
/// Returns `None` — meaning "keep the existing flat order" — when:
/// - there are fewer than two files in total,
/// - min/max statistics cannot be built for the files (the error is logged
///   at `trace` level), or
/// - the flattened files are already in sorted order.
pub(crate) fn sort_files_globally_by_statistics(
    file_groups: &[FileGroup],
    sort_order: &LexOrdering,
    projected_schema: &SchemaRef,
    projection_indices: Option<&[usize]>,
) -> Option<Vec<PartitionedFile>> {
    // Flatten in group order; the indices used below are positions in this list.
    let flat: Vec<&PartitionedFile> = file_groups
        .iter()
        .flat_map(|group| group.iter())
        .collect();
    if flat.len() < 2 {
        // Zero or one file: there is nothing to reorder.
        return None;
    }

    let min_max = MinMaxStatistics::new_from_files(
        sort_order,
        projected_schema,
        projection_indices,
        flat.iter().copied(),
    )
    .map_err(|e| {
        log::trace!(
            "Cannot globally sort files by statistics: {e}. Falling back to flat order."
        );
    })
    .ok()?;

    // Each entry pairs an index into `flat` with its min value; only the
    // index is needed here.
    let ranked = min_max.min_values_sorted();

    // If every index is already at its own position, the flat order is the
    // sorted order and the caller keeps what it has.
    let needs_reorder = ranked
        .iter()
        .enumerate()
        .any(|(pos, (idx, _))| pos != *idx);
    if !needs_reorder {
        return None;
    }

    let reordered: Vec<PartitionedFile> = ranked
        .iter()
        .map(|(idx, _)| flat[*idx].clone())
        .collect();
    Some(reordered)
}

/// Check if any file in any group has nulls in the sort columns.
pub(crate) fn any_file_has_nulls_in_sort_columns(
file_groups: &[FileGroup],
Expand Down
Loading
Loading