Skip to content

Commit 495a566

Browse files
authored
Merge pull request JanKaul#267 from JanKaul/datafusion-statistics
add Datafusion statistics
2 parents aa087b5 + dd4ec7c commit 495a566

File tree

1 file changed

+75
-26
lines changed

1 file changed

+75
-26
lines changed

datafusion_iceberg/src/table.rs

Lines changed: 75 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,10 @@ use std::{
2727
any::Any,
2828
collections::{HashMap, HashSet},
2929
fmt,
30-
ops::{Deref, DerefMut},
31-
sync::Arc,
32-
};
33-
use tokio::sync::{
34-
mpsc::{self},
35-
RwLock, RwLockWriteGuard,
30+
ops::Deref,
31+
sync::{Arc, RwLock},
3632
};
33+
use tokio::sync::mpsc::{self};
3734
use tracing::{instrument, Instrument};
3835

3936
use crate::statistics::statistics_from_datafiles;
@@ -42,7 +39,7 @@ use crate::{
4239
pruning_statistics::{transform_predicate, PruneDataFiles, PruneManifests},
4340
statistics::manifest_statistics,
4441
};
45-
use datafusion::common::NullEquality;
42+
use datafusion::common::{NullEquality, Statistics};
4643
use datafusion::physical_plan::empty::EmptyExec;
4744
use datafusion::{
4845
arrow::datatypes::{DataType, Field, Schema as ArrowSchema, SchemaBuilder, SchemaRef},
@@ -213,8 +210,8 @@ impl DataFusionTable {
213210
Self::new(Tabular::Table(table), start, end, branch)
214211
}
215212

216-
pub async fn inner_mut(&self) -> RwLockWriteGuard<'_, Tabular> {
217-
self.tabular.write().await
213+
pub fn inner_mut(&self) -> std::sync::RwLockWriteGuard<'_, Tabular> {
214+
self.tabular.write().unwrap()
218215
}
219216
}
220217

@@ -237,7 +234,9 @@ impl TableProvider for DataFusionTable {
237234
limit: Option<usize>,
238235
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
239236
let session_state = session.as_any().downcast_ref::<SessionState>().unwrap();
240-
match self.tabular.read().await.deref() {
237+
// Clone the tabular to avoid holding the lock across await points
238+
let tabular = self.tabular.read().unwrap().clone();
239+
match tabular {
241240
Tabular::View(view) => {
242241
let metadata = view.metadata();
243242
let version = self
@@ -261,7 +260,7 @@ impl TableProvider for DataFusionTable {
261260
Tabular::Table(table) => {
262261
let schema = self.schema();
263262
table_scan(
264-
table,
263+
&table,
265264
&self.snapshot_range,
266265
schema,
267266
self.config.as_ref(),
@@ -321,6 +320,40 @@ impl TableProvider for DataFusionTable {
321320
.map(|_| TableProviderFilterPushDown::Inexact)
322321
.collect())
323322
}
323+
324+
fn statistics(&self) -> Option<Statistics> {
325+
use datafusion::common::stats::Precision;
326+
327+
// Clone the tabular to avoid holding the lock
328+
let tabular = self.tabular.read().unwrap().clone();
329+
330+
match tabular {
331+
Tabular::Table(table) => {
332+
// Get the current snapshot
333+
let snapshot = table
334+
.metadata()
335+
.current_snapshot(self.branch.as_deref())
336+
.ok()
337+
.flatten()?;
338+
339+
// Extract total-records from the snapshot summary
340+
let num_rows = snapshot
341+
.summary()
342+
.other
343+
.get("total-records")
344+
.and_then(|s| s.parse::<usize>().ok())
345+
.map(Precision::Inexact)
346+
.unwrap_or(Precision::Absent);
347+
348+
Some(datafusion::physical_plan::Statistics {
349+
num_rows,
350+
total_byte_size: Precision::Absent,
351+
column_statistics: vec![],
352+
})
353+
}
354+
Tabular::View(_) | Tabular::MaterializedView(_) => None,
355+
}
356+
}
324357
}
325358

326359
// Create a fake object store URL. Different table paths should produce fake URLs
@@ -959,16 +992,19 @@ impl DataSink for IcebergDataSink {
959992
data: SendableRecordBatchStream,
960993
context: &Arc<TaskContext>,
961994
) -> Result<u64, DataFusionError> {
962-
let mut lock = self.0.tabular.write().await;
963-
let table = if let Tabular::Table(table) = lock.deref_mut() {
964-
Ok(table)
965-
} else {
966-
Err(Error::InvalidFormat("database entity".to_string()))
967-
}
968-
.map_err(DataFusionIcebergError::from)?;
995+
// Clone the table from the read lock
996+
let mut table = {
997+
let lock = self.0.tabular.read().unwrap();
998+
if let Tabular::Table(table) = lock.deref() {
999+
Ok(table.clone())
1000+
} else {
1001+
Err(Error::InvalidFormat("database entity".to_string()))
1002+
}
1003+
.map_err(DataFusionIcebergError::from)?
1004+
};
9691005

9701006
let metadata_files =
971-
write_parquet_data_files(table, data, context, self.0.branch.as_deref()).await?;
1007+
write_parquet_data_files(&table, data, context, self.0.branch.as_deref()).await?;
9721008

9731009
table
9741010
.new_transaction(self.0.branch.as_deref())
@@ -977,6 +1013,10 @@ impl DataSink for IcebergDataSink {
9771013
.await
9781014
.map_err(DataFusionIcebergError::from)?;
9791015

1016+
// Acquire write lock and overwrite the old table with the new one
1017+
let mut lock = self.0.tabular.write().unwrap();
1018+
*lock = Tabular::Table(table);
1019+
9801020
Ok(0)
9811021
}
9821022
fn metrics(&self) -> Option<MetricsSet> {
@@ -1405,7 +1445,7 @@ mod tests {
14051445
};
14061446
use iceberg_sql_catalog::SqlCatalog;
14071447

1408-
use std::{ops::Deref, sync::Arc};
1448+
use std::sync::Arc;
14091449

14101450
use crate::{catalog::catalog::IcebergCatalog, table::fake_object_store_url, DataFusionTable};
14111451

@@ -1630,9 +1670,12 @@ mod tests {
16301670
}
16311671
}
16321672

1633-
if let Tabular::Table(table) = table.tabular.read().await.deref() {
1634-
assert_eq!(table.manifests(None, None).await.unwrap().len(), 2);
1673+
let table = if let Tabular::Table(table) = table.tabular.read().unwrap().clone() {
1674+
table
1675+
} else {
1676+
panic!()
16351677
};
1678+
assert_eq!(table.manifests(None, None).await.unwrap().len(), 2);
16361679
}
16371680

16381681
#[tokio::test]
@@ -1816,9 +1859,12 @@ mod tests {
18161859
}
18171860
}
18181861

1819-
if let Tabular::Table(table) = table.tabular.read().await.deref() {
1820-
assert_eq!(table.manifests(None, None).await.unwrap().len(), 2);
1862+
let table = if let Tabular::Table(table) = table.tabular.read().unwrap().clone() {
1863+
table
1864+
} else {
1865+
panic!();
18211866
};
1867+
assert_eq!(table.manifests(None, None).await.unwrap().len(), 2);
18221868
}
18231869

18241870
#[tokio::test]
@@ -2007,9 +2053,12 @@ mod tests {
20072053
}
20082054
}
20092055

2010-
if let Tabular::Table(table) = table.tabular.read().await.deref() {
2011-
assert_eq!(table.manifests(None, None).await.unwrap().len(), 2);
2056+
let table = if let Tabular::Table(table) = table.tabular.read().unwrap().clone() {
2057+
table
2058+
} else {
2059+
panic!();
20122060
};
2061+
assert_eq!(table.manifests(None, None).await.unwrap().len(), 2);
20132062
}
20142063

20152064
#[tokio::test]

0 commit comments

Comments
 (0)