diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index 16ddf2a6a5f8..32d321a6af4d 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -21,7 +21,7 @@ use std::path::Path;
 use std::process::ExitCode;
 use std::sync::{Arc, LazyLock};
 
-use datafusion::error::{DataFusionError, Result};
+use datafusion::error::Result;
 use datafusion::execution::context::SessionConfig;
 use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool};
 use datafusion::execution::runtime_env::RuntimeEnvBuilder;
@@ -38,7 +38,7 @@ use datafusion_cli::{
 };
 
 use clap::Parser;
-use datafusion::common::config_err;
+use datafusion::common::{config_err, external_datafusion_err};
 use datafusion::config::ConfigOptions;
 use datafusion::execution::disk_manager::DiskManagerConfig;
 use mimalloc::MiMalloc;
@@ -233,7 +233,7 @@ async fn main_inner() -> Result<()> {
         // TODO maybe we can have thiserror for cli but for now let's keep it simple
         return exec::exec_from_repl(&ctx, &mut print_options)
             .await
-            .map_err(|e| DataFusionError::External(Box::new(e)));
+            .map_err(|e| external_datafusion_err!(e));
     }
 
     if !files.is_empty() {
@@ -381,6 +381,7 @@ pub fn extract_disk_limit(size: &str) -> Result {
 mod tests {
     use super::*;
     use datafusion::common::test_util::batches_to_string;
+    use datafusion::common::DataFusionError;
     use insta::assert_snapshot;
 
     fn assert_conversion(input: &str, expected: Result) {
diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs
index 56d787b0fe08..e4cfa2386f2d 100644
--- a/datafusion-cli/src/print_options.rs
+++ b/datafusion-cli/src/print_options.rs
@@ -24,8 +24,8 @@ use crate::print_format::PrintFormat;
 
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
+use datafusion::common::external_err;
 use datafusion::common::instant::Instant;
-use datafusion::common::DataFusionError;
 use datafusion::error::Result;
 use datafusion::physical_plan::RecordBatchStream;
@@ -143,9 +143,7 @@ impl PrintOptions {
         format_options: &FormatOptions,
     ) -> Result<()> {
         if self.format == PrintFormat::Table {
-            return Err(DataFusionError::External(
-                "PrintFormat::Table is not implemented".to_string().into(),
-            ));
+            return external_err!("PrintFormat::Table is not implemented");
         };
 
         let stdout = std::io::stdout();
diff --git a/datafusion-examples/examples/ffi/ffi_module_loader/src/main.rs b/datafusion-examples/examples/ffi/ffi_module_loader/src/main.rs
index 6e376ca866e8..0311ed774411 100644
--- a/datafusion-examples/examples/ffi/ffi_module_loader/src/main.rs
+++ b/datafusion-examples/examples/ffi/ffi_module_loader/src/main.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;
 
 use datafusion::{
+    common::external_datafusion_err,
     error::{DataFusionError, Result},
     prelude::SessionContext,
 };
@@ -32,12 +33,12 @@ async fn main() -> Result<()> {
     // so you will need to change the approach here based on your use case.
     let target: &std::path::Path = "../../../../target/".as_ref();
     let library_path = compute_library_path::<TableProviderModuleRef>(target)
-        .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        .map_err(|e| external_datafusion_err!(e))?;
 
     // Load the module
     let table_provider_module =
         TableProviderModuleRef::load_from_directory(&library_path)
-            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+            .map_err(|e| external_datafusion_err!(e))?;
 
     // By calling the code below, the table provided will be created within
     // the module's code.
diff --git a/datafusion-examples/examples/sql_query.rs b/datafusion-examples/examples/sql_query.rs
index 0ac203cfb7e7..ce767bdceb0e 100644
--- a/datafusion-examples/examples/sql_query.rs
+++ b/datafusion-examples/examples/sql_query.rs
@@ -18,11 +18,13 @@ use datafusion::arrow::array::{UInt64Array, UInt8Array};
 use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::common::{assert_batches_eq, exec_datafusion_err};
+use datafusion::common::{
+    assert_batches_eq, exec_datafusion_err, external_datafusion_err,
+};
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::ListingOptions;
 use datafusion::datasource::MemTable;
-use datafusion::error::{DataFusionError, Result};
+use datafusion::error::Result;
 use datafusion::prelude::*;
 use object_store::local::LocalFileSystem;
 use std::path::Path;
@@ -168,8 +170,7 @@ async fn query_parquet() -> Result<()> {
 
     let local_fs = Arc::new(LocalFileSystem::default());
 
-    let u = url::Url::parse("file://./")
-        .map_err(|e| DataFusionError::External(Box::new(e)))?;
+    let u = url::Url::parse("file://./").map_err(|e| external_datafusion_err!(e))?;
     ctx.register_object_store(&u, local_fs);
 
     // Register a listing table - this will use all files in the directory as data sources
diff --git a/datafusion/catalog/src/stream.rs b/datafusion/catalog/src/stream.rs
index fbfab513229e..9f136022f7aa 100644
--- a/datafusion/catalog/src/stream.rs
+++ b/datafusion/catalog/src/stream.rs
@@ -28,7 +28,10 @@ use std::sync::Arc;
 use crate::{Session, TableProvider, TableProviderFactory};
 use arrow::array::{RecordBatch, RecordBatchReader, RecordBatchWriter};
 use arrow::datatypes::SchemaRef;
-use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result};
+use datafusion_common::{
+    config_err, execution_join_datafusion_err, plan_err, Constraints, DataFusionError,
+    Result,
+};
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_datasource::sink::{DataSink, DataSinkExec};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
@@ -440,6 +443,6 @@ impl DataSink for StreamWrite {
         write_task
             .join_unwind()
             .await
-            .map_err(DataFusionError::ExecutionJoin)?
+            .map_err(|e| execution_join_datafusion_err!(e))?
     }
 }
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 59283114e3a9..fb7b032fd472 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -20,7 +20,7 @@ use crate::error::_config_err;
 use crate::parsers::CompressionTypeVariant;
 use crate::utils::get_available_parallelism;
-use crate::{DataFusionError, Result};
+use crate::{external_datafusion_err, DataFusionError, Result};
 use std::any::Any;
 use std::collections::{BTreeMap, HashMap};
 use std::error::Error;
@@ -1186,7 +1186,7 @@ where
                 input,
                 std::any::type_name::<T>()
             ),
-            Box::new(DataFusionError::External(Box::new(e))),
+            Box::new(external_datafusion_err!(e)),
         )
     })
 }
diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs
index b4a537fdce7e..34ce2a91f1ad 100644
--- a/datafusion/common/src/error.rs
+++ b/datafusion/common/src/error.rs
@@ -118,7 +118,9 @@ pub enum DataFusionError {
     /// [`JoinError`] during execution of the query.
     ///
     /// This error can't occur for unjoined tasks, such as execution shutdown.
-    ExecutionJoin(JoinError),
+    ///
+    /// 2nd argument is for optional backtrace
+    ExecutionJoin(JoinError, Option<String>),
     /// Error when resources (such as memory of scratch disk space) are exhausted.
     ///
     /// This error is thrown when a consumer cannot acquire additional memory
@@ -280,6 +282,12 @@ impl From for DataFusionError {
     }
 }
 
+impl From<JoinError> for DataFusionError {
+    fn from(e: JoinError) -> Self {
+        DataFusionError::ExecutionJoin(e, None)
+    }
+}
+
 impl From<DataFusionError> for ArrowError {
     fn from(e: DataFusionError) -> Self {
         match e {
@@ -376,7 +384,7 @@ impl Error for DataFusionError {
             DataFusionError::Plan(_) => None,
            DataFusionError::SchemaError(e, _) => Some(e),
             DataFusionError::Execution(_) => None,
-            DataFusionError::ExecutionJoin(e) => Some(e),
+            DataFusionError::ExecutionJoin(e, _) => Some(e),
             DataFusionError::ResourcesExhausted(_) => None,
             DataFusionError::External(e) => Some(e.as_ref()),
             DataFusionError::Context(_, e) => Some(e.as_ref()),
@@ -508,7 +516,7 @@ impl DataFusionError {
             }
             DataFusionError::SchemaError(_, _) => "Schema error: ",
             DataFusionError::Execution(_) => "Execution error: ",
-            DataFusionError::ExecutionJoin(_) => "ExecutionJoin error: ",
+            DataFusionError::ExecutionJoin(_, _) => "ExecutionJoin error: ",
             DataFusionError::ResourcesExhausted(_) => {
                 "Resources exhausted: "
             }
@@ -552,7 +560,10 @@ impl DataFusionError {
             DataFusionError::Execution(ref desc) => Cow::Owned(desc.to_string()),
-            DataFusionError::ExecutionJoin(ref desc) => Cow::Owned(desc.to_string()),
+            DataFusionError::ExecutionJoin(ref desc, ref backtrace) => {
+                let backtrace = backtrace.clone().unwrap_or_else(|| "".to_owned());
+                Cow::Owned(format!("{desc}{backtrace}"))
+            }
             DataFusionError::ResourcesExhausted(ref desc) => Cow::Owned(desc.to_string()),
             DataFusionError::External(ref desc) => Cow::Owned(desc.to_string()),
             #[cfg(feature = "object_store")]
@@ -900,8 +911,58 @@ macro_rules! schema_err {
             let err = err.with_diagnostic($DIAG);
         )?
         Err(err)
-        }
-    };
+    }};
 }
 
+// Exposes a macro to create `DataFusionError::External` with optional backtrace
+#[macro_export]
+macro_rules! external_datafusion_err {
+    ($CONVERTIBLE_TO_ERR:expr $(; diagnostic = $DIAG:expr)?) => {{
+        let err = $crate::error::DataFusionError::External($crate::error::GenericError::from($CONVERTIBLE_TO_ERR)).context($crate::error::DataFusionError::get_back_trace());
+        $(
+            let err = err.with_diagnostic($DIAG);
+        )?
+        err
+    }};
+}
+
+// Exposes a macro to create `Err(DataFusionError::External)` with optional backtrace
+#[macro_export]
+macro_rules! external_err {
+    ($ERR:expr $(; diagnostic = $DIAG:expr)?) => {{
+        let err = $crate::external_datafusion_err!($ERR);
+        $(
+            let err = err.with_diagnostic($DIAG);
+        )?
+        Err(err)
+    }};
+}
+
+// Exposes a macro to create `DataFusionError::ExecutionJoin` with optional backtrace
+#[macro_export]
+macro_rules! execution_join_datafusion_err {
+    ($ERR:expr $(; diagnostic = $DIAG:expr)?) => {{
+        let err = $crate::error::DataFusionError::ExecutionJoin(
+            $ERR,
+            Some($crate::error::DataFusionError::get_back_trace())
+        );
+        $(
+            let err = err.with_diagnostic($DIAG);
+        )?
+        err
+    }};
+}
+
+// Exposes a macro to create `Err(DataFusionError::ExecutionJoin)` with optional backtrace
+#[macro_export]
+macro_rules! execution_join_err {
+    ($ERR:expr $(; diagnostic = $DIAG:expr)?) => {{
+        let err = $crate::execution_join_datafusion_err!($ERR);
+        $(
+            let err = err.with_diagnostic($DIAG);
+        )?
+        Err(err)
+    }};
+}
+
 // To avoid compiler error when using macro in the same crate:
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index b620ff62d9a6..f62b682aae52 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -44,7 +44,8 @@ use arrow::ipc::{root_as_message, CompressionType};
 use datafusion_catalog::Session;
 use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::{
-    not_impl_err, DataFusionError, GetExt, Statistics, DEFAULT_ARROW_EXTENSION,
+    execution_join_datafusion_err, not_impl_err, DataFusionError, GetExt, Statistics,
+    DEFAULT_ARROW_EXTENSION,
 };
 use datafusion_common_runtime::{JoinSet, SpawnedTask};
 use datafusion_datasource::display::FileGroupDisplay;
@@ -294,7 +295,7 @@ impl FileSink for ArrowFileSink {
         demux_task
             .join_unwind()
             .await
-            .map_err(DataFusionError::ExecutionJoin)??;
+            .map_err(|e| execution_join_datafusion_err!(e))??;
         Ok(row_count as u64)
     }
 }
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 8719a16f4919..baba656cc0e9 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -38,7 +38,7 @@ use crate::test_util::{aggr_test_schema, arrow_test_data};
 use arrow::array::{self, Array, ArrayRef, Decimal128Builder, Int32Array};
 use arrow::datatypes::{DataType, Field, Schema};
 use arrow::record_batch::RecordBatch;
-use datafusion_common::DataFusionError;
+use datafusion_common::external_datafusion_err;
 use datafusion_datasource::source::DataSourceExec;
 
 #[cfg(feature = "compression")]
@@ -135,7 +135,7 @@ pub fn partitioned_file_groups(
             #[cfg(feature = "compression")]
             FileCompressionType::ZSTD => {
                 let encoder = ZstdEncoder::new(file, 0)
-                    .map_err(|e| DataFusionError::External(Box::new(e)))?
+                    .map_err(|e| external_datafusion_err!(e))?
                     .auto_finish();
                 Box::new(encoder)
             }
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 253bd8872dee..543c466e7d05 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -42,8 +42,9 @@ use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
 use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::stats::Precision;
 use datafusion_common::{
-    internal_datafusion_err, internal_err, not_impl_err, ColumnStatistics,
-    DataFusionError, GetExt, HashSet, Result, DEFAULT_PARQUET_EXTENSION,
+    execution_join_datafusion_err, internal_datafusion_err, internal_err, not_impl_err,
+    ColumnStatistics, DataFusionError, GetExt, HashSet, Result,
+    DEFAULT_PARQUET_EXTENSION,
 };
 use datafusion_common::{HashMap, Statistics};
 use datafusion_common_runtime::{JoinSet, SpawnedTask};
@@ -1359,7 +1360,7 @@ impl FileSink for ParquetSink {
         demux_task
             .join_unwind()
             .await
-            .map_err(DataFusionError::ExecutionJoin)??;
+            .map_err(|e| execution_join_datafusion_err!(e))??;
 
         Ok(row_count as u64)
     }
@@ -1487,7 +1488,7 @@ fn spawn_rg_join_and_finalize_task(
             let (writer, _col_reservation) = task
                 .join_unwind()
                 .await
-                .map_err(DataFusionError::ExecutionJoin)??;
+                .map_err(|e| execution_join_datafusion_err!(e))??;
             let encoded_size = writer.get_estimated_total_bytes();
             rg_reservation.grow(encoded_size);
             finalized_rg.push(writer.close()?);
@@ -1624,7 +1625,7 @@ async fn concatenate_parallel_row_groups(
         let result = task.join_unwind().await;
         let mut rg_out = parquet_writer.next_row_group()?;
         let (serialized_columns, mut rg_reservation, _cnt) =
-            result.map_err(DataFusionError::ExecutionJoin)??;
+            result.map_err(|e| execution_join_datafusion_err!(e))??;
         for chunk in serialized_columns {
             chunk.append_to_row_group(&mut rg_out)?;
             rg_reservation.free();
@@ -1691,7 +1692,7 @@ async fn output_single_parquet_file_parallelized(
     launch_serialization_task
         .join_unwind()
         .await
-        .map_err(DataFusionError::ExecutionJoin)??;
+        .map_err(|e| execution_join_datafusion_err!(e))??;
 
     Ok(file_metadata)
 }
diff --git a/datafusion/datasource/src/file_compression_type.rs b/datafusion/datasource/src/file_compression_type.rs
index 7cc3142564e9..dc5f03a91f01 100644
--- a/datafusion/datasource/src/file_compression_type.rs
+++ b/datafusion/datasource/src/file_compression_type.rs
@@ -19,7 +19,7 @@ use std::str::FromStr;
 
-use datafusion_common::error::{DataFusionError, Result};
+use datafusion_common::{external_datafusion_err, DataFusionError, Result};
 use datafusion_common::parsers::CompressionTypeVariant::{self, *};
 use datafusion_common::GetExt;
 
@@ -231,7 +231,7 @@ impl FileCompressionType {
             #[cfg(feature = "compression")]
             ZSTD => match ZstdDecoder::new(r) {
                 Ok(decoder) => Box::new(decoder),
-                Err(e) => return Err(DataFusionError::External(Box::new(e))),
+                Err(e) => return Err(external_datafusion_err!(e)),
             },
             #[cfg(not(feature = "compression"))]
             GZIP | BZIP2 | XZ | ZSTD => {
diff --git a/datafusion/datasource/src/url.rs b/datafusion/datasource/src/url.rs
index bddfdbcc06d1..6d7eb3fdbfe5 100644
--- a/datafusion/datasource/src/url.rs
+++ b/datafusion/datasource/src/url.rs
@@ -17,7 +17,7 @@ use std::sync::Arc;
 
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{external_datafusion_err, DataFusionError, Result};
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_session::Session;
 
@@ -112,7 +112,7 @@ impl ListingTableUrl {
             Ok(url) => Self::try_new(url, None),
             #[cfg(not(target_arch = "wasm32"))]
             Err(url::ParseError::RelativeUrlWithoutBase) => Self::parse_path(s),
-            Err(e) => Err(DataFusionError::External(Box::new(e))),
+            Err(e) => Err(external_datafusion_err!(e)),
         }
     }
 
@@ -121,17 +121,14 @@ impl ListingTableUrl {
     fn parse_path(s: &str) -> Result<Self> {
         let (path, glob) = match split_glob_expression(s) {
             Some((prefix, glob)) => {
-                let glob = Pattern::new(glob)
-                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
+                let glob = Pattern::new(glob).map_err(|e| external_datafusion_err!(e))?;
                 (prefix, Some(glob))
             }
             None => (s, None),
         };
 
         let url = url_from_filesystem_path(path).ok_or_else(|| {
-            DataFusionError::External(
-                format!("Failed to convert path to URL: {path}").into(),
-            )
+            external_datafusion_err!(format!("Failed to convert path to URL: {path}"))
         })?;
 
         Self::try_new(url, glob)
diff --git a/datafusion/datasource/src/write/orchestration.rs b/datafusion/datasource/src/write/orchestration.rs
index a09509ac5862..d9ecbf85b421 100644
--- a/datafusion/datasource/src/write/orchestration.rs
+++ b/datafusion/datasource/src/write/orchestration.rs
@@ -27,7 +27,9 @@ use crate::file_compression_type::FileCompressionType;
 use datafusion_common::error::Result;
 
 use arrow::array::RecordBatch;
-use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError};
+use datafusion_common::{
+    execution_join_datafusion_err, internal_datafusion_err, internal_err, DataFusionError,
+};
 use datafusion_common_runtime::{JoinSet, SpawnedTask};
 use datafusion_execution::TaskContext;
@@ -285,8 +287,8 @@ pub async fn spawn_writer_tasks_and_join(
         write_coordinator_task.join_unwind(),
         demux_task.join_unwind()
     );
-    r1.map_err(DataFusionError::ExecutionJoin)??;
-    r2.map_err(DataFusionError::ExecutionJoin)??;
+    r1.map_err(|e| execution_join_datafusion_err!(e))??;
+    r2.map_err(|e| execution_join_datafusion_err!(e))??;
 
     // Return total row count:
     rx_row_cnt.await.map_err(|_| {
diff --git a/datafusion/execution/src/object_store.rs b/datafusion/execution/src/object_store.rs
index cd75c9f3c49e..9da9555dc414 100644
--- a/datafusion/execution/src/object_store.rs
+++ b/datafusion/execution/src/object_store.rs
@@ -20,7 +20,7 @@ //! and query data inside these systems.
 
 use dashmap::DashMap;
-use datafusion_common::{exec_err, DataFusionError, Result};
+use datafusion_common::{exec_err, external_datafusion_err, DataFusionError, Result};
 #[cfg(not(target_arch = "wasm32"))]
 use object_store::local::LocalFileSystem;
 use object_store::ObjectStore;
@@ -55,7 +55,7 @@ impl ObjectStoreUrl {
     /// ```
     pub fn parse(s: impl AsRef<str>) -> Result<Self> {
         let mut parsed =
-            Url::parse(s.as_ref()).map_err(|e| DataFusionError::External(Box::new(e)))?;
+            Url::parse(s.as_ref()).map_err(|e| external_datafusion_err!(e))?;
 
         let remaining = &parsed[url::Position::BeforePath..];
         if !remaining.is_empty() && remaining != "/" {
diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs
index 73c1a5304f82..555479be6f5e 100644
--- a/datafusion/expr/src/logical_plan/display.rs
+++ b/datafusion/expr/src/logical_plan/display.rs
@@ -31,7 +31,7 @@ use crate::dml::CopyTo;
 use arrow::datatypes::Schema;
 use datafusion_common::display::GraphvizBuilder;
 use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
-use datafusion_common::{Column, DataFusionError};
+use datafusion_common::{external_datafusion_err, Column, DataFusionError};
 use serde_json::json;
 
 /// Formats plans with a single line per node. For example:
@@ -711,7 +711,7 @@ impl<'n> TreeNodeVisitor<'n> for PgJsonVisitor<'_, '_> {
                 self.f,
                 "{}",
                 serde_json::to_string_pretty(&plan)
-                    .map_err(|e| DataFusionError::External(Box::new(e)))?
+                    .map_err(|e| external_datafusion_err!(e))?
             )?;
         }
 
diff --git a/datafusion/ffi/src/plan_properties.rs b/datafusion/ffi/src/plan_properties.rs
index 5c878fa4be79..18d53f87377b 100644
--- a/datafusion/ffi/src/plan_properties.rs
+++ b/datafusion/ffi/src/plan_properties.rs
@@ -26,6 +26,7 @@ use abi_stable::{
 };
 use arrow::datatypes::SchemaRef;
 use datafusion::{
+    common::external_datafusion_err,
     error::{DataFusionError, Result},
     physical_expr::EquivalenceProperties,
     physical_plan::{
@@ -187,7 +188,7 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
         let proto_output_ordering =
             PhysicalSortExprNodeCollection::decode(df_result!(ffi_orderings)?.as_ref())
-                .map_err(|e| DataFusionError::External(Box::new(e)))?;
+                .map_err(|e| external_datafusion_err!(e))?;
         let orderings = Some(parse_physical_sort_exprs(
             &proto_output_ordering.physical_sort_expr_nodes,
             &default_ctx,
@@ -197,9 +198,8 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
         let partitioning_vec =
             unsafe { df_result!((ffi_props.output_partitioning)(&ffi_props))? };
-        let proto_output_partitioning =
-            Partitioning::decode(partitioning_vec.as_ref())
-                .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        let proto_output_partitioning = Partitioning::decode(partitioning_vec.as_ref())
+            .map_err(|e| external_datafusion_err!(e))?;
         let partitioning = parse_protobuf_partitioning(
             Some(&proto_output_partitioning),
             &default_ctx,
diff --git a/datafusion/ffi/src/tests/utils.rs b/datafusion/ffi/src/tests/utils.rs
index 6465b17d9b60..ef0277644158 100644
--- a/datafusion/ffi/src/tests/utils.rs
+++ b/datafusion/ffi/src/tests/utils.rs
@@ -17,7 +17,7 @@ use crate::tests::ForeignLibraryModuleRef;
 use abi_stable::library::RootModule;
-use datafusion::error::{DataFusionError, Result};
+use datafusion::common::{external_datafusion_err, Result};
 use std::path::Path;
 
 /// Compute the path to the library. It would be preferable to simply use
@@ -69,12 +69,12 @@ pub fn get_module() -> Result<ForeignLibraryModuleRef> {
     //  let target: &std::path::Path = "../../../../target/".as_ref();
     let library_path = compute_library_path::<ForeignLibraryModuleRef>(target_dir.as_path())
-        .map_err(|e| DataFusionError::External(Box::new(e)))?
+        .map_err(|e| external_datafusion_err!(e))?
         .join("deps");
 
     // Load the module
     let module = ForeignLibraryModuleRef::load_from_directory(&library_path)
-        .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        .map_err(|e| external_datafusion_err!(e))?;
 
     assert_eq!(
         module
diff --git a/datafusion/functions/src/regex/regexpreplace.rs b/datafusion/functions/src/regex/regexpreplace.rs
index 3a83564ff11f..260888b52bf1 100644
--- a/datafusion/functions/src/regex/regexpreplace.rs
+++ b/datafusion/functions/src/regex/regexpreplace.rs
@@ -26,11 +26,11 @@ use arrow::array::{ArrayAccessor, StringViewArray};
 use arrow::datatypes::DataType;
 use datafusion_common::cast::as_string_view_array;
 use datafusion_common::exec_err;
+use datafusion_common::external_datafusion_err;
+use datafusion_common::external_err;
 use datafusion_common::plan_err;
 use datafusion_common::ScalarValue;
-use datafusion_common::{
-    cast::as_generic_string_array, internal_err, DataFusionError, Result,
-};
+use datafusion_common::{cast::as_generic_string_array, internal_err, Result};
 use datafusion_expr::function::Hint;
 use datafusion_expr::ColumnarValue;
 use datafusion_expr::TypeSignature;
@@ -278,7 +278,7 @@ where
                         Ok(patterns.get(pattern).unwrap())
                     }
                     Err(err) => {
-                        Err(DataFusionError::External(Box::new(err)))
+                        external_err!(err)
                     }
                 },
             };
@@ -344,7 +344,7 @@ where
                         Ok(patterns.get(&pattern).unwrap())
                     }
                     Err(err) => {
-                        Err(DataFusionError::External(Box::new(err)))
+                        external_err!(err)
                     }
                 },
            };
@@ -453,8 +453,7 @@ fn _regexp_replace_static_pattern_replace(
         None => (pattern.to_string(), 1),
     };
 
-    let re =
-        Regex::new(&pattern).map_err(|err| DataFusionError::External(Box::new(err)))?;
+    let re = Regex::new(&pattern).map_err(|err| external_datafusion_err!(err))?;
 
     // Replaces the posix groups in the replacement string
     // with rust ones.
diff --git a/datafusion/optimizer/src/simplify_expressions/regex.rs b/datafusion/optimizer/src/simplify_expressions/regex.rs
index ec6485bf4b44..eb9d3b62da26 100644
--- a/datafusion/optimizer/src/simplify_expressions/regex.rs
+++ b/datafusion/optimizer/src/simplify_expressions/regex.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_common::{external_datafusion_err, Result, ScalarValue};
 use datafusion_expr::{lit, BinaryExpr, Expr, Like, Operator};
 use regex_syntax::hir::{Capture, Hir, HirKind, Literal, Look};
 
@@ -79,10 +79,7 @@ pub fn simplify_regex_expr(
             }
             Err(e) => {
                 // error out early since the execution may fail anyways
-                return Err(DataFusionError::Context(
-                    "Invalid regex".to_owned(),
-                    Box::new(DataFusionError::External(Box::new(e))),
-                ));
+                return Err(external_datafusion_err!(e).context("Invalid regex"));
             }
         }
     }
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index ee5be01e2914..54c26b78e43b 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -45,7 +45,7 @@ use arrow::compute::take_arrays;
 use arrow::datatypes::{SchemaRef, UInt32Type};
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::utils::transpose;
-use datafusion_common::HashMap;
+use datafusion_common::{external_datafusion_err, HashMap};
 use datafusion_common::{not_impl_err, DataFusionError, Result};
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::memory_pool::MemoryConsumer;
@@ -948,10 +948,8 @@ impl RepartitionExec {
             let e = Arc::new(e);
 
             for (_, tx) in txs {
-                let err = Err(DataFusionError::Context(
-                    "Join Error".to_string(),
-                    Box::new(DataFusionError::External(Box::new(Arc::clone(&e)))),
-                ));
+                let err = Err(external_datafusion_err!(Arc::clone(&e))
+                    .context("Join Error".to_string()));
                 tx.send(Some(err)).await.ok();
             }
         }
diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs
index 1101616a4106..cfc8ae39a16c 100644
--- a/datafusion/physical-plan/src/spill/mod.rs
+++ b/datafusion/physical-plan/src/spill/mod.rs
@@ -33,7 +33,7 @@ use arrow::datatypes::{Schema, SchemaRef};
 use arrow::ipc::{reader::StreamReader, writer::StreamWriter};
 use arrow::record_batch::RecordBatch;
-use datafusion_common::{exec_datafusion_err, DataFusionError, HashSet, Result};
+use datafusion_common::{exec_datafusion_err, external_err, HashSet, Result};
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::RecordBatchStream;
@@ -114,7 +114,7 @@ impl SpillReaderStream {
             SpillReaderStreamState::ReadInProgress(task) => {
                 let result = futures::ready!(task.poll_unpin(cx))
-                    .unwrap_or_else(|err| Err(DataFusionError::External(Box::new(err))));
+                    .unwrap_or_else(|err| external_err!(err));
 
                 match result {
                     Ok((reader, batch)) => {
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index ad4c695b9ef1..18befd05b1e7 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -97,7 +97,8 @@ use datafusion_common::file_options::json_writer::JsonWriterOptions;
 use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::stats::Precision;
 use datafusion_common::{
-    internal_err, not_impl_err, DataFusionError, Result, UnnestOptions,
+    external_datafusion_err, internal_err, not_impl_err, DataFusionError, Result,
+    UnnestOptions,
 };
 use datafusion_expr::{
     Accumulator, AccumulatorFactoryFunction, AggregateUDF, ColumnarValue, ScalarUDF,
@@ -1597,7 +1598,7 @@ async fn roundtrip_coalesce() -> Result<()> {
         &DefaultPhysicalExtensionCodec {},
     )?;
     let node = PhysicalPlanNode::decode(node.encode_to_vec().as_slice())
-        .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        .map_err(|e| external_datafusion_err!(e))?;
     let restored = node.try_into_physical_plan(
         &ctx,
         ctx.runtime_env().as_ref(),
diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs
index 4209734138f1..8eda6f3bd647 100644
--- a/datafusion/sql/src/unparser/plan.rs
+++ b/datafusion/sql/src/unparser/plan.rs
@@ -39,7 +39,7 @@ use crate::unparser::extension_unparser::{
 use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs};
 use crate::utils::UNNEST_PLACEHOLDER;
 use datafusion_common::{
-    internal_err, not_impl_err,
+    external_datafusion_err, internal_err, not_impl_err,
     tree_node::{TransformedResult, TreeNode},
     Column, DataFusionError, Result, ScalarValue, TableReference,
 };
@@ -1382,7 +1382,7 @@ impl Unparser<'_> {
 
 impl From<BuilderError> for DataFusionError {
     fn from(e: BuilderError) -> Self {
-        DataFusionError::External(Box::new(e))
+        external_datafusion_err!(e)
     }
 }
 
diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs
index f8a4ec666572..4b285df38afc 100644
--- a/datafusion/sqllogictest/bin/sqllogictests.rs
+++ b/datafusion/sqllogictest/bin/sqllogictests.rs
@@ -18,7 +18,9 @@ use clap::Parser;
 use datafusion::common::instant::Instant;
 use datafusion::common::utils::get_available_parallelism;
-use datafusion::common::{exec_err, DataFusionError, Result};
+use datafusion::common::{
+    exec_err, external_datafusion_err, external_err, DataFusionError, Result,
+};
 use datafusion_sqllogictest::{
     df_value_validator, read_dir_recursive, setup_scratch_dir, value_normalizer,
     DataFusion, TestContext,
@@ -175,7 +177,7 @@ async fn run_tests() -> Result<()> {
                 // Filter out any Ok() leaving only the DataFusionErrors
                 futures::stream::iter(match result {
                     // Tokio panic error
-                    Err(e) => Some(DataFusionError::External(Box::new(e))),
+                    Err(e) => Some(external_datafusion_err!(e)),
                     Ok(thread_result) => thread_result.err(),
                 })
             })
@@ -245,8 +247,7 @@ async fn run_file_in_runner>(
     mut runner: sqllogictest::Runner,
 ) -> Result<()> {
     let path = path.canonicalize()?;
-    let records =
-        parse_file(&path).map_err(|e| DataFusionError::External(Box::new(e)))?;
+    let records = parse_file(&path).map_err(|e| external_datafusion_err!(e))?;
     let mut errs = vec![];
     for record in records.into_iter() {
         if let Record::Halt { .. } = record {
@@ -270,7 +271,7 @@ async fn run_file_in_runner>(
             }
             msg.push_str(&format!("{}. {err}\n\n", i + 1));
         }
-        return Err(DataFusionError::External(msg.into()));
+        return external_err!(msg);
     }
 
     Ok(())
diff --git a/datafusion/sqllogictest/regenerate/sqllogictests.rs b/datafusion/sqllogictest/regenerate/sqllogictests.rs
index edad16bc84b1..1fab3ac539b5 100644
--- a/datafusion/sqllogictest/regenerate/sqllogictests.rs
+++ b/datafusion/sqllogictest/regenerate/sqllogictests.rs
@@ -17,9 +17,11 @@ use clap::Parser;
 use datafusion::common::instant::Instant;
-use datafusion::common::utils::get_available_parallelism;
-use datafusion::common::{exec_datafusion_err, exec_err, DataFusionError, Result};
 use datafusion::common::runtime::SpawnedTask;
+use datafusion::common::utils::get_available_parallelism;
+use datafusion::common::{
+    exec_datafusion_err, exec_err, external_datafusion_err, DataFusionError, Result,
+};
 use datafusion_sqllogictest::{DataFusion, TestContext};
 use futures::stream::StreamExt;
 use indicatif::{
@@ -169,8 +171,8 @@ async fn run_tests() -> Result<()> {
     let m_style = ProgressStyle::with_template(
         "[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}",
     )
-    .unwrap()
-    .progress_chars("##-");
+        .unwrap()
+        .progress_chars("##-");
 
     let start = Instant::now();
 
@@ -204,7 +206,7 @@ async fn run_tests() -> Result<()> {
                         m_clone,
                         m_style_clone,
                     )
-                    .await?
+                        .await?
                 }
                 (true, true) => {
                     run_complete_file_with_postgres(
                         path,
                         relative_path,
                         m_clone,
                         m_style_clone,
                     )
-                    .await?
+                        .await?
                 }
             }
             Ok(()) as Result<()>
         })
-        .join()
+            .join()
     })
     // run up to num_cpus streams in parallel
     .buffer_unordered(get_available_parallelism())
@@ -226,7 +228,7 @@ async fn run_tests() -> Result<()> {
         // Filter out any Ok() leaving only the DataFusionErrors
         futures::stream::iter(match result {
             // Tokio panic error
-            Err(e) => Some(DataFusionError::External(Box::new(e))),
+            Err(e) => Some(external_datafusion_err!(e)),
            Ok(thread_result) => match thread_result {
                 // Test run error
                 Err(e) => Some(e),
@@ -291,7 +293,7 @@ async fn run_test_file(
     let res = runner
         .run_file_async(path)
         .await
-        .map_err(|e| DataFusionError::External(Box::new(e)));
+        .map_err(|e| external_datafusion_err!(e))?;
 
     pb.finish_and_clear();
 
@@ -307,11 +309,11 @@ fn get_record_count(path: &PathBuf, label: String) -> u64 {
             Record::Query { conditions, .. } => {
                 if conditions.is_empty()
                     || !conditions.contains(&Condition::SkipIf {
-                        label: label.clone(),
-                    })
+                            label: label.clone(),
+                        })
                     || conditions.contains(&Condition::OnlyIf {
-                        label: label.clone(),
-                    })
+                            label: label.clone(),
+                        })
                 {
                     count += 1;
                 }
             }
             Record::Statement { conditions, .. } => {
                 if conditions.is_empty()
                     || !conditions.contains(&Condition::SkipIf {
-                        label: label.clone(),
-                    })
+                            label: label.clone(),
+                        })
                     || conditions.contains(&Condition::OnlyIf {
-                        label: label.clone(),
-                    })
+                            label: label.clone(),
+                        })
                 {
                     count += 1;
                 }
@@ -364,7 +366,7 @@ async fn run_test_file_with_postgres(
     runner
         .run_file_async(path)
         .await
-        .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        .map_err(|e| external_datafusion_err!(e))?;
 
     pb.finish_and_clear();