Skip to content

Commit 1a92214

Browse files
authored
Merge pull request JanKaul#269 from JanKaul/fix-datafusion-statistics
fix absent statistics
2 parents 495a566 + 07ab5ec commit 1a92214

File tree

2 files changed: +223 additions, −41 deletions

datafusion_iceberg/src/statistics.rs

Lines changed: 38 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -18,49 +18,46 @@ pub(crate) fn statistics_from_datafiles(
1818
datafiles
1919
.iter()
2020
.filter(|(_, manifest)| !matches!(manifest.status(), Status::Deleted))
21-
.fold(
21+
.map(|(_, manifest)| {
22+
let column_stats = column_statistics(schema, manifest);
2223
Statistics {
23-
num_rows: Precision::Exact(0),
24-
total_byte_size: Precision::Exact(0),
25-
column_statistics: vec![
26-
ColumnStatistics {
27-
null_count: Precision::Absent,
28-
max_value: Precision::Absent,
29-
min_value: Precision::Absent,
30-
distinct_count: Precision::Absent,
31-
sum_value: Precision::Absent,
32-
};
33-
schema.fields().len()
34-
],
35-
},
36-
|acc, (_, manifest)| {
37-
let column_stats = column_statistics(schema, manifest);
38-
Statistics {
39-
num_rows: acc.num_rows.add(&Precision::Exact(
40-
*manifest.data_file().record_count() as usize,
41-
)),
42-
total_byte_size: acc.total_byte_size.add(&Precision::Exact(
43-
*manifest.data_file().file_size_in_bytes() as usize,
44-
)),
45-
column_statistics: acc
46-
.column_statistics
47-
.into_iter()
48-
.zip(column_stats)
49-
.map(|(acc, x)| {
50-
let new_distinct_count = new_distinct_count(&acc, &x);
24+
num_rows: Precision::Exact(*manifest.data_file().record_count() as usize),
25+
total_byte_size: Precision::Exact(
26+
*manifest.data_file().file_size_in_bytes() as usize
27+
),
28+
column_statistics: column_stats
29+
.into_iter()
30+
.map(|x| ColumnStatistics {
31+
null_count: x.null_count,
32+
max_value: x.max_value,
33+
min_value: x.min_value,
34+
distinct_count: x.distinct_count,
35+
sum_value: x.sum_value,
36+
})
37+
.collect(),
38+
}
39+
})
40+
.reduce(|acc, x| Statistics {
41+
num_rows: acc.num_rows.add(&x.num_rows),
42+
total_byte_size: acc.total_byte_size.add(&x.total_byte_size),
43+
column_statistics: acc
44+
.column_statistics
45+
.into_iter()
46+
.zip(x.column_statistics)
47+
.map(|(acc, x)| {
48+
let new_distinct_count = new_distinct_count(&acc, &x);
5149

52-
ColumnStatistics {
53-
null_count: acc.null_count.add(&x.null_count),
54-
max_value: acc.max_value.max(&x.max_value),
55-
min_value: acc.min_value.min(&x.min_value),
56-
distinct_count: new_distinct_count,
57-
sum_value: acc.sum_value.add(&x.sum_value),
58-
}
59-
})
60-
.collect(),
61-
}
62-
},
63-
)
50+
ColumnStatistics {
51+
null_count: acc.null_count.add(&x.null_count),
52+
max_value: acc.max_value.max(&x.max_value),
53+
min_value: acc.min_value.min(&x.min_value),
54+
distinct_count: new_distinct_count,
55+
sum_value: acc.sum_value.add(&x.sum_value),
56+
}
57+
})
58+
.collect(),
59+
})
60+
.unwrap_or_default()
6461
}
6562

6663
fn column_statistics<'a>(
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
use std::sync::Arc;
2+
3+
use datafusion::{
4+
common::{
5+
stats::Precision,
6+
tree_node::{TransformedResult, TreeNode},
7+
ScalarValue,
8+
},
9+
execution::{context::SessionContext, SessionStateBuilder},
10+
};
11+
use datafusion_expr::ScalarUDF;
12+
use iceberg_rust::object_store::ObjectStoreBuilder;
13+
use iceberg_sql_catalog::SqlCatalogList;
14+
15+
use datafusion_iceberg::{
16+
catalog::catalog_list::IcebergCatalogList,
17+
planner::{iceberg_transform, IcebergQueryPlanner, RefreshMaterializedView},
18+
};
19+
20+
#[tokio::test]
21+
async fn test_table_statistics() {
22+
let object_store = ObjectStoreBuilder::memory();
23+
let iceberg_catalog_list = Arc::new(
24+
SqlCatalogList::new("sqlite://", object_store)
25+
.await
26+
.unwrap(),
27+
);
28+
29+
let catalog_list = {
30+
Arc::new(
31+
IcebergCatalogList::new(iceberg_catalog_list.clone())
32+
.await
33+
.unwrap(),
34+
)
35+
};
36+
37+
let state = SessionStateBuilder::new()
38+
.with_default_features()
39+
.with_catalog_list(catalog_list)
40+
.with_query_planner(Arc::new(IcebergQueryPlanner::new()))
41+
.build();
42+
43+
let ctx = SessionContext::new_with_state(state);
44+
45+
ctx.register_udf(ScalarUDF::from(RefreshMaterializedView::new(
46+
iceberg_catalog_list,
47+
)));
48+
49+
let sql = "CREATE EXTERNAL TABLE lineitem (
50+
L_ORDERKEY BIGINT NOT NULL,
51+
L_PARTKEY BIGINT NOT NULL,
52+
L_SUPPKEY BIGINT NOT NULL,
53+
L_LINENUMBER INT NOT NULL,
54+
L_QUANTITY DOUBLE NOT NULL,
55+
L_EXTENDED_PRICE DOUBLE NOT NULL,
56+
L_DISCOUNT DOUBLE NOT NULL,
57+
L_TAX DOUBLE NOT NULL,
58+
L_RETURNFLAG CHAR NOT NULL,
59+
L_LINESTATUS CHAR NOT NULL,
60+
L_SHIPDATE DATE NOT NULL,
61+
L_COMMITDATE DATE NOT NULL,
62+
L_RECEIPTDATE DATE NOT NULL,
63+
L_SHIPINSTRUCT VARCHAR NOT NULL,
64+
L_SHIPMODE VARCHAR NOT NULL,
65+
L_COMMENT VARCHAR NOT NULL ) STORED AS CSV LOCATION 'testdata/tpch/lineitem.csv' OPTIONS ('has_header' 'false');";
66+
67+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
68+
69+
let transformed = plan.transform(iceberg_transform).data().unwrap();
70+
71+
ctx.execute_logical_plan(transformed)
72+
.await
73+
.unwrap()
74+
.collect()
75+
.await
76+
.expect("Failed to execute query plan.");
77+
78+
let sql = "CREATE SCHEMA warehouse.tpch;";
79+
80+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
81+
82+
let transformed = plan.transform(iceberg_transform).data().unwrap();
83+
84+
ctx.execute_logical_plan(transformed)
85+
.await
86+
.unwrap()
87+
.collect()
88+
.await
89+
.expect("Failed to execute query plan.");
90+
91+
let sql = "CREATE EXTERNAL TABLE warehouse.tpch.lineitem (
92+
L_ORDERKEY BIGINT NOT NULL,
93+
L_PARTKEY BIGINT NOT NULL,
94+
L_SUPPKEY BIGINT NOT NULL,
95+
L_LINENUMBER INT NOT NULL,
96+
L_QUANTITY DOUBLE NOT NULL,
97+
L_EXTENDED_PRICE DOUBLE NOT NULL,
98+
L_DISCOUNT DOUBLE NOT NULL,
99+
L_TAX DOUBLE NOT NULL,
100+
L_RETURNFLAG CHAR NOT NULL,
101+
L_LINESTATUS CHAR NOT NULL,
102+
L_SHIPDATE DATE NOT NULL,
103+
L_COMMITDATE DATE NOT NULL,
104+
L_RECEIPTDATE DATE NOT NULL,
105+
L_SHIPINSTRUCT VARCHAR NOT NULL,
106+
L_SHIPMODE VARCHAR NOT NULL,
107+
L_COMMENT VARCHAR NOT NULL ) STORED AS ICEBERG LOCATION '/warehouse/tpch/lineitem' PARTITIONED BY ( \"month(L_SHIPDATE)\" );";
108+
109+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
110+
111+
let transformed = plan.transform(iceberg_transform).data().unwrap();
112+
113+
ctx.execute_logical_plan(transformed)
114+
.await
115+
.unwrap()
116+
.collect()
117+
.await
118+
.expect("Failed to execute query plan.");
119+
120+
let sql = "insert into warehouse.tpch.lineitem select * from lineitem;";
121+
122+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
123+
124+
let transformed = plan.transform(iceberg_transform).data().unwrap();
125+
126+
ctx.execute_logical_plan(transformed)
127+
.await
128+
.unwrap()
129+
.collect()
130+
.await
131+
.expect("Failed to execute query plan.");
132+
133+
let sql = ctx
134+
.sql("select sum(L_QUANTITY), L_PARTKEY from warehouse.tpch.lineitem group by L_PARTKEY;")
135+
.await
136+
.expect("Failed to create plan for select");
137+
138+
let physical_plan = sql.create_physical_plan().await.unwrap();
139+
140+
let stats = physical_plan.partition_statistics(None).unwrap();
141+
142+
// Validate table-level statistics
143+
assert_eq!(
144+
stats.num_rows,
145+
Precision::Inexact(47048),
146+
"num_rows should match the total rows in the CSV file"
147+
);
148+
assert!(
149+
matches!(stats.total_byte_size, Precision::Inexact(size) if size > 0),
150+
"total_byte_size should be Inexact and greater than 0"
151+
);
152+
153+
// Validate column count (sum(L_QUANTITY) and L_PARTKEY)
154+
assert_eq!(
155+
stats.column_statistics.len(),
156+
2,
157+
"Should have statistics for 2 columns"
158+
);
159+
160+
// Validate first column (sum(L_QUANTITY)) - aggregate column should have Absent statistics
161+
assert!(
162+
matches!(stats.column_statistics[0].min_value, Precision::Absent),
163+
"Aggregate column min_value should be Absent"
164+
);
165+
assert!(
166+
matches!(stats.column_statistics[0].max_value, Precision::Absent),
167+
"Aggregate column max_value should be Absent"
168+
);
169+
assert!(
170+
matches!(stats.column_statistics[0].null_count, Precision::Absent),
171+
"Aggregate column null_count should be Absent"
172+
);
173+
174+
// Validate second column (L_PARTKEY) - should have min/max bounds from Iceberg metadata
175+
assert_eq!(
176+
stats.column_statistics[1].min_value,
177+
Precision::Exact(ScalarValue::Int64(Some(2))),
178+
"L_PARTKEY min_value should be 2"
179+
);
180+
assert_eq!(
181+
stats.column_statistics[1].max_value,
182+
Precision::Exact(ScalarValue::Int64(Some(200000))),
183+
"L_PARTKEY max_value should be 200000"
184+
);
185+
}

0 commit comments

Comments
 (0)