67 changes: 26 additions & 41 deletions Cargo.lock

Some generated files are not rendered by default.

28 changes: 28 additions & 0 deletions Cargo.toml
@@ -203,6 +203,34 @@ url = "2.5.7"
uuid = "1.23"
zstd = { version = "0.13", default-features = false }

# Override arrow / parquet to the `adaptive-strategy-swap` branch on
# pydantic's fork of arrow-rs, which adds the `swap_strategy` API on
# `ParquetPushDecoder` that the in-decoder adaptive filter scheduling
# depends on.
#
# The full set of arrow-rs workspace crates is listed so transitive
# deps (e.g. `arrow-cast` pulled in via `arrow`) resolve to the patched
# version and we don't link two copies into one binary.
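#
# A quick way to confirm the override actually applied is `cargo tree -i parquet`,
# which should show the pydantic git source; `cargo tree --duplicates` can flag
# any arrow crates that still resolve to more than one version.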
#
# Branch: https://github.com/pydantic/arrow-rs/tree/adaptive-strategy-swap
[patch.crates-io]
arrow = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-arith = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-array = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-buffer = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-cast = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-csv = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-data = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-flight = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-ipc = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-json = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-ord = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-row = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-schema = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-select = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
arrow-string = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }
parquet = { git = "https://github.com/pydantic/arrow-rs.git", branch = "adaptive-strategy-swap" }

[workspace.lints.clippy]
# Detects large stack-allocated futures that may cause stack overflow crashes (see threshold in clippy.toml)
large_futures = "warn"
10 changes: 10 additions & 0 deletions datafusion-examples/examples/data_io/json_shredding.rs
@@ -92,6 +92,16 @@ pub async fn json_shredding() -> Result<()> {
// Set up query execution
let mut cfg = SessionConfig::new();
cfg.options_mut().execution.parquet.pushdown_filters = true;
// Force every filter to row-level so the example's
// `pushdown_rows_pruned=1` assertion is deterministic. The default
// adaptive scheduler keeps small-file filters on the post-scan path
// (via the byte-ratio heuristic), where `pushdown_rows_pruned` stays
// 0; setting `filter_pushdown_min_bytes_per_sec = 0` disables that
// heuristic.
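// (The same knob should also be reachable via the
// `datafusion.execution.parquet.filter_pushdown_min_bytes_per_sec` config
// key, assuming it follows the existing `datafusion.execution.parquet.*`
// naming used by options like `pushdown_filters`.)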
cfg.options_mut()
.execution
.parquet
.filter_pushdown_min_bytes_per_sec = 0.0;
let ctx = SessionContext::new_with_config(cfg);
ctx.runtime_env().register_object_store(
ObjectStoreUrl::parse("memory://")?.as_ref(),
23 changes: 23 additions & 0 deletions datafusion/common/src/config.rs
@@ -919,6 +919,29 @@ config_namespace! {
/// parquet reader setting. 0 means no caching.
pub max_predicate_cache_size: Option<usize>, default = None

/// (reading) Minimum throughput, in bytes per second, that an adaptive
/// row-level filter must sustain to remain at row-level. Filters that
/// drop below this threshold (with statistical confidence — see
/// `filter_confidence_z`) are demoted to post-scan, or dropped entirely
/// if they were optional (e.g. a hash-join build-side dynamic filter).
/// Set to `0` to force every filter to row-level (skip the threshold
/// check); set to `f64::INFINITY` to keep every filter post-scan.
pub filter_pushdown_min_bytes_per_sec: f64, default = 100.0 * 1024.0 * 1024.0

/// (reading) Initial-placement heuristic for adaptive filters: when a
/// filter is first observed, place it at row-level if its column bytes
/// are this fraction or less of the total projection's column bytes.
/// Above this ratio, the filter starts as post-scan and only gets
/// promoted later if measured throughput crosses
/// `filter_pushdown_min_bytes_per_sec`.
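/// For example, with the default `0.20`, a filter that touches a 10 MB
/// column inside a 100 MB projection (ratio 0.10) starts at row-level,
/// while one that touches 30 MB (ratio 0.30) starts post-scan.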
pub filter_collecting_byte_ratio_threshold: f64, default = 0.20

/// (reading) Z-score for the one-sided confidence interval the adaptive
/// filter scheduler uses when promoting / demoting / dropping filters.
/// Default `2.0` (≈ 97.5%) keeps strategy moves conservative; lower the
/// value for snappier adaptation, raise it for more stable placements.
pub filter_confidence_z: f64, default = 2.0

// The following options affect writing to parquet files
// and map to parquet::file::properties::WriterProperties

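To make the interplay of the three read-time knobs above concrete, here is a minimal, self-contained sketch of the kind of placement logic they describe: initial placement by byte ratio, and a one-sided confidence check of measured throughput against the bytes-per-second floor. Everything in it (the `AdaptiveFilterConfig` struct, `initial_placement`, `should_demote`) is hypothetical illustration, not the scheduler this PR actually implements.

/// Where a predicate is currently evaluated (illustrative only).
#[derive(Debug, PartialEq)]
enum FilterPlacement {
    RowLevel,
    PostScan,
}

/// Stand-ins for the three configuration options above.
struct AdaptiveFilterConfig {
    /// filter_pushdown_min_bytes_per_sec
    min_bytes_per_sec: f64,
    /// filter_collecting_byte_ratio_threshold
    byte_ratio_threshold: f64,
    /// filter_confidence_z
    confidence_z: f64,
}

impl AdaptiveFilterConfig {
    /// Initial placement: evaluate the filter at row-level only if the
    /// columns it reads are a small fraction of the projected bytes.
    fn initial_placement(&self, filter_bytes: u64, projection_bytes: u64) -> FilterPlacement {
        let ratio = filter_bytes as f64 / projection_bytes.max(1) as f64;
        if ratio <= self.byte_ratio_threshold {
            FilterPlacement::RowLevel
        } else {
            FilterPlacement::PostScan
        }
    }

    /// Demote a row-level filter once we are statistically confident its
    /// sustained throughput is below the floor: even the one-sided upper
    /// confidence bound of the observed bytes/sec falls short.
    fn should_demote(&self, mean_bytes_per_sec: f64, std_err: f64) -> bool {
        mean_bytes_per_sec + self.confidence_z * std_err < self.min_bytes_per_sec
    }
}

fn main() {
    let cfg = AdaptiveFilterConfig {
        min_bytes_per_sec: 100.0 * 1024.0 * 1024.0, // the 100 MB/s default
        byte_ratio_threshold: 0.20,
        confidence_z: 2.0,
    };

    // 10 MB of filter columns in a 100 MB projection (ratio 0.10): row-level.
    assert_eq!(
        cfg.initial_placement(10 << 20, 100 << 20),
        FilterPlacement::RowLevel
    );

    // Measured ~60 MB/s with a 5 MB/s standard error: the upper bound
    // (~70 MB/s) is still below the 100 MB/s floor, so demote to post-scan.
    assert!(cfg.should_demote(60.0 * 1024.0 * 1024.0, 5.0 * 1024.0 * 1024.0));
}

Under this reading the `0` sentinel falls out naturally: no non-negative upper bound can sit below a zero floor, so a row-level filter is never demoted.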
13 changes: 13 additions & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -210,6 +210,10 @@ impl ParquetOptions {
coerce_int96: _, // not used for writer props
skip_arrow_metadata: _,
max_predicate_cache_size: _,
// Read-time adaptive filter knobs; not used for writer props.
filter_pushdown_min_bytes_per_sec: _,
filter_collecting_byte_ratio_threshold: _,
filter_confidence_z: _,
} = self;

let mut builder = WriterProperties::builder()
@@ -483,6 +487,10 @@ mod tests {
skip_arrow_metadata: defaults.skip_arrow_metadata,
coerce_int96: None,
max_predicate_cache_size: defaults.max_predicate_cache_size,
filter_pushdown_min_bytes_per_sec: defaults.filter_pushdown_min_bytes_per_sec,
filter_collecting_byte_ratio_threshold: defaults
.filter_collecting_byte_ratio_threshold,
filter_confidence_z: defaults.filter_confidence_z,
use_content_defined_chunking: defaults.use_content_defined_chunking.clone(),
}
}
@@ -600,6 +608,11 @@ mod tests {
binary_as_string: global_options_defaults.binary_as_string,
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
coerce_int96: None,
filter_pushdown_min_bytes_per_sec: global_options_defaults
.filter_pushdown_min_bytes_per_sec,
filter_collecting_byte_ratio_threshold: global_options_defaults
.filter_collecting_byte_ratio_threshold,
filter_confidence_z: global_options_defaults.filter_confidence_z,
use_content_defined_chunking: props.content_defined_chunking().map(|c| {
CdcOptions {
min_chunk_size: c.min_chunk_size,
16 changes: 11 additions & 5 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -166,13 +166,19 @@ mod tests {
source = source.with_predicate(predicate);
}

// The adaptive selectivity tracker subsumes the static
// `reorder_filters` flag. To keep these row-filter-pushdown
// assertions deterministic regardless of the byte-ratio
// heuristic, force every filter to row-level by setting
// `filter_pushdown_min_bytes_per_sec = 0` (the
// "always-row-level" sentinel). The promote/demote behavior
// exercised by other tests is irrelevant here.
if self.pushdown_predicate {
source = source
.with_pushdown_filters(true)
.with_reorder_filters(true);
} else {
source = source.with_pushdown_filters(false);
let mut opts = TableParquetOptions::default();
opts.global.filter_pushdown_min_bytes_per_sec = 0.0;
source = source.with_table_parquet_options(opts);
}
source = source.with_pushdown_filters(self.pushdown_predicate);

if self.page_index_predicate {
source = source.with_enable_page_index(true);
4 changes: 4 additions & 0 deletions datafusion/datasource-parquet/Cargo.toml
@@ -86,3 +86,7 @@ harness = false
[[bench]]
name = "parquet_struct_filter_pushdown"
harness = false

[[bench]]
name = "selectivity_tracker"
harness = false
@@ -24,6 +24,7 @@ use arrow::array::{
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use criterion::{Criterion, Throughput, criterion_group, criterion_main};
use datafusion_common::ScalarValue;
use datafusion_datasource_parquet::selectivity::SelectivityTracker;
use datafusion_datasource_parquet::{ParquetFileMetrics, build_row_filter};
use datafusion_expr::{Expr, col};
use datafusion_functions_nested::expr_fn::array_has;
@@ -115,9 +116,17 @@ fn scan_with_predicate(
let file_metrics = ParquetFileMetrics::new(0, &path.display().to_string(), &metrics);

let builder = if pushdown {
if let Some(row_filter) =
build_row_filter(predicate, file_schema, &metadata, false, &file_metrics)?
{
let tracker = Arc::new(SelectivityTracker::new());
let filters = vec![(0usize, Arc::clone(predicate))];
let (maybe_row_filter, _unbuildable) = build_row_filter(
&filters,
file_schema,
&metadata,
0,
&tracker,
&file_metrics,
)?;
if let Some(row_filter) = maybe_row_filter {
builder.with_row_filter(row_filter)
} else {
builder