Skip to content

Commit 44b9855

Browse files
adriangbclaude
andcommitted
feat: serialize ExplainFormat in AnalyzeNode and AnalyzeExecNode proto
Add ExplainFormat to both the logical-plan AnalyzeNode (field 5) and physical-plan AnalyzeExecNode (field 7) so the pgjson format survives proto roundtrips. Wire up encode/decode in both codecs. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 6055348 commit 44b9855

5 files changed

Lines changed: 91 additions & 1 deletion

File tree

datafusion/proto-models/proto/datafusion.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ message AnalyzeNode {
230230
// Statement-level override for `datafusion.explain.analyze_categories`.
231231
// Absent means "fall back to session config".
232232
optional datafusion_common.ExplainAnalyzeCategoriesNode analyze_categories = 4;
233+
datafusion_common.ExplainFormat format = 5;
233234
}
234235

235236
message ExplainNode {
@@ -1243,6 +1244,7 @@ message AnalyzeExecNode {
12431244
// Empty means "plan only". Absent (has_metric_categories=false) means "all".
12441245
bool has_metric_categories = 5;
12451246
repeated string metric_categories = 6;
1247+
datafusion_common.ExplainFormat format = 7;
12461248
}
12471249

12481250
message CrossJoinExecNode {

datafusion/proto-models/src/generated/pbjson.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -999,6 +999,9 @@ impl serde::Serialize for AnalyzeExecNode {
999999
if !self.metric_categories.is_empty() {
10001000
len += 1;
10011001
}
1002+
if self.format != 0 {
1003+
len += 1;
1004+
}
10021005
let mut struct_ser = serializer.serialize_struct("datafusion.AnalyzeExecNode", len)?;
10031006
if self.verbose {
10041007
struct_ser.serialize_field("verbose", &self.verbose)?;
@@ -1018,6 +1021,11 @@ impl serde::Serialize for AnalyzeExecNode {
10181021
if !self.metric_categories.is_empty() {
10191022
struct_ser.serialize_field("metricCategories", &self.metric_categories)?;
10201023
}
1024+
if self.format != 0 {
1025+
let v = super::datafusion_common::ExplainFormat::try_from(self.format)
1026+
.map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.format)))?;
1027+
struct_ser.serialize_field("format", &v)?;
1028+
}
10211029
struct_ser.end()
10221030
}
10231031
}
@@ -1037,6 +1045,7 @@ impl<'de> serde::Deserialize<'de> for AnalyzeExecNode {
10371045
"hasMetricCategories",
10381046
"metric_categories",
10391047
"metricCategories",
1048+
"format",
10401049
];
10411050

10421051
#[allow(clippy::enum_variant_names)]
@@ -1047,6 +1056,7 @@ impl<'de> serde::Deserialize<'de> for AnalyzeExecNode {
10471056
Schema,
10481057
HasMetricCategories,
10491058
MetricCategories,
1059+
Format,
10501060
}
10511061
impl<'de> serde::Deserialize<'de> for GeneratedField {
10521062
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -1074,6 +1084,7 @@ impl<'de> serde::Deserialize<'de> for AnalyzeExecNode {
10741084
"schema" => Ok(GeneratedField::Schema),
10751085
"hasMetricCategories" | "has_metric_categories" => Ok(GeneratedField::HasMetricCategories),
10761086
"metricCategories" | "metric_categories" => Ok(GeneratedField::MetricCategories),
1087+
"format" => Ok(GeneratedField::Format),
10771088
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
10781089
}
10791090
}
@@ -1099,6 +1110,7 @@ impl<'de> serde::Deserialize<'de> for AnalyzeExecNode {
10991110
let mut schema__ = None;
11001111
let mut has_metric_categories__ = None;
11011112
let mut metric_categories__ = None;
1113+
let mut format__ = None;
11021114
while let Some(k) = map_.next_key()? {
11031115
match k {
11041116
GeneratedField::Verbose => {
@@ -1137,6 +1149,12 @@ impl<'de> serde::Deserialize<'de> for AnalyzeExecNode {
11371149
}
11381150
metric_categories__ = Some(map_.next_value()?);
11391151
}
1152+
GeneratedField::Format => {
1153+
if format__.is_some() {
1154+
return Err(serde::de::Error::duplicate_field("format"));
1155+
}
1156+
format__ = Some(map_.next_value::<super::datafusion_common::ExplainFormat>()? as i32);
1157+
}
11401158
}
11411159
}
11421160
Ok(AnalyzeExecNode {
@@ -1146,6 +1164,7 @@ impl<'de> serde::Deserialize<'de> for AnalyzeExecNode {
11461164
schema: schema__,
11471165
has_metric_categories: has_metric_categories__.unwrap_or_default(),
11481166
metric_categories: metric_categories__.unwrap_or_default(),
1167+
format: format__.unwrap_or_default(),
11491168
})
11501169
}
11511170
}
@@ -1172,6 +1191,9 @@ impl serde::Serialize for AnalyzeNode {
11721191
if self.analyze_categories.is_some() {
11731192
len += 1;
11741193
}
1194+
if self.format != 0 {
1195+
len += 1;
1196+
}
11751197
let mut struct_ser = serializer.serialize_struct("datafusion.AnalyzeNode", len)?;
11761198
if let Some(v) = self.input.as_ref() {
11771199
struct_ser.serialize_field("input", v)?;
@@ -1187,6 +1209,11 @@ impl serde::Serialize for AnalyzeNode {
11871209
if let Some(v) = self.analyze_categories.as_ref() {
11881210
struct_ser.serialize_field("analyzeCategories", v)?;
11891211
}
1212+
if self.format != 0 {
1213+
let v = super::datafusion_common::ExplainFormat::try_from(self.format)
1214+
.map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.format)))?;
1215+
struct_ser.serialize_field("format", &v)?;
1216+
}
11901217
struct_ser.end()
11911218
}
11921219
}
@@ -1203,6 +1230,7 @@ impl<'de> serde::Deserialize<'de> for AnalyzeNode {
12031230
"analyzeLevel",
12041231
"analyze_categories",
12051232
"analyzeCategories",
1233+
"format",
12061234
];
12071235

12081236
#[allow(clippy::enum_variant_names)]
@@ -1211,6 +1239,7 @@ impl<'de> serde::Deserialize<'de> for AnalyzeNode {
12111239
Verbose,
12121240
AnalyzeLevel,
12131241
AnalyzeCategories,
1242+
Format,
12141243
}
12151244
impl<'de> serde::Deserialize<'de> for GeneratedField {
12161245
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -1236,6 +1265,7 @@ impl<'de> serde::Deserialize<'de> for AnalyzeNode {
12361265
"verbose" => Ok(GeneratedField::Verbose),
12371266
"analyzeLevel" | "analyze_level" => Ok(GeneratedField::AnalyzeLevel),
12381267
"analyzeCategories" | "analyze_categories" => Ok(GeneratedField::AnalyzeCategories),
1268+
"format" => Ok(GeneratedField::Format),
12391269
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
12401270
}
12411271
}
@@ -1259,6 +1289,7 @@ impl<'de> serde::Deserialize<'de> for AnalyzeNode {
12591289
let mut verbose__ = None;
12601290
let mut analyze_level__ = None;
12611291
let mut analyze_categories__ = None;
1292+
let mut format__ = None;
12621293
while let Some(k) = map_.next_key()? {
12631294
match k {
12641295
GeneratedField::Input => {
@@ -1285,13 +1316,20 @@ impl<'de> serde::Deserialize<'de> for AnalyzeNode {
12851316
}
12861317
analyze_categories__ = map_.next_value()?;
12871318
}
1319+
GeneratedField::Format => {
1320+
if format__.is_some() {
1321+
return Err(serde::de::Error::duplicate_field("format"));
1322+
}
1323+
format__ = Some(map_.next_value::<super::datafusion_common::ExplainFormat>()? as i32);
1324+
}
12881325
}
12891326
}
12901327
Ok(AnalyzeNode {
12911328
input: input__,
12921329
verbose: verbose__.unwrap_or_default(),
12931330
analyze_level: analyze_level__,
12941331
analyze_categories: analyze_categories__,
1332+
format: format__.unwrap_or_default(),
12951333
})
12961334
}
12971335
}

datafusion/proto-models/src/generated/prost.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,8 @@ pub struct AnalyzeNode {
354354
pub analyze_categories: ::core::option::Option<
355355
super::datafusion_common::ExplainAnalyzeCategoriesNode,
356356
>,
357+
#[prost(enumeration = "super::datafusion_common::ExplainFormat", tag = "5")]
358+
pub format: i32,
357359
}
358360
#[derive(Clone, PartialEq, ::prost::Message)]
359361
pub struct ExplainNode {
@@ -1852,6 +1854,8 @@ pub struct AnalyzeExecNode {
18521854
pub has_metric_categories: bool,
18531855
#[prost(string, repeated, tag = "6")]
18541856
pub metric_categories: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
1857+
#[prost(enumeration = "super::datafusion_common::ExplainFormat", tag = "7")]
1858+
pub format: i32,
18551859
}
18561860
#[derive(Clone, PartialEq, ::prost::Message)]
18571861
pub struct CrossJoinExecNode {

datafusion/proto/src/logical_plan/mod.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -874,12 +874,26 @@ impl AsLogicalPlan for LogicalPlanNode {
874874
.as_ref()
875875
.map(explain_analyze_categories_from_proto)
876876
.transpose()?;
877+
let pb_format = protobuf::ExplainFormat::try_from(analyze.format)
878+
.map_err(|_| {
879+
proto_error(format!(
880+
"Received an AnalyzeNode message with unknown ExplainFormat {}",
881+
analyze.format
882+
))
883+
})?;
884+
let analyze_format = match pb_format {
885+
protobuf::ExplainFormat::Indent => ExplainFormat::Indent,
886+
protobuf::ExplainFormat::Tree => ExplainFormat::Tree,
887+
protobuf::ExplainFormat::Pgjson => ExplainFormat::PostgresJSON,
888+
protobuf::ExplainFormat::Graphviz => ExplainFormat::Graphviz,
889+
};
877890
let explain_option =
878891
datafusion_expr::logical_plan::ExplainOption::default()
879892
.with_verbose(analyze.verbose)
880893
.with_analyze(true)
881894
.with_analyze_level(analyze_level)
882-
.with_analyze_categories(analyze_categories);
895+
.with_analyze_categories(analyze_categories)
896+
.with_format(analyze_format);
883897
LogicalPlanBuilder::from(input)
884898
.explain_option_format(explain_option)?
885899
.build()
@@ -1878,6 +1892,16 @@ impl AsLogicalPlan for LogicalPlanNode {
18781892
.analyze_categories
18791893
.as_ref()
18801894
.map(explain_analyze_categories_to_proto),
1895+
format: match &a.format {
1896+
ExplainFormat::Indent => protobuf::ExplainFormat::Indent,
1897+
ExplainFormat::Tree => protobuf::ExplainFormat::Tree,
1898+
ExplainFormat::PostgresJSON => {
1899+
protobuf::ExplainFormat::Pgjson
1900+
}
1901+
ExplainFormat::Graphviz => {
1902+
protobuf::ExplainFormat::Graphviz
1903+
}
1904+
} as i32,
18811905
},
18821906
))),
18831907
})

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use arrow::datatypes::{IntervalMonthDayNanoType, Schema, SchemaRef};
2626
use datafusion_catalog::memory::MemorySourceConfig;
2727
use datafusion_common::config::CsvOptions;
2828
use datafusion_common::display::StringifiedPlan;
29+
use datafusion_common::format::ExplainFormat;
2930
use datafusion_common::{
3031
DataFusionError, JoinType, NullEquality, Result, internal_datafusion_err,
3132
internal_err, not_impl_err,
@@ -1962,6 +1963,19 @@ pub trait PhysicalPlanNodeExt: Sized {
19621963
} else {
19631964
None
19641965
};
1966+
let pb_format =
1967+
protobuf::ExplainFormat::try_from(analyze.format).map_err(|_| {
1968+
DataFusionError::Internal(format!(
1969+
"Received an AnalyzeExecNode message with unknown ExplainFormat {}",
1970+
analyze.format
1971+
))
1972+
})?;
1973+
let format = match pb_format {
1974+
protobuf::ExplainFormat::Indent => ExplainFormat::Indent,
1975+
protobuf::ExplainFormat::Tree => ExplainFormat::Tree,
1976+
protobuf::ExplainFormat::Pgjson => ExplainFormat::PostgresJSON,
1977+
protobuf::ExplainFormat::Graphviz => ExplainFormat::Graphviz,
1978+
};
19651979
Ok(Arc::new(
19661980
AnalyzeExec::builder(
19671981
analyze.verbose,
@@ -1970,6 +1984,7 @@ pub trait PhysicalPlanNodeExt: Sized {
19701984
Arc::new(convert_required!(analyze.schema)?),
19711985
)
19721986
.with_metric_categories(metric_categories)
1987+
.with_format(format)
19731988
.build(),
19741989
))
19751990
}
@@ -2465,6 +2480,12 @@ pub trait PhysicalPlanNodeExt: Sized {
24652480
Some(cats) => (true, cats.iter().map(|c| c.to_string()).collect()),
24662481
None => (false, vec![]),
24672482
};
2483+
let format = match exec.format() {
2484+
ExplainFormat::Indent => protobuf::ExplainFormat::Indent,
2485+
ExplainFormat::Tree => protobuf::ExplainFormat::Tree,
2486+
ExplainFormat::PostgresJSON => protobuf::ExplainFormat::Pgjson,
2487+
ExplainFormat::Graphviz => protobuf::ExplainFormat::Graphviz,
2488+
} as i32;
24682489
Ok(protobuf::PhysicalPlanNode {
24692490
physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new(
24702491
protobuf::AnalyzeExecNode {
@@ -2474,6 +2495,7 @@ pub trait PhysicalPlanNodeExt: Sized {
24742495
schema: Some(exec.schema().as_ref().try_into()?),
24752496
has_metric_categories,
24762497
metric_categories,
2498+
format,
24772499
},
24782500
))),
24792501
})

0 commit comments

Comments
 (0)