
Commit a42207c

Merge pull request JanKaul#260 from JanKaul/detailed-tracing
Make sure to load Manifests concurrently
2 parents 7d33b89 + 7fc30e6 commit a42207c

File tree

5 files changed: +99 −134 lines changed
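The change removes the separate statistics pass over the table and instead derives statistics from data files that are already being loaded for the scan. Per the merge title, manifest loading itself is made concurrent. A minimal sketch of that pattern with futures::future::join_all (which manifest_list.rs below imports), using hypothetical stand-in types rather than the crate's real ones:

use futures::future::join_all;

// Hypothetical stand-ins; the crate's real Manifest and Error types differ.
struct Manifest {
    path: String,
}
struct Error(String);

async fn read_manifest(path: String) -> Result<Manifest, Error> {
    // A real implementation would fetch and decode the Avro manifest file here.
    Ok(Manifest { path })
}

// join_all polls every read future at once, so loading N manifests costs
// roughly one round trip instead of N sequential ones.
async fn load_manifests(paths: Vec<String>) -> Result<Vec<Manifest>, Error> {
    join_all(paths.into_iter().map(read_manifest))
        .await
        .into_iter()
        .collect()
}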

datafusion_iceberg/src/statistics.rs

Lines changed: 14 additions & 49 deletions

@@ -1,56 +1,23 @@
-use std::ops::Deref;
-
 use datafusion::{
     common::stats::Precision,
     physical_plan::{ColumnStatistics, Statistics},
     scalar::ScalarValue,
 };
-use futures::{future, TryFutureExt, TryStreamExt};
+use iceberg_rust::error::Error;
 use iceberg_rust::spec::{
     manifest::{ManifestEntry, Status},
     schema::Schema,
     values::Value,
 };
-use iceberg_rust::{catalog::tabular::Tabular, error::Error, table::Table};
-use itertools::Itertools;
-
-use super::table::DataFusionTable;
-
-impl DataFusionTable {
-    pub(crate) async fn statistics(&self) -> Result<Statistics, Error> {
-        match self.tabular.read().await.deref() {
-            Tabular::Table(table) => table_statistics(table, &self.snapshot_range).await,
-            Tabular::View(_) => Err(Error::NotSupported("Statistics for views".to_string())),
-            Tabular::MaterializedView(mv) => {
-                let table = mv.storage_table().await?;
-                table_statistics(&table, &self.snapshot_range).await
-            }
-        }
-    }
-}
 
-pub(crate) async fn table_statistics(
-    table: &Table,
-    snapshot_range: &(Option<i64>, Option<i64>),
-) -> Result<Statistics, Error> {
-    let schema = &snapshot_range
-        .1
-        .and_then(|snapshot_id| table.metadata().schema(snapshot_id).ok().cloned())
-        .unwrap_or_else(|| table.current_schema(None).unwrap().clone());
-
-    let sequence_number_range = [snapshot_range.0, snapshot_range.1]
-        .iter()
-        .map(|x| x.and_then(|y| table.metadata().sequence_number(y)))
-        .collect_tuple::<(Option<i64>, Option<i64>)>()
-        .unwrap();
-
-    let manifests = table.manifests(snapshot_range.0, snapshot_range.1).await?;
-    let datafiles = table
-        .datafiles(&manifests, None, sequence_number_range)
-        .await?;
+pub(crate) fn statistics_from_datafiles(
+    schema: &Schema,
+    datafiles: &[(String, ManifestEntry)],
+) -> Statistics {
     datafiles
-        .try_filter(|manifest| future::ready(!matches!(manifest.1.status(), Status::Deleted)))
-        .try_fold(
+        .iter()
+        .filter(|(_, manifest)| !matches!(manifest.status(), Status::Deleted))
+        .fold(
            Statistics {
                num_rows: Precision::Exact(0),
                total_byte_size: Precision::Exact(0),
@@ -65,14 +32,14 @@ pub(crate) async fn table_statistics(
                    schema.fields().len()
                ],
            },
-            |acc, manifest| async move {
-                let column_stats = column_statistics(schema, &manifest.1);
-                Ok(Statistics {
+            |acc, (_, manifest)| {
+                let column_stats = column_statistics(schema, manifest);
+                Statistics {
                    num_rows: acc.num_rows.add(&Precision::Exact(
-                        *manifest.1.data_file().record_count() as usize,
+                        *manifest.data_file().record_count() as usize,
                    )),
                    total_byte_size: acc.total_byte_size.add(&Precision::Exact(
-                        *manifest.1.data_file().file_size_in_bytes() as usize,
+                        *manifest.data_file().file_size_in_bytes() as usize,
                    )),
                    column_statistics: acc
                        .column_statistics
@@ -86,11 +53,9 @@ pub(crate) async fn table_statistics(
                            sum_value: acc.sum_value.add(&x.sum_value),
                        })
                        .collect(),
-                })
+                }
            },
        )
-    .map_err(Error::from)
-    .await
 }
 
 fn column_statistics<'a>(
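The statistics pass is no longer an async stream fold that re-reads the table: statistics_from_datafiles is a plain synchronous fold over entries the caller has already collected, which is why the async fn, the Result return type, and the futures/itertools imports all disappear. A minimal sketch of the same filter-then-fold shape, with simplified stand-ins for ManifestEntry and Statistics:

// Simplified stand-ins for ManifestEntry and Statistics.
struct Entry {
    deleted: bool,
    record_count: usize,
    file_size: usize,
}

struct Stats {
    num_rows: usize,
    total_bytes: usize,
}

fn stats_from_entries(entries: &[Entry]) -> Stats {
    entries
        .iter()
        // Skip tombstoned entries, as the real code skips Status::Deleted.
        .filter(|e| !e.deleted)
        .fold(Stats { num_rows: 0, total_bytes: 0 }, |acc, e| Stats {
            num_rows: acc.num_rows + e.record_count,
            total_bytes: acc.total_bytes + e.file_size,
        })
}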

datafusion_iceberg/src/table.rs

Lines changed: 15 additions & 19 deletions

@@ -33,8 +33,9 @@ use tokio::sync::{
     mpsc::{self},
     RwLock, RwLockWriteGuard,
 };
-use tracing::instrument;
+use tracing::{instrument, Instrument};
 
+use crate::statistics::statistics_from_datafiles;
 use crate::{
     error::Error as DataFusionIcebergError,
     pruning_statistics::{transform_predicate, PruneDataFiles, PruneManifests},
@@ -73,7 +74,6 @@ use datafusion::{
         projection::ProjectionExec,
         union::UnionExec,
         DisplayAs, DisplayFormatType, ExecutionPlan, PhysicalExpr, SendableRecordBatchStream,
-        Statistics,
     },
     prelude::Expr,
     scalar::ScalarValue,
@@ -259,16 +259,11 @@ impl TableProvider for DataFusionTable {
            }
            Tabular::Table(table) => {
                let schema = self.schema();
-                let statistics = self
-                    .statistics()
-                    .await
-                    .map_err(DataFusionIcebergError::from)?;
                table_scan(
                    table,
                    &self.snapshot_range,
                    schema,
                    self.config.as_ref(),
-                    statistics,
                    session_state,
                    projection,
                    filters,
@@ -282,16 +277,11 @@ impl TableProvider for DataFusionTable {
                    .await
                    .map_err(DataFusionIcebergError::from)?;
                let schema = self.schema();
-                let statistics = self
-                    .statistics()
-                    .await
-                    .map_err(DataFusionIcebergError::from)?;
                table_scan(
                    &table,
                    &self.snapshot_range,
                    schema,
                    self.config.as_ref(),
-                    statistics,
                    session_state,
                    projection,
                    filters,
@@ -350,7 +340,7 @@ fn fake_object_store_url(table_location_url: &str) -> ObjectStoreUrl {
 }
 
 #[allow(clippy::too_many_arguments)]
-#[instrument(name = "datafusion_iceberg::table_scan", level = "debug", skip(arrow_schema, statistics, session, filters), fields(
+#[instrument(name = "datafusion_iceberg::table_scan", level = "debug", skip(arrow_schema, session, filters), fields(
    table_identifier = %table.identifier(),
    snapshot_range = ?snapshot_range,
    projection = ?projection,
@@ -362,7 +352,6 @@ async fn table_scan(
    snapshot_range: &(Option<i64>, Option<i64>),
    arrow_schema: SchemaRef,
    config: Option<&DataFusionTableConfig>,
-    statistics: Statistics,
    session: &SessionState,
    projection: Option<&Vec<usize>>,
    filters: &[Expr],
@@ -445,7 +434,7 @@ async fn table_scan(
        HashMap::new();
 
    // Prune data & delete file and insert them into the according map
-    if let Some(physical_predicate) = physical_predicate.clone() {
+    let statistics = if let Some(physical_predicate) = physical_predicate.clone() {
        let partition_schema = Arc::new(ArrowSchema::new(table_partition_cols.clone()));
        let partition_column_names = partition_fields
            .iter()
@@ -492,15 +481,13 @@ async fn table_scan(
            .await
            .map_err(DataFusionIcebergError::from)?
            .try_collect()
-            .await
            .map_err(DataFusionIcebergError::from)?
        } else {
            table
                .datafiles(&manifests, None, sequence_number_range)
                .await
                .map_err(DataFusionIcebergError::from)?
                .try_collect()
-                .await
                .map_err(DataFusionIcebergError::from)?
        };
 
@@ -513,6 +500,8 @@ async fn table_scan(
            &data_files,
        ))?;
 
+        let statistics = statistics_from_datafiles(&schema, &data_files);
+
        data_files
            .into_iter()
            .zip(files_to_prune.into_iter())
@@ -537,18 +526,21 @@ async fn table_scan(
                }
            };
        });
+        statistics
    } else {
        let manifests = table
            .manifests(snapshot_range.0, snapshot_range.1)
            .await
            .map_err(DataFusionIcebergError::from)?;
-        let data_files: Vec<(ManifestPath, ManifestEntry)> = table
+        let data_files: Vec<_> = table
            .datafiles(&manifests, None, sequence_number_range)
            .await
            .map_err(DataFusionIcebergError::from)?
            .try_collect()
-            .await
            .map_err(DataFusionIcebergError::from)?;
+
+        let statistics = statistics_from_datafiles(&schema, &data_files);
+
        data_files.into_iter().for_each(|manifest| {
            if *manifest.1.status() != Status::Deleted {
                match manifest.1.data_file().content() {
@@ -570,6 +562,7 @@ async fn table_scan(
                }
            }
        });
+        statistics
    };
 
    let file_source = Arc::new(
@@ -865,6 +858,9 @@ async fn table_scan(
 
    let other_plan = ParquetFormat::default()
        .create_physical_plan(session, file_scan_config)
+        .instrument(tracing::debug_span!(
+            "datafusion_iceberg::create_physical_plan_scan_data_files"
+        ))
        .await?;
 
    plans.push(other_plan);
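Two details of this diff are worth calling out. First, the pruning if/else becomes an expression (let statistics = if ...), with each branch ending in statistics, so the statistics computed from the already-collected data files replace the extra statistics() pass deleted from the TableProvider impl. Second, create_physical_plan is wrapped in a tracing span via the Instrument combinator. A minimal sketch of that span-attachment pattern, with illustrative function and span names:

use tracing::Instrument;

async fn build_plan() -> usize {
    // Stand-in for ParquetFormat::create_physical_plan.
    42
}

async fn traced_build_plan() -> usize {
    // Instrument wraps the future so every poll of build_plan runs inside
    // the span; the span closes when the future completes.
    build_plan()
        .instrument(tracing::debug_span!("example::build_plan"))
        .await
}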

iceberg-rust/src/table/manifest_list.rs

Lines changed: 2 additions & 2 deletions

@@ -13,7 +13,7 @@ use std::{
 use apache_avro::{
     types::Value as AvroValue, Reader as AvroReader, Schema as AvroSchema, Writer as AvroWriter,
 };
-use futures::{future::join_all, TryFutureExt, TryStreamExt};
+use futures::{future::join_all, stream, TryFutureExt, TryStreamExt};
 use iceberg_rust_spec::{
     manifest::{partition_value_schema, DataFile, ManifestEntry, Status},
     manifest_list::{
@@ -255,7 +255,7 @@ pub async fn snapshot_column_bounds(
 
     let primitive_field_ids = schema.primitive_field_ids().collect::<Vec<_>>();
     let n = primitive_field_ids.len();
-    datafiles
+    stream::iter(datafiles)
        .try_fold(None::<Rectangle>, |acc, (_, manifest)| {
            let primitive_field_ids = &primitive_field_ids;
            async move {
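Since snapshot_column_bounds now receives data files that were already collected into memory, the Vec is wrapped with stream::iter so the async try_fold combinator from TryStreamExt still applies; the stream only provides combinator compatibility here, not extra concurrency. A minimal sketch of that bridge with illustrative element types:

use futures::{stream, TryStreamExt};

// try_fold short-circuits on the first Err, like the ? operator would.
async fn sum_ok(items: Vec<Result<i64, String>>) -> Result<i64, String> {
    stream::iter(items)
        .try_fold(0i64, |acc, x| async move { Ok(acc + x) })
        .await
}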
