Skip to content

Commit 057e7bc

Browse files
committed
Updates.
1 parent 6be9fb1 commit 057e7bc

File tree

98 files changed

+998
-871
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

98 files changed

+998
-871
lines changed

Cargo.lock

Lines changed: 333 additions & 288 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-examples/examples/expr_api.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,8 @@ fn simplify_demo() -> Result<()> {
179179
// expressions, such as the current time (to evaluate `now()`
180180
// correctly)
181181
let props = ExecutionProps::new();
182-
let config_options = ConfigOptions::default();
183-
let context = SimplifyContext::new(&props, &config_options).with_schema(schema);
182+
let config_options = ConfigOptions::default_singleton_arc();
183+
let context = SimplifyContext::new(&props, config_options).with_schema(schema);
184184
let simplifier = ExprSimplifier::new(context);
185185

186186
// And then call the simplify_expr function:
@@ -196,7 +196,7 @@ fn simplify_demo() -> Result<()> {
196196
// here are some other examples of what DataFusion is capable of
197197
let schema = Schema::new(vec![make_field("i", DataType::Int64)]).to_dfschema_ref()?;
198198
let context =
199-
SimplifyContext::new(&props, &config_options).with_schema(schema.clone());
199+
SimplifyContext::new(&props, config_options).with_schema(schema.clone());
200200
let simplifier = ExprSimplifier::new(context);
201201

202202
// basic arithmetic simplification
@@ -534,8 +534,8 @@ fn type_coercion_demo() -> Result<()> {
534534

535535
// Evaluation with an expression that has not been type coerced cannot succeed.
536536
let props = ExecutionProps::default();
537-
let config_options = ConfigOptions::default();
538-
let physical_expr = create_physical_expr(&expr, &df_schema, &props, &config_options)?;
537+
let config_options = ConfigOptions::default_singleton_arc();
538+
let physical_expr = create_physical_expr(&expr, &df_schema, &props, config_options)?;
539539
let e = physical_expr.evaluate(&batch).unwrap_err();
540540
assert!(e
541541
.find_root()
@@ -548,12 +548,12 @@ fn type_coercion_demo() -> Result<()> {
548548
assert!(physical_expr.evaluate(&batch).is_ok());
549549

550550
// 2. Type coercion with `ExprSimplifier::coerce`.
551-
let context = SimplifyContext::new(&props, &config_options)
551+
let context = SimplifyContext::new(&props, config_options)
552552
.with_schema(Arc::new(df_schema.clone()));
553553
let simplifier = ExprSimplifier::new(context);
554554
let coerced_expr = simplifier.coerce(expr.clone(), &df_schema)?;
555555
let physical_expr =
556-
create_physical_expr(&coerced_expr, &df_schema, &props, &config_options)?;
556+
create_physical_expr(&coerced_expr, &df_schema, &props, config_options)?;
557557
assert!(physical_expr.evaluate(&batch).is_ok());
558558

559559
// 3. Type coercion with `TypeCoercionRewriter`.
@@ -562,7 +562,7 @@ fn type_coercion_demo() -> Result<()> {
562562
.rewrite(&mut TypeCoercionRewriter::new(&df_schema))?
563563
.data;
564564
let physical_expr =
565-
create_physical_expr(&coerced_expr, &df_schema, &props, &config_options)?;
565+
create_physical_expr(&coerced_expr, &df_schema, &props, config_options)?;
566566
assert!(physical_expr.evaluate(&batch).is_ok());
567567

568568
// 4. Apply explicit type coercion by manually rewriting the expression
@@ -587,7 +587,7 @@ fn type_coercion_demo() -> Result<()> {
587587
})?
588588
.data;
589589
let physical_expr =
590-
create_physical_expr(&coerced_expr, &df_schema, &props, &config_options)?;
590+
create_physical_expr(&coerced_expr, &df_schema, &props, config_options)?;
591591
assert!(physical_expr.evaluate(&batch).is_ok());
592592

593593
Ok(())

datafusion-examples/examples/planner_api.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
use datafusion::error::Result;
19+
use datafusion::execution::session_state::SessionStateOptimizerConfig;
1920
use datafusion::logical_expr::{LogicalPlan, PlanType};
2021
use datafusion::physical_plan::{displayable, DisplayFormatType};
2122
use datafusion::physical_planner::DefaultPhysicalPlanner;
@@ -97,17 +98,19 @@ async fn to_physical_plan_step_by_step_demo(
9798
ctx: &SessionContext,
9899
) -> Result<()> {
99100
// First analyze the logical plan
100-
let analyzed_logical_plan = ctx.state().analyzer().execute_and_check(
101+
let session_state = ctx.state();
102+
let analyzed_logical_plan = session_state.analyzer().execute_and_check(
101103
input,
102-
ctx.state().config_options(),
104+
session_state.config_options(),
103105
|_, _| (),
104106
)?;
105107
println!("Analyzed logical plan:\n\n{:?}\n\n", analyzed_logical_plan);
106108

107109
// Optimize the analyzed logical plan
108-
let optimized_logical_plan = ctx.state().optimizer().optimize(
110+
let session_optimizer_config = SessionStateOptimizerConfig::new(&session_state);
111+
let optimized_logical_plan = session_state.optimizer().optimize(
109112
analyzed_logical_plan,
110-
&ctx.state(),
113+
&session_optimizer_config,
111114
|_, _| (),
112115
)?;
113116
println!(
@@ -116,10 +119,9 @@ async fn to_physical_plan_step_by_step_demo(
116119
);
117120

118121
// Create the physical plan
119-
let physical_plan = ctx
120-
.state()
122+
let physical_plan = session_state
121123
.query_planner()
122-
.create_physical_plan(&optimized_logical_plan, &ctx.state())
124+
.create_physical_plan(&optimized_logical_plan, &session_state)
123125
.await?;
124126
println!(
125127
"Final physical plan:\n\n{}\n\n",
@@ -139,7 +141,7 @@ async fn to_physical_plan_step_by_step_demo(
139141
// on DefaultPhysicalPlanner. Not all planners will provide this feature.
140142
let planner = DefaultPhysicalPlanner::default();
141143
let physical_plan =
142-
planner.optimize_physical_plan(physical_plan, &ctx.state(), |_, _| {})?;
144+
planner.optimize_physical_plan(physical_plan, &session_state, |_, _| {})?;
143145
println!(
144146
"Optimized physical plan:\n\n{}\n\n",
145147
displayable(physical_plan.as_ref())

datafusion-examples/examples/pruning.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,9 @@ impl PruningStatistics for MyCatalog {
189189
fn create_pruning_predicate(expr: Expr, schema: &SchemaRef) -> PruningPredicate {
190190
let df_schema = DFSchema::try_from(schema.as_ref().clone()).unwrap();
191191
let props = ExecutionProps::new();
192-
let config_options = ConfigOptions::default();
192+
let config_options = ConfigOptions::default_singleton_arc();
193193
let physical_expr =
194-
create_physical_expr(&expr, &df_schema, &props, &config_options).unwrap();
194+
create_physical_expr(&expr, &df_schema, &props, config_options).unwrap();
195195
PruningPredicate::try_new(physical_expr, schema.clone()).unwrap()
196196
}
197197

datafusion-examples/examples/simple_udtf.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ impl TableFunctionImpl for LocalCsvTableFunc {
147147
.map(|expr| {
148148
// try to simplify the expression, so 1+2 becomes 3, for example
149149
let execution_props = ExecutionProps::new();
150-
let config_options = Arc::unwrap_or_clone(self.config_options.clone());
150+
let config_options = Arc::clone(&self.config_options);
151151
let info = SimplifyContext::new(&execution_props, &config_options);
152152
let expr = ExprSimplifier::new(info).simplify(expr.clone())?;
153153

datafusion/catalog-listing/src/helpers.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ use futures::stream::FuturesUnordered;
3838
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
3939
use log::{debug, trace};
4040

41+
use datafusion_common::config::ConfigOptions;
4142
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
4243
use datafusion_common::{Column, DFSchema, DataFusionError};
43-
use datafusion_execution::config::SessionConfig;
4444
use datafusion_expr::execution_props::ExecutionProps;
4545
use datafusion_expr::{Expr, Volatility};
4646
use datafusion_physical_expr::create_physical_expr;
@@ -244,7 +244,7 @@ async fn prune_partitions(
244244
partitions: Vec<Partition>,
245245
filters: &[Expr],
246246
partition_cols: &[(String, DataType)],
247-
config: &SessionConfig,
247+
config_options: &Arc<ConfigOptions>,
248248
) -> Result<Vec<Partition>> {
249249
if filters.is_empty() {
250250
return Ok(partitions);
@@ -291,7 +291,6 @@ async fn prune_partitions(
291291

292292
let batch = RecordBatch::try_new(schema, arrays)?;
293293
let props = ExecutionProps::new();
294-
let config_options = config.options();
295294

296295
// Applies `filter` to `batch` returning `None` on error
297296
let do_filter = |filter| -> Result<ArrayRef> {
@@ -414,6 +413,7 @@ pub async fn pruned_partition_list<'a>(
414413
filters: &'a [Expr],
415414
file_extension: &'a str,
416415
partition_cols: &'a [(String, DataType)],
416+
config_options: &Arc<ConfigOptions>,
417417
) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
418418
// if no partition col => simply list all the files
419419
if partition_cols.is_empty() {
@@ -443,7 +443,7 @@ pub async fn pruned_partition_list<'a>(
443443
partitions,
444444
filters,
445445
partition_cols,
446-
ctx.config(),
446+
config_options,
447447
)
448448
.await?;
449449

@@ -613,6 +613,7 @@ mod tests {
613613
&[filter],
614614
".parquet",
615615
&[(String::from("mypartition"), DataType::Utf8)],
616+
&Arc::clone(ConfigOptions::default_singleton_arc()),
616617
)
617618
.await
618619
.expect("partition pruning failed")
@@ -638,6 +639,7 @@ mod tests {
638639
&[filter],
639640
".parquet",
640641
&[(String::from("mypartition"), DataType::Utf8)],
642+
&Arc::clone(ConfigOptions::default_singleton_arc()),
641643
)
642644
.await
643645
.expect("partition pruning failed")
@@ -681,6 +683,7 @@ mod tests {
681683
(String::from("part1"), DataType::Utf8),
682684
(String::from("part2"), DataType::Utf8),
683685
],
686+
&Arc::clone(ConfigOptions::default_singleton_arc()),
684687
)
685688
.await
686689
.expect("partition pruning failed")

datafusion/common/src/config.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,16 @@
1717

1818
//! Runtime configuration, via [`ConfigOptions`]
1919
20+
use crate::error::_config_err;
21+
use crate::parsers::CompressionTypeVariant;
22+
use crate::utils::get_available_parallelism;
23+
use crate::{DataFusionError, Result};
2024
use std::any::Any;
2125
use std::collections::{BTreeMap, HashMap};
2226
use std::error::Error;
2327
use std::fmt::{self, Display};
2428
use std::str::FromStr;
25-
26-
use crate::error::_config_err;
27-
use crate::parsers::CompressionTypeVariant;
28-
use crate::utils::get_available_parallelism;
29-
use crate::{DataFusionError, Result};
29+
use std::sync::{Arc, LazyLock};
3030

3131
/// A macro that wraps a configuration struct and automatically derives
3232
/// [`Default`] and [`ConfigField`] for it, allowing it to be used
@@ -772,7 +772,20 @@ impl ConfigField for ConfigOptions {
772772
}
773773
}
774774

775+
static CONFIG_OPTIONS_SINGLETON: LazyLock<Arc<ConfigOptions>> =
776+
LazyLock::new(|| Arc::new(ConfigOptions::default()));
777+
775778
impl ConfigOptions {
779+
/// this is a static singleton to be used for testing only where the default values are sufficient
780+
pub fn default_singleton() -> &'static ConfigOptions {
781+
CONFIG_OPTIONS_SINGLETON.as_ref()
782+
}
783+
784+
/// this is a static singleton to be used for testing only where the default values are sufficient
785+
pub fn default_singleton_arc() -> &'static Arc<ConfigOptions> {
786+
&CONFIG_OPTIONS_SINGLETON
787+
}
788+
776789
/// Creates a new [`ConfigOptions`] with default values
777790
pub fn new() -> Self {
778791
Self::default()

datafusion/core/src/datasource/listing/table.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -891,6 +891,8 @@ impl TableProvider for ListingTable {
891891
}
892892

893893
let output_ordering = self.try_create_output_ordering()?;
894+
let config_options = Arc::new(session_state.config_options().clone());
895+
894896
match state
895897
.config_options()
896898
.execution
@@ -924,7 +926,7 @@ impl TableProvider for ListingTable {
924926
&expr,
925927
&table_df_schema,
926928
state.execution_props(),
927-
session_state.config_options(),
929+
&config_options,
928930
)?;
929931
Some(filters)
930932
}
@@ -1022,13 +1024,15 @@ impl TableProvider for ListingTable {
10221024

10231025
// TODO (https://github.com/apache/datafusion/issues/11600) remove downcast_ref from here?
10241026
let session_state = state.as_any().downcast_ref::<SessionState>().unwrap();
1027+
let config_options = Arc::new(state.config_options().clone());
10251028
let file_list_stream = pruned_partition_list(
10261029
session_state,
10271030
store.as_ref(),
10281031
table_path,
10291032
&[],
10301033
&self.options.file_extension,
10311034
&self.options.table_partition_cols,
1035+
&config_options,
10321036
)
10331037
.await?;
10341038

@@ -1095,6 +1099,7 @@ impl ListingTable {
10951099
return Ok((vec![], Statistics::new_unknown(&self.file_schema)));
10961100
};
10971101
// list files (with partitions)
1102+
let config_options = Arc::new(ctx.config_options().clone());
10981103
let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
10991104
pruned_partition_list(
11001105
ctx,
@@ -1103,6 +1108,7 @@ impl ListingTable {
11031108
filters,
11041109
&self.options.file_extension,
11051110
&self.options.table_partition_cols,
1111+
&config_options,
11061112
)
11071113
}))
11081114
.await?;

datafusion/core/src/datasource/memory.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,8 @@ impl TableProvider for MemTable {
236236

237237
// add sort information if present
238238
let sort_order = self.sort_order.lock();
239+
let config_options = Arc::new(state.config_options().clone());
240+
239241
if !sort_order.is_empty() {
240242
let df_schema = DFSchema::try_from(self.schema.as_ref().clone())?;
241243

@@ -246,7 +248,7 @@ impl TableProvider for MemTable {
246248
sort_exprs,
247249
&df_schema,
248250
state.execution_props(),
249-
state.config_options(),
251+
&config_options,
250252
)
251253
})
252254
.collect::<Result<Vec<_>>>()?;

datafusion/core/src/execution/context/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1829,6 +1829,7 @@ mod tests {
18291829
use crate::execution::session_state::SessionStateBuilder;
18301830
use crate::physical_planner::PhysicalPlanner;
18311831
use async_trait::async_trait;
1832+
use datafusion_common::config::ConfigOptions;
18321833
use datafusion_expr::planner::TypePlanner;
18331834
use sqlparser::ast;
18341835
use tempfile::TempDir;
@@ -2281,6 +2282,7 @@ mod tests {
22812282
_expr: &Expr,
22822283
_input_dfschema: &DFSchema,
22832284
_session_state: &SessionState,
2285+
_config_options: &Arc<ConfigOptions>,
22842286
) -> Result<Arc<dyn PhysicalExpr>> {
22852287
unimplemented!()
22862288
}

0 commit comments

Comments
 (0)