diff --git a/Cargo.lock b/Cargo.lock index a356bb387f..a6c1b3610c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -593,6 +593,28 @@ dependencies = [ "wasm-bindgen-futures", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "async-task" version = "4.7.1" @@ -908,7 +930,7 @@ dependencies = [ "rustls-native-certs 0.8.1", "rustls-pki-types", "tokio", - "tower", + "tower 0.5.2", "tracing", ] @@ -1534,7 +1556,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" dependencies = [ "lazy_static", - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] @@ -3440,6 +3462,19 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper 1.6.0", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.10" @@ -3499,6 +3534,7 @@ dependencies = [ "arrow-string", "as-any", "async-std", + "async-stream", "async-trait", "base64 0.22.1", "bimap", @@ -3511,6 +3547,7 @@ dependencies = [ "futures", "iceberg_test_utils", "itertools 0.13.0", + "metrics", "moka", "murmur3", "num-bigint", @@ -3534,6 +3571,7 @@ dependencies = [ "tera", "thrift", "tokio", + "tracing", "typed-builder 0.20.1", "url", "uuid", @@ -3704,6 +3742,8 @@ dependencies = [ "iceberg-catalog-rest", "iceberg-datafusion", "iceberg_test_utils", + "init-tracing-opentelemetry", + "metrics-exporter-prometheus", "ordered-float 2.10.1", "parquet", "tokio", @@ -3948,6 +3988,22 @@ dependencies = [ "web-time", ] +[[package]] +name = "init-tracing-opentelemetry" +version = "0.28.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "599322bed1184942482c1b98019302a4b186f92b82a06c954951261bcb04a1d4" +dependencies = [ + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", + "thiserror 2.0.12", + "tracing", + "tracing-opentelemetry", + "tracing-subscriber", +] + [[package]] name = "inout" version = "0.1.3" @@ -4202,7 +4258,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -4397,6 +4453,53 @@ dependencies = [ "tokio", ] +[[package]] +name = "metrics" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25dea7ac8057892855ec285c440160265225438c3c45072613c25a4b26e98ef5" +dependencies = [ + "ahash 0.8.11", + "portable-atomic", +] + +[[package]] +name = "metrics-exporter-prometheus" +version = "0.17.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b166dea96003ee2531cf14833efedced545751d800f03535801d833313f8c15" +dependencies = [ + "base64 0.22.1", 
+ "http-body-util", + "hyper 1.6.0", + "hyper-rustls 0.27.5", + "hyper-util", + "indexmap 2.9.0", + "ipnet", + "metrics", + "metrics-util", + "quanta", + "thiserror 2.0.12", + "tokio", + "tracing", +] + +[[package]] +name = "metrics-util" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe8db7a05415d0f919ffb905afa37784f71901c9a773188876984b4f769ab986" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.15.2", + "metrics", + "quanta", + "rand 0.9.0", + "rand_xoshiro", + "sketches-ddsketch", +] + [[package]] name = "mimalloc" version = "0.1.46" @@ -4801,6 +4904,89 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" +[[package]] +name = "opentelemetry" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e87237e2775f74896f9ad219d26a2081751187eb7c9f5c58dde20a23b95d16c" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.12", + "tracing", +] + +[[package]] +name = "opentelemetry-http" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46d7ab32b827b5b495bd90fa95a6cb65ccc293555dcc3199ae2937d2d237c8ed" +dependencies = [ + "async-trait", + "bytes", + "http 1.3.1", + "opentelemetry", + "reqwest", + "tracing", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d899720fe06916ccba71c01d04ecd77312734e2de3467fd30d9d580c8ce85656" +dependencies = [ + "futures-core", + "http 1.3.1", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "reqwest", + "thiserror 2.0.12", + "tokio", + "tonic", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c40da242381435e18570d5b9d50aca2a4f4f4d8e146231adb4e7768023309b3" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84b29a9f89f1a954936d5aa92f19b2feec3c8f3971d3e96206640db7f9706ae3" + +[[package]] +name = "opentelemetry_sdk" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afdefb21d1d47394abc1ba6c57363ab141be19e27cc70d0e422b7f303e4d290b" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "opentelemetry", + "percent-encoding", + "rand 0.9.0", + "thiserror 2.0.12", + "tokio", + "tokio-stream", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -5291,6 +5477,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools 0.14.0", + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "psm" version = "0.1.25" @@ -5346,6 +5555,21 @@ version = "0.2.3" 
source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a651516ddc9168ebd67b24afd085a718be02f8858fe406591b013d101ce2f40" +[[package]] +name = "quanta" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quick-xml" version = "0.37.4" @@ -5505,6 +5729,24 @@ dependencies = [ "zerocopy 0.8.18", ] +[[package]] +name = "rand_xoshiro" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41" +dependencies = [ + "rand_core 0.9.1", +] + +[[package]] +name = "raw-cpuid" +version = "11.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6df7ab838ed27997ba19a4664507e6f82b41fe6e20be42929332156e5e85146" +dependencies = [ + "bitflags", +] + [[package]] name = "recursive" version = "0.1.1" @@ -5669,6 +5911,7 @@ checksum = "43e734407157c3c2034e0258f5e4473ddb361b1e85f95a66690d67264d7cd1da" dependencies = [ "base64 0.22.1", "bytes", + "futures-channel", "futures-core", "futures-util", "h2 0.4.7", @@ -5697,7 +5940,7 @@ dependencies = [ "tokio", "tokio-rustls 0.26.1", "tokio-util", - "tower", + "tower 0.5.2", "tower-service", "url", "wasm-bindgen", @@ -6351,6 +6594,12 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" +[[package]] +name = "sketches-ddsketch" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e9a774a6c28142ac54bb25d25562e6bcf957493a184f15ad4eebccb23e410a" + [[package]] name = "slab" version = "0.4.9" @@ -7118,6 +7367,52 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "hyper 1.6.0", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.5.2" @@ -7189,6 +7484,34 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd8e764bd6f5813fd8bebc3117875190c5b0415be8f7f8059bffb6ecd979c444" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + +[[package]] +name = "tracing-serde" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.19" @@ -7199,12 +7522,15 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] @@ -7481,7 +7807,7 @@ dependencies = [ "thiserror 2.0.12", "tokio", "tokio-stream", - "tower", + "tower 0.5.2", "tracing", ] @@ -7718,7 +8044,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index fd83fd55c7..93cd11f5ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,7 @@ arrow-select = { version = "55" } arrow-string = { version = "55" } as-any = "0.3.2" async-std = "1.12" +async-stream = "0.3" async-trait = "0.1.88" aws-config = "1.6.1" aws-sdk-glue = "1.39" @@ -82,6 +83,7 @@ indicatif = "0.17" itertools = "0.13" linkedbytes = "0.1.8" metainfo = "0.7.14" +metrics = "0.24" mimalloc = "0.1.46" mockito = "1" motore-macros = "0.4.3" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 52f5b4d8b5..71955b88bc 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -56,6 +56,7 @@ arrow-select = { workspace = true } arrow-string = { workspace = true } as-any = { workspace = true } async-std = { workspace = true, optional = true, features = ["attributes"] } +async-stream = { workspace = true } async-trait = { workspace = true } base64 = { workspace = true } bimap = { workspace = true } @@ -66,6 +67,7 @@ expect-test = { workspace = true } fnv = { workspace = true } futures = { workspace = true } itertools = { workspace = true } +metrics = { workspace = true } moka = { version = "0.12.10", features = ["future"] } murmur3 = { workspace = true } num-bigint = { workspace = true } @@ -86,6 +88,7 @@ serde_with = { workspace = true } strum = { workspace = true, features = ["derive"] } thrift = { workspace = true } tokio = { workspace = true, optional = false, features = ["sync"] } +tracing = { workspace = true } typed-builder = { workspace = true } url = { workspace = true } uuid = { workspace = true } diff --git a/crates/iceberg/src/arrow/count_recording_record_batch_stream.rs b/crates/iceberg/src/arrow/count_recording_record_batch_stream.rs new file mode 100644 index 0000000000..dff7ec3592 --- /dev/null +++ b/crates/iceberg/src/arrow/count_recording_record_batch_stream.rs @@ -0,0 +1,68 @@ +use std::pin::Pin; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::task::{Context, Poll}; + +use arrow_array::RecordBatch; +use futures::Stream; + +use crate::Result; + +pub(crate) struct CountRecordingRecordBatchStream { + stream: S, + row_count: AtomicU64, + record_batch_count: AtomicU64, + target_span: tracing::Span, + record_batch_count_field_name: &'static str, + row_count_field_name: &'static str, +} + +impl CountRecordingRecordBatchStream { + pub(crate) fn new( + stream: S, + target_span: tracing::Span, + record_batch_count_field_name: &'static str, + row_count_field_name: &'static str, + ) -> Self { + Self { + stream, + row_count: AtomicU64::new(0), + record_batch_count: AtomicU64::new(0), + target_span, + record_batch_count_field_name, + row_count_field_name, + } + } +} + +impl Stream for CountRecordingRecordBatchStream +where S: 
Stream<Item = Result<RecordBatch>> + Unpin +{ + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let this = self.get_mut(); + + match Pin::new(&mut this.stream).poll_next(cx) { + Poll::Ready(Some(Ok(batch))) => { + let row_count = batch.num_rows() as u64; + + this.row_count.fetch_add(row_count, Ordering::Relaxed); + this.record_batch_count.fetch_add(1, Ordering::Relaxed); + + Poll::Ready(Some(Ok(batch))) + } + other => other, + } + } +} + +impl<S> Drop for CountRecordingRecordBatchStream<S> { + fn drop(&mut self) { + let total_record_batches = self.record_batch_count.load(Ordering::Relaxed); + let total_rows = self.row_count.load(Ordering::Relaxed); + self.target_span + .record(self.record_batch_count_field_name, total_record_batches); + self.target_span + .record(self.row_count_field_name, total_rows); + } +} diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index 0dd53a34fa..c5e9d228a0 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -104,6 +104,7 @@ impl DeleteFilter { } /// Builds eq delete predicate for the provided task. + #[tracing::instrument(skip_all, level = "trace")] pub(crate) async fn build_equality_delete_predicate( &self, file_scan_task: &FileScanTask, diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index 949f842412..6378c7f9d2 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -27,6 +27,7 @@ pub(crate) mod caching_delete_file_loader; pub mod delete_file_loader; pub(crate) mod delete_filter; +pub(crate) mod count_recording_record_batch_stream; mod reader; pub(crate) mod record_batch_projector; pub(crate) mod record_batch_transformer; diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 4327184058..f5beefbde5 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -41,8 +41,10 @@ use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData}; use parquet::schema::types::{SchemaDescriptor, Type as ParquetType}; +use tracing::Instrument; use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; +use crate::arrow::count_recording_record_batch_stream::CountRecordingRecordBatchStream; use crate::arrow::record_batch_transformer::RecordBatchTransformer; use crate::arrow::{arrow_schema_to_schema, get_arrow_datum}; use crate::delete_vector::DeleteVector; @@ -54,6 +56,7 @@ use crate::expr::{BoundPredicate, BoundReference}; use crate::io::{FileIO, FileMetadata, FileRead}; use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type}; +use crate::traced_stream::TracedStream; use crate::utils::available_parallelism; use crate::{Error, ErrorKind}; @@ -138,16 +141,20 @@ pub struct ArrowReader { impl ArrowReader { /// Take a stream of FileScanTasks and reads all the files.
/// Returns a stream of Arrow RecordBatches containing the data from the files - pub async fn read(self, tasks: FileScanTaskStream) -> Result { + #[tracing::instrument(skip_all, level = "info", name = "iceberg.scan.execute")] + pub fn read(self, tasks: FileScanTaskStream) -> Result { let file_io = self.file_io.clone(); let batch_size = self.batch_size; let concurrency_limit_data_files = self.concurrency_limit_data_files; let row_group_filtering_enabled = self.row_group_filtering_enabled; let row_selection_enabled = self.row_selection_enabled; + let span = tracing::Span::current(); + let span_clone = span.clone(); let stream = tasks .map_ok(move |task| { let file_io = file_io.clone(); + let file_span = span_clone.clone(); Self::process_file_scan_task( task, @@ -156,18 +163,35 @@ impl ArrowReader { self.delete_file_loader.clone(), row_group_filtering_enabled, row_selection_enabled, + file_span, ) }) .map_err(|err| { Error::new(ErrorKind::Unexpected, "file scan task generate failed").with_source(err) }) .try_buffer_unordered(concurrency_limit_data_files) - .try_flatten_unordered(concurrency_limit_data_files); + .try_flatten_unordered(concurrency_limit_data_files) + .boxed(); - Ok(Box::pin(stream) as ArrowRecordBatchStream) + Ok(Box::pin(TracedStream::new(stream, vec![span])) as ArrowRecordBatchStream) } #[allow(clippy::too_many_arguments)] + #[tracing::instrument( + skip_all, + level = "debug", + name = "iceberg.scan.execute.process_data_file", + fields( + iceberg.scan.execute.data_file.data_file_path = %task.data_file_path, + iceberg.scan.execute.data_file.row_groups.skipped_count, + iceberg.scan.execute.data_file.row_groups.selected_count, + iceberg.scan.execute.data_file.row_selection.skipped_count, + iceberg.scan.execute.data_file.row_selection.selected_count, + iceberg.scan.execute.data_file.rows.scanned_count, + iceberg.scan.execute.data_file.rows.selected_count, + iceberg.scan.execute.data_file.record_batches.count, + ) + )] async fn process_file_scan_task( task: FileScanTask, batch_size: Option, @@ -175,7 +199,9 @@ impl ArrowReader { delete_file_loader: CachingDeleteFileLoader, row_group_filtering_enabled: bool, row_selection_enabled: bool, + parent_span: tracing::Span, ) -> Result { + let span = tracing::Span::current(); let should_load_page_index = (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty(); @@ -240,6 +266,8 @@ impl ArrowReader { // by using a `RowSelection`. 
let mut selected_row_group_indices = None; let mut row_selection = None; + let mut selected_row_count: Option = task.record_count; + let mut skipped_row_count: u64 = 0; if let Some(predicate) = final_predicate { let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map( @@ -263,6 +291,43 @@ impl ArrowReader { &task.schema, )?; + tracing::Span::current().record( + "iceberg.scan.execute.data_file.row_groups.selected_count", + result.len(), + ); + tracing::Span::current().record( + "iceberg.scan.execute.data_file.row_groups.skipped_count", + record_batch_stream_builder.metadata().row_groups().len() - result.len(), + ); + selected_row_count = Some( + result + .iter() + .map(|&row_group_index| { + record_batch_stream_builder + .metadata() + .row_group(row_group_index) + .num_rows() as u64 + }) + .sum::(), + ); + tracing::Span::current().record( + "iceberg.scan.execute.data_file.row_groups.selected_row_count", + selected_row_count.unwrap(), + ); + skipped_row_count += (0..record_batch_stream_builder.metadata().row_groups().len()) + .filter(|&i| !result.contains(&i)) + .map(|i| { + record_batch_stream_builder + .metadata() + .row_group(i) + .num_rows() as u64 + }) + .sum::(); + tracing::Span::current().record( + "iceberg.scan.execute.data_file.row_groups.skipped_row_count", + skipped_row_count, + ); + selected_row_group_indices = Some(result); } @@ -301,6 +366,20 @@ impl ArrowReader { } if let Some(row_selection) = row_selection { + let new_selected_row_count = row_selection.row_count() as u64; + tracing::Span::current().record( + "iceberg.scan.execute.data_file.row_selection.selected_count", + new_selected_row_count, + ); + + if let Some(selected_row_count) = selected_row_count { + tracing::Span::current().record( + "iceberg.scan.execute.data_file.row_selection.skipped_count", + selected_row_count - new_selected_row_count, + ); + } + selected_row_count = Some(new_selected_row_count); + record_batch_stream_builder = record_batch_stream_builder.with_row_selection(row_selection); } @@ -310,19 +389,44 @@ impl ArrowReader { record_batch_stream_builder.with_row_groups(selected_row_group_indices); } - // Build the batch stream and send all the RecordBatches that it generates - // to the requester. - let record_batch_stream = - record_batch_stream_builder - .build()? - .map(move |batch| match batch { - Ok(batch) => record_batch_transformer.process_record_batch(batch), - Err(err) => Err(err.into()), - }); + tracing::Span::current().record( + "iceberg.scan.execute.data_file.rows.scanned_count", + selected_row_count, + ); - Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) - } + let stream_span = tracing::trace_span!("iceberg.scan.execute.process_data_file_stream"); + let stream_span_clone = stream_span.clone(); + let _guard = stream_span.enter(); + // Build the batch stream and send all the RecordBatches that it generates + // to the requester. + let stream = record_batch_stream_builder + .build()? 
+ .map(move |batch| match batch { + Ok(batch) => record_batch_transformer.process_record_batch(batch), + Err(err) => Err(err.into()), + }); + drop(_guard); + + let stream = Box::pin(CountRecordingRecordBatchStream::new( + stream, + span.clone(), + "iceberg.scan.execute.data_file.record_batches.count", + "iceberg.scan.execute.data_file.rows.selected_count", + )); + + Ok(Box::pin(TracedStream::new(stream, vec![ + parent_span, + span, + stream_span_clone, + ])) as ArrowRecordBatchStream) + } + + #[tracing::instrument( + skip(file_io, should_load_page_index), + level = "trace", + name = "iceberg.scan.execute.create_parquet_record_batch_stream_builder" + )] pub(crate) async fn create_parquet_record_batch_stream_builder( data_file_path: &str, file_io: FileIO, @@ -339,11 +443,19 @@ impl ArrowReader { .with_preload_page_index(should_load_page_index); // Create the record batch stream builder, which wraps the parquet file reader - let record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options( - parquet_file_reader, - ArrowReaderOptions::new(), - ) + let span = tracing::trace_span!( + "iceberg.scan.execute.create_parquet_record_batch_stream_builder.build" + ); + let record_batch_stream_builder = async move { + ParquetRecordBatchStreamBuilder::new_with_options( + parquet_file_reader, + ArrowReaderOptions::new(), + ) + .await + } + .instrument(span) .await?; + Ok(record_batch_stream_builder) } @@ -352,6 +464,11 @@ impl ArrowReader { /// Using the Parquet page index, we build a `RowSelection` that rejects rows that are indicated /// as having been deleted by a positional delete, taking into account any row groups that have /// been skipped entirely by the filter predicate + #[tracing::instrument( + skip_all, + level = "trace", + name = "iceberg.scan.execute.build_deletes_row_selection" + )] fn build_deletes_row_selection( row_group_metadata_list: &[RowGroupMetaData], selected_row_groups: &Option>, @@ -495,6 +612,11 @@ impl ArrowReader { } } + #[tracing::instrument( + skip_all, + level = "trace", + name = "iceberg.scan.execute.get_arrow_projection_mask" + )] fn get_arrow_projection_mask( field_ids: &[i32], iceberg_schema_of_task: &Schema, @@ -615,6 +737,11 @@ impl ArrowReader { } } + #[tracing::instrument( + skip_all, + level = "trace", + name = "iceberg.scan.execute.get_row_filter" + )] fn get_row_filter( predicates: &BoundPredicate, parquet_schema: &SchemaDescriptor, @@ -644,6 +771,11 @@ impl ArrowReader { Ok(RowFilter::new(vec![Box::new(arrow_predicate)])) } + #[tracing::instrument( + skip_all, + level = "trace", + name = "iceberg.scan.execute.get_selected_row_group_indices" + )] fn get_selected_row_group_indices( predicate: &BoundPredicate, parquet_metadata: &Arc, @@ -667,6 +799,11 @@ impl ArrowReader { Ok(results) } + #[tracing::instrument( + skip_all, + level = "trace", + name = "iceberg.scan.execute.get_row_selection_for_filter_predicate" + )] fn get_row_selection_for_filter_predicate( predicate: &BoundPredicate, parquet_metadata: &Arc, @@ -1751,7 +1888,6 @@ message schema { let result = reader .read(tasks) - .await .unwrap() .try_collect::>() .await diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs index d8f7a872e1..5d5d480552 100644 --- a/crates/iceberg/src/delete_file_index.rs +++ b/crates/iceberg/src/delete_file_index.rs @@ -16,131 +16,45 @@ // under the License. 
use std::collections::HashMap; -use std::ops::Deref; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; -use futures::StreamExt; -use futures::channel::mpsc::{Sender, channel}; -use tokio::sync::Notify; - -use crate::runtime::spawn; use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile}; use crate::spec::{DataContentType, DataFile, Struct}; /// Index of delete files -#[derive(Debug, Clone)] +#[derive(Debug, Default)] pub(crate) struct DeleteFileIndex { - state: Arc>, -} - -#[derive(Debug)] -enum DeleteFileIndexState { - Populating(Arc), - Populated(PopulatedDeleteFileIndex), -} - -#[derive(Debug)] -struct PopulatedDeleteFileIndex { #[allow(dead_code)] global_deletes: Vec>, eq_deletes_by_partition: HashMap>>, pos_deletes_by_partition: HashMap>>, - // TODO: do we need this? - // pos_deletes_by_path: HashMap>>, - // TODO: Deletion Vector support } -impl DeleteFileIndex { - /// create a new `DeleteFileIndex` along with the sender that populates it with delete files - pub(crate) fn new() -> (DeleteFileIndex, Sender) { - // TODO: what should the channel limit be? - let (tx, rx) = channel(10); - let notify = Arc::new(Notify::new()); - let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating( - notify.clone(), - ))); - let delete_file_stream = rx.boxed(); - - spawn({ - let state = state.clone(); - async move { - let delete_files = delete_file_stream.collect::>().await; - - let populated_delete_file_index = PopulatedDeleteFileIndex::new(delete_files); - - { - let mut guard = state.write().unwrap(); - *guard = DeleteFileIndexState::Populated(populated_delete_file_index); - } - notify.notify_waiters(); - } - }); - - (DeleteFileIndex { state }, tx) - } - - /// Gets all the delete files that apply to the specified data file. - pub(crate) async fn get_deletes_for_data_file( - &self, - data_file: &DataFile, - seq_num: Option, - ) -> Vec { - let notifier = { - let guard = self.state.read().unwrap(); - match *guard { - DeleteFileIndexState::Populating(ref notifier) => notifier.clone(), - DeleteFileIndexState::Populated(ref index) => { - return index.get_deletes_for_data_file(data_file, seq_num); - } - } - }; - - notifier.notified().await; - - let guard = self.state.read().unwrap(); - match guard.deref() { - DeleteFileIndexState::Populated(index) => { - index.get_deletes_for_data_file(data_file, seq_num) - } - _ => unreachable!("Cannot be any other state than loaded"), - } - } -} - -impl PopulatedDeleteFileIndex { - /// Creates a new populated delete file index from a list of delete file contexts, which - /// allows for fast lookup when determining which delete files apply to a given data file. - /// - /// 1. The partition information is extracted from each delete file's manifest entry. - /// 2. If the partition is empty and the delete file is not a positional delete, - /// it is added to the `global_deletes` vector - /// 3. Otherwise, the delete file is added to one of two hash maps based on its content type. - fn new(files: Vec) -> PopulatedDeleteFileIndex { - let mut eq_deletes_by_partition: HashMap>> = - HashMap::default(); - let mut pos_deletes_by_partition: HashMap>> = - HashMap::default(); - - let mut global_deletes: Vec> = vec![]; - - files.into_iter().for_each(|ctx| { +impl Extend for DeleteFileIndex { + fn extend>(&mut self, iter: T) { + // 1. The partition information is extracted from each delete file's manifest entry. + // 2. If the partition is empty and the delete file is not a positional delete, + // it is added to the `global_deletes` vector + // 3. 
Otherwise, the delete file is added to one of two hash maps based on its content type. + for ctx in iter { let arc_ctx = Arc::new(ctx); let partition = arc_ctx.manifest_entry.data_file().partition(); - // The spec states that "Equality delete files stored with an unpartitioned spec are applied as global deletes". + // The spec states that "Equality delete files stored with an unpartitioned spec + // are applied as global deletes". if partition.fields().is_empty() { // TODO: confirm we're good to skip here if we encounter a pos del if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes { - global_deletes.push(arc_ctx); - return; + self.global_deletes.push(arc_ctx); + continue; } } let destination_map = match arc_ctx.manifest_entry.content_type() { - DataContentType::PositionDeletes => &mut pos_deletes_by_partition, - DataContentType::EqualityDeletes => &mut eq_deletes_by_partition, + DataContentType::PositionDeletes => &mut self.pos_deletes_by_partition, + DataContentType::EqualityDeletes => &mut self.eq_deletes_by_partition, _ => unreachable!(), }; @@ -150,17 +64,13 @@ impl PopulatedDeleteFileIndex { entry.push(arc_ctx.clone()); }) .or_insert(vec![arc_ctx.clone()]); - }); - - PopulatedDeleteFileIndex { - global_deletes, - eq_deletes_by_partition, - pos_deletes_by_partition, } } +} +impl DeleteFileIndex { /// Determine all the delete files that apply to the provided `DataFile`. - fn get_deletes_for_data_file( + pub(crate) fn get_deletes_for_data_file( &self, data_file: &DataFile, seq_num: Option, diff --git a/crates/iceberg/src/expr/visitors/expression_evaluator.rs b/crates/iceberg/src/expr/visitors/expression_evaluator.rs index 4db1ad7d93..7510d7e947 100644 --- a/crates/iceberg/src/expr/visitors/expression_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/expression_evaluator.rs @@ -43,6 +43,7 @@ impl ExpressionEvaluator { /// the provided [`DataFile`]'s partition [`Struct`]. Used by [`TableScan`] /// to see if this [`DataFile`] could possibly contain data that matches /// the scan's filter. + #[tracing::instrument(skip_all, level = "trace")] pub(crate) fn eval(&self, data_file: &DataFile) -> Result { let mut visitor = ExpressionEvaluatorVisitor::new(data_file.partition()); diff --git a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs index a00376e1ac..8386988b89 100644 --- a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs @@ -39,6 +39,7 @@ impl<'a> InclusiveMetricsEvaluator<'a> { /// provided [`DataFile`]'s metrics. Used by [`TableScan`] to /// see if this `DataFile` contains data that could match /// the scan's filter. + #[tracing::instrument(skip_all, level = "trace")] pub(crate) fn eval( filter: &'a BoundPredicate, data_file: &'a DataFile, diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 389397ecae..acc06b2827 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -280,6 +280,11 @@ impl InputFile { } /// Fetch and returns metadata of file. + #[tracing::instrument( + skip_all, + level = "trace", + name = "iceberg.io.build_data_file_metadata" + )] pub async fn metadata(&self) -> crate::Result { let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?; @@ -302,6 +307,7 @@ impl InputFile { /// Creates [`FileRead`] for continuous reading. /// /// For one-time reading, use [`Self::read`] instead. 
+ #[tracing::instrument(skip_all, level = "trace", name = "iceberg.io.build_data_file_reader")] pub async fn reader(&self) -> crate::Result> { Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?) } diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs index a23ff36b32..31f4afd3de 100644 --- a/crates/iceberg/src/io/object_cache.rs +++ b/crates/iceberg/src/io/object_cache.rs @@ -85,6 +85,12 @@ impl ObjectCache { /// Retrieves an Arc [`Manifest`] from the cache /// or retrieves one from FileIO and parses it if not present + #[tracing::instrument( + skip_all, + name = "iceberg.object_cache.get_manifest", + level = "debug", + fields(iceberg.object_cache.manifest.cache_hit) + )] pub(crate) async fn get_manifest(&self, manifest_file: &ManifestFile) -> Result<Arc<Manifest>> { if self.cache_disabled { return manifest_file @@ -101,13 +107,23 @@ impl ObjectCache { .or_try_insert_with(self.fetch_and_parse_manifest(manifest_file)) .await .map_err(|err| { + tracing::error!( + iceberg.error_msg = %err.to_string(), + "failed to load manifest from cache" + ); Error::new( ErrorKind::Unexpected, format!("Failed to load manifest {}", manifest_file.manifest_path), ) .with_source(err) - })? - .into_value(); + })?; + + tracing::Span::current().record( + "iceberg.object_cache.manifest.cache_hit", + cache_entry.is_fresh(), + ); + + let cache_entry = cache_entry.into_value(); match cache_entry { CachedItem::Manifest(arc_manifest) => Ok(arc_manifest), @@ -163,6 +179,7 @@ impl ObjectCache { } async fn fetch_and_parse_manifest(&self, manifest_file: &ManifestFile) -> Result<CachedItem> { + tracing::trace!("iceberg.object_cache.cache_miss"); let manifest = manifest_file.load_manifest(&self.file_io).await?; Ok(CachedItem::Manifest(Arc::new(manifest))) diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 63af25e9a5..7bf2adcbd7 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -90,3 +90,4 @@ pub mod writer; mod delete_vector; pub mod puffin; +mod traced_stream; diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index 1e4ef41b2b..e53612a1e9 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -17,8 +17,10 @@ use std::sync::Arc; -use futures::channel::mpsc::Sender; -use futures::{SinkExt, TryFutureExt}; +use futures::StreamExt; +use futures::stream::BoxStream; +use tracing::Instrument; +use tracing::field::Empty; use crate::delete_file_index::DeleteFileIndex; use crate::expr::{Bind, BoundPredicate, Predicate}; @@ -31,87 +33,116 @@ use crate::spec::{ ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, SchemaRef, SnapshotRef, TableMetadataRef, }; +use crate::traced_stream::TracedStream; use crate::{Error, ErrorKind, Result}; /// Wraps a [`ManifestFile`] alongside the objects that are needed /// to process it in a thread-safe manner pub(crate) struct ManifestFileContext { manifest_file: ManifestFile, - - sender: Sender<ManifestEntryContext>, - field_ids: Arc<Vec<i32>>, bound_predicates: Option<Arc<BoundPredicates>>, object_cache: Arc<ObjectCache>, snapshot_schema: SchemaRef, expression_evaluator_cache: Arc<ExpressionEvaluatorCache>, - delete_file_index: DeleteFileIndex, } /// Wraps a [`ManifestEntryRef`] alongside the objects that are needed /// to process it in a thread-safe manner pub(crate) struct ManifestEntryContext { pub manifest_entry: ManifestEntryRef, - pub expression_evaluator_cache: Arc<ExpressionEvaluatorCache>, pub field_ids: Arc<Vec<i32>>, pub bound_predicates: Option<Arc<BoundPredicates>>, pub partition_spec_id: i32, pub snapshot_schema: SchemaRef, - pub delete_file_index: DeleteFileIndex, + pub(crate)
span: tracing::Span, } impl ManifestFileContext { /// Consumes this [`ManifestFileContext`], fetching its Manifest from FileIO and then - /// streaming its constituent [`ManifestEntries`] to the channel provided in the context - pub(crate) async fn fetch_manifest_and_stream_manifest_entries(self) -> Result<()> { + /// streaming its constituent [`ManifestEntries`] + pub(crate) async fn fetch_manifest_and_stream_entries( + self, + parent_span: tracing::Span, + ) -> Result<BoxStream<'static, Result<ManifestEntryContext>>> { + let manifest_span = tracing::debug_span!( + parent: &parent_span, + "iceberg.scan.plan.process_manifest", + iceberg.scan.plan.manifest.file_path = self.manifest_file.manifest_path, + iceberg.scan.plan.manifest.entries_count = Empty, + ); + + let span = manifest_span.clone(); + let ManifestFileContext { object_cache, manifest_file, bound_predicates, snapshot_schema, field_ids, - mut sender, expression_evaluator_cache, - delete_file_index, .. } = self; - let manifest = object_cache.get_manifest(&manifest_file).await?; - - for manifest_entry in manifest.entries() { - let manifest_entry_context = ManifestEntryContext { - // TODO: refactor to avoid the expensive ManifestEntry clone - manifest_entry: manifest_entry.clone(), - expression_evaluator_cache: expression_evaluator_cache.clone(), - field_ids: field_ids.clone(), - partition_spec_id: manifest_file.partition_spec_id, - bound_predicates: bound_predicates.clone(), - snapshot_schema: snapshot_schema.clone(), - delete_file_index: delete_file_index.clone(), - }; - - sender - .send(manifest_entry_context) - .map_err(|_| Error::new(ErrorKind::Unexpected, "mpsc channel SendError")) - .await?; + let (manifest, manifest_file) = async move { + let manifest = object_cache.get_manifest(&manifest_file).await; + (manifest, manifest_file) } + .instrument(manifest_span.clone()) + .await; + let manifest = manifest?; + + span.record( + "iceberg.scan.plan.manifest.entries_count", + manifest.entries().len(), + ); + + let stream = async_stream::stream!
{ + for manifest_entry in manifest.entries() { + let manifest_entry_span = tracing::debug_span!( + parent: span.clone(), + "iceberg.scan.plan.process_data_file", + iceberg.scan.plan.data_file.file_path = manifest_entry.file_path(), + "iceberg.scan.plan.data_file.type" = Empty, + iceberg.scan.plan.data_file.skipped = Empty, + iceberg.scan.plan.data_file.skipped_reason = Empty, + ); + + yield Ok(ManifestEntryContext { + manifest_entry: manifest_entry.clone(), + expression_evaluator_cache: expression_evaluator_cache.clone(), + field_ids: field_ids.clone(), + partition_spec_id: manifest_file.partition_spec_id, + bound_predicates: bound_predicates.clone(), + snapshot_schema: snapshot_schema.clone(), + span: manifest_entry_span, + }); + } + } + .boxed(); + + Ok(Box::pin(TracedStream::new(stream, vec![ + manifest_span.clone(), + ]))) + } - Ok(()) + pub(crate) fn is_delete(&self) -> bool { + self.manifest_file.content == ManifestContentType::Deletes } } impl ManifestEntryContext { /// consume this `ManifestEntryContext`, returning a `FileScanTask` /// created from it - pub(crate) async fn into_file_scan_task(self) -> Result<FileScanTask> { - let deletes = self - .delete_file_index - .get_deletes_for_data_file( - self.manifest_entry.data_file(), - self.manifest_entry.sequence_number(), - ) - .await; + pub(crate) fn into_file_scan_task( + self, + delete_file_index: Arc<DeleteFileIndex>, + ) -> Result<FileScanTask> { + let deletes = delete_file_index.get_deletes_for_data_file( + self.manifest_entry.data_file(), + self.manifest_entry.sequence_number(), + ); Ok(FileScanTask { start: 0, @@ -134,7 +165,7 @@ impl ManifestEntryContext { /// PlanContext wraps a [`SnapshotRef`] alongside all the other /// objects that are required to perform a scan file plan. -#[derive(Debug)] +#[derive(Clone, Debug)] pub(crate) struct PlanContext { pub snapshot: SnapshotRef, @@ -152,6 +183,11 @@ pub(crate) struct PlanContext { } impl PlanContext { + #[tracing::instrument( + skip_all, + level = "debug", + fields(iceberg.scan.plan.manifest_list.file_path = ?self.snapshot.manifest_list()), + )] pub(crate) async fn get_manifest_list(&self) -> Result<Arc<ManifestList>> { self.object_cache .as_ref() @@ -180,66 +216,65 @@ impl PlanContext { Ok(partition_filter) } + #[tracing::instrument( + skip_all, + level = "debug", + name = "iceberg.scan.plan.process_manifest_list", + fields( + iceberg.scan.plan.manifest_list.entries_count = manifest_list.entries().len(), + ) + )] pub(crate) fn build_manifest_file_contexts( &self, manifest_list: Arc<ManifestList>, - tx_data: Sender<ManifestEntryContext>, - delete_file_idx: DeleteFileIndex, - delete_file_tx: Sender<ManifestEntryContext>, - ) -> Result<Box<dyn Iterator<Item = Result<ManifestFileContext>> + 'static>> { - let manifest_files = manifest_list.entries().iter(); - - // TODO: Ideally we could ditch this intermediate Vec as we return an iterator. - let mut filtered_mfcs = vec![]; - - for manifest_file in manifest_files { - let tx = if manifest_file.content == ManifestContentType::Deletes { - delete_file_tx.clone() - } else { - tx_data.clone() - }; - - let partition_bound_predicate = if self.predicate.is_some() { - let partition_bound_predicate = self.get_partition_filter(manifest_file)?; - - // evaluate the ManifestFile against the partition filter. Skip - // if it cannot contain any matching rows - if !self - .manifest_evaluator_cache - .get( - manifest_file.partition_spec_id, - partition_bound_predicate.clone(), - ) - .eval(manifest_file)?
- { - continue; - } - - Some(partition_bound_predicate) - } else { - None - }; - - let mfc = self.create_manifest_file_context( - manifest_file, - partition_bound_predicate, - tx, - delete_file_idx.clone(), - ); - - filtered_mfcs.push(Ok(mfc)); - } - - Ok(Box::new(filtered_mfcs.into_iter())) + ) -> ( + Vec>, + Vec>, + ) { + let has_predicate = self.predicate.is_some(); + + (0..manifest_list.entries().len()) + .map(move |i| manifest_list.entries()[i].clone()) + .filter_map(move |manifest_file| { + // TODO: replace closure when `try_blocks` stabilizes + (|| { + let partition_bound_predicate = if has_predicate { + let predicate = self.get_partition_filter(&manifest_file)?; + + if !self + .manifest_evaluator_cache + .get(manifest_file.partition_spec_id, predicate.clone()) + .eval(&manifest_file)? + { + tracing::debug!( + iceberg.scan.plan.manifest.file_path = manifest_file.manifest_path, + iceberg.scan.plan.manifest.skip_reason = "partition", + "iceberg.scan.plan.manifest_file.skipped" + ); + metrics::counter!("iceberg.scan.plan.manifest_file.skipped", "reason" => "partition").increment(1); + return Ok(None); // Skip this file. + } + Some(predicate) + } else { + None + }; + + metrics::counter!("iceberg.scan.plan.manifest_file.included").increment(1); + + let context = self + .create_manifest_file_context(manifest_file, partition_bound_predicate)?; + Ok(Some(context)) + })() + .transpose() + }) + .partition(|ctx| ctx.as_ref().map_or(true, |ctx| ctx.is_delete())) } fn create_manifest_file_context( &self, - manifest_file: &ManifestFile, + manifest_file: ManifestFile, partition_filter: Option>, - sender: Sender, - delete_file_index: DeleteFileIndex, - ) -> ManifestFileContext { + ) -> Result { let bound_predicates = if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) = (partition_filter, &self.snapshot_bound_predicate) @@ -252,15 +287,13 @@ impl PlanContext { None }; - ManifestFileContext { - manifest_file: manifest_file.clone(), + Ok(ManifestFileContext { + manifest_file, bound_predicates, - sender, object_cache: self.object_cache.clone(), snapshot_schema: self.snapshot_schema.clone(), field_ids: self.field_ids.clone(), expression_evaluator_cache: self.expression_evaluator_cache.clone(), - delete_file_index, - } + }) } } diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index e987de859f..8dcf02489a 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -26,9 +26,8 @@ mod task; use std::sync::Arc; use arrow_array::RecordBatch; -use futures::channel::mpsc::{Sender, channel}; use futures::stream::BoxStream; -use futures::{SinkExt, StreamExt, TryStreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; pub use task::*; use crate::arrow::ArrowReaderBuilder; @@ -36,9 +35,9 @@ use crate::delete_file_index::DeleteFileIndex; use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; -use crate::runtime::spawn; use crate::spec::{DataContentType, SnapshotRef}; use crate::table::Table; +use crate::traced_stream::TracedStream; use crate::utils::available_parallelism; use crate::{Error, ErrorKind, Result}; @@ -333,101 +332,77 @@ impl TableScan { /// Returns a stream of [`FileScanTask`]s. 
pub async fn plan_files(&self) -> Result { let Some(plan_context) = self.plan_context.as_ref() else { + tracing::debug!("file plan requested for a table with no snapshots"); return Ok(Box::pin(futures::stream::empty())); }; - let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files; - let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries; - - // used to stream ManifestEntryContexts between stages of the file plan operation - let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) = - channel(concurrency_limit_manifest_files); - let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) = - channel(concurrency_limit_manifest_files); - - // used to stream the results back to the caller - let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries); - - let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new(); + let root_span = tracing::info_span!( + "iceberg.scan.plan", + iceberg.scan.predicate = plan_context.predicate.as_ref().map(|p| p.to_string()), + iceberg.scan.snapshot_id = plan_context.snapshot.snapshot_id(), + iceberg.scan.case_sensitive = plan_context.case_sensitive, + iceberg.scan.field_ids = ?plan_context.field_ids.as_ref(), + ); + let _entered = root_span.enter(); let manifest_list = plan_context.get_manifest_list().await?; - // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any - // whose partitions cannot match this - // scan's filter - let manifest_file_contexts = plan_context.build_manifest_file_contexts( - manifest_list, - manifest_entry_data_ctx_tx, - delete_file_idx.clone(), - manifest_entry_delete_ctx_tx, - )?; - - let mut channel_for_manifest_error = file_scan_task_tx.clone(); - - // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s - spawn(async move { - let result = futures::stream::iter(manifest_file_contexts) - .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move { - ctx.fetch_manifest_and_stream_manifest_entries().await - }) - .await; - - if let Err(error) = result { - let _ = channel_for_manifest_error.send(Err(error)).await; - } - }); - - let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone(); - let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone(); - - // Process the delete file [`ManifestEntry`] stream in parallel - spawn(async move { - let result = manifest_entry_delete_ctx_rx - .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone()))) - .try_for_each_concurrent( - concurrency_limit_manifest_entries, - |(manifest_entry_context, tx)| async move { - spawn(async move { - Self::process_delete_manifest_entry(manifest_entry_context, tx).await - }) - .await - }, - ) - .await; + let (delete_contexts, data_contexts) = + plan_context.build_manifest_file_contexts(manifest_list); - if let Err(error) = result { - let _ = channel_for_delete_manifest_entry_error - .send(Err(error)) - .await; - } - }) - .await; - - // Process the data file [`ManifestEntry`] stream in parallel - spawn(async move { - let result = manifest_entry_data_ctx_rx - .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone()))) - .try_for_each_concurrent( - concurrency_limit_manifest_entries, - |(manifest_entry_context, tx)| async move { - spawn(async move { - Self::process_data_manifest_entry(manifest_entry_context, tx).await - }) - .await - }, - ) - .await; + let delete_file_index = self.build_delete_file_index(delete_contexts).await?; - if let Err(error) = result { - let _ = 
channel_for_data_manifest_entry_error.send(Err(error)).await; - } - }); + let stream_span = tracing::debug_span!( + parent: &root_span, + "iceberg.scan.plan.process_data_file_manifests" + ); + let stream_span_outer_clone = stream_span.clone(); + let stream = TableScan::process_manifest_contexts( + data_contexts, + self.concurrency_limit_manifest_files, + self.concurrency_limit_manifest_entries, + move |ctx, _span| { + let delete_file_index = delete_file_index.clone(); + async move { Self::process_data_manifest_entry(ctx, delete_file_index) } + }, + stream_span, + ) + .boxed(); + + Ok(Box::pin(TracedStream::new(stream, vec![ + root_span.clone(), + stream_span_outer_clone, + ]))) + } - Ok(file_scan_task_rx.boxed()) + #[tracing::instrument( + skip_all, + name = "iceberg.scan.plan.process_delete_file_manifests", + level = "debug", + fields(iceberg.scan.plan.delete_file_manifest.count = delete_contexts.len()), + )] + async fn build_delete_file_index( + &self, + delete_contexts: Vec>, + ) -> Result> { + let delete_file_index: DeleteFileIndex = TableScan::process_manifest_contexts( + delete_contexts, + self.concurrency_limit_manifest_files, + self.concurrency_limit_manifest_entries, + |ctx, _span| async move { Self::process_delete_manifest_entry(ctx) }, + tracing::Span::current(), + ) + .try_collect() + .await?; + + Ok(Arc::new(delete_file_index)) } /// Returns an [`ArrowRecordBatchStream`]. pub async fn to_arrow(&self) -> Result { + let root_span = tracing::info_span!("iceberg.scan.to_arrow",); + let _entered = root_span.enter(); + let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone()) .with_data_file_concurrency_limit(self.concurrency_limit_data_files) .with_row_group_filtering_enabled(self.row_group_filtering_enabled) @@ -437,10 +412,11 @@ impl TableScan { arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size); } - arrow_reader_builder + let stream = arrow_reader_builder .build() - .read(self.plan_files().await?) - .await + .read(self.plan_files().await?)?; + + Ok(Box::pin(TracedStream::new(stream, vec![root_span.clone()])) as ArrowRecordBatchStream) } /// Returns a reference to the column names of the table scan. 
@@ -453,17 +429,64 @@ impl TableScan { self.plan_context.as_ref().map(|x| &x.snapshot) } - async fn process_data_manifest_entry( - manifest_entry_context: ManifestEntryContext, - mut file_scan_task_tx: Sender>, - ) -> Result<()> { + /// Helper method to process manifest file contexts into a stream of results + fn process_manifest_contexts( + contexts: Vec>, + concurrency_limit_manifest_files: usize, + concurrency_limit_manifest_entries: usize, + processor: F, + span: tracing::Span, + ) -> impl Stream> + where + F: Fn(Result, tracing::Span) -> Fut + Send + Sync + 'static + Clone, + Fut: Future>> + Send + 'static, + T: Send + 'static, + { + let processor_span_clone = span.clone(); + let fetcher_span_clone = span.clone(); + futures::stream::iter(contexts) + .map(move |ctx: Result| { + let fetcher_span = fetcher_span_clone.clone(); + async move { + match ctx { + Ok(ctx) => ctx.fetch_manifest_and_stream_entries(fetcher_span).await, + Err(error) => Err(error), + } + } + }) + .buffer_unordered(concurrency_limit_manifest_files) + .try_flatten_unordered(None) + .map(move |ctx| { + let span = processor_span_clone.clone(); + let proc = processor.clone(); + async move { proc(ctx, span).await } + }) + .buffer_unordered(concurrency_limit_manifest_entries) + .try_filter_map(|opt_task| async move { Ok(opt_task) }) + .boxed() + } + + fn process_data_manifest_entry( + manifest_entry_context: Result, + delete_file_index: Arc, + ) -> Result> { + let manifest_entry_context = manifest_entry_context?; + + let span = manifest_entry_context.span.clone(); + let _guard = span.enter(); + // skip processing this manifest entry if it has been marked as deleted if !manifest_entry_context.manifest_entry.is_alive() { - return Ok(()); + span.record("iceberg.scan.plan.data_file.skipped", true); + span.record("iceberg.scan.plan.data_file.skipped_reason", "not_alive"); + metrics::counter!("iceberg.scan.plan.data_file.skipped", "reason" => "not_alive") + .increment(1); + return Ok(None); } // abort the plan if we encounter a manifest entry for a delete file if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data { + tracing::error!("Encountered an entry for a delete file in a data file manifest"); return Err(Error::new( ErrorKind::FeatureUnsupported, "Encountered an entry for a delete file in a data file manifest", @@ -487,7 +510,11 @@ impl TableScan { // skip any data file whose partition data indicates that it can't contain // any data that matches this scan's filter if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? { - return Ok(()); + span.record("iceberg.scan.plan.data_file.skipped", true); + span.record("iceberg.scan.plan.data_file.skipped_reason", "partition"); + metrics::counter!("iceberg.scan.plan.data_file.skipped", "reason" => "partition") + .increment(1); + return Ok(None); } // skip any data file whose metrics don't match this scan's filter @@ -496,31 +523,55 @@ impl TableScan { manifest_entry_context.manifest_entry.data_file(), false, )? { - return Ok(()); + span.record("iceberg.scan.plan.data_file.skipped", true); + span.record("iceberg.scan.plan.data_file.skipped_reason", "file_metrics"); + metrics::counter!("iceberg.scan.plan.data_file.skipped", "reason" => "file_metrics") + .increment(1); + return Ok(None); } } // congratulations! the manifest entry has made its way through the // entire plan without getting filtered out. 
Create a corresponding // FileScanTask and push it to the result stream - file_scan_task_tx - .send(Ok(manifest_entry_context.into_file_scan_task().await?)) - .await?; - - Ok(()) + span.record("iceberg.scan.plan.data_file.skipped", false); + metrics::counter!("iceberg.scan.plan.data_file.included").increment(1); + Ok(Some( + manifest_entry_context.into_file_scan_task(delete_file_index)?, + )) } - async fn process_delete_manifest_entry( - manifest_entry_context: ManifestEntryContext, - mut delete_file_ctx_tx: Sender, - ) -> Result<()> { + #[tracing::instrument( + skip_all, + level = "debug", + fields( + iceberg.scan.plan.delete_file.file_path, + iceberg.scan.plan.delete_file.skipped, + iceberg.scan.plan.delete_file.skipped_reason, + ) + )] + fn process_delete_manifest_entry( + manifest_entry_context: Result, + ) -> Result> { + let manifest_entry_context = manifest_entry_context?; + tracing::Span::current().record( + "iceberg.scan.plan.delete_file.file_path", + manifest_entry_context.manifest_entry.file_path(), + ); + // skip processing this manifest entry if it has been marked as deleted if !manifest_entry_context.manifest_entry.is_alive() { - return Ok(()); + tracing::Span::current().record("iceberg.scan.plan.delete_file.skipped", true); + tracing::Span::current() + .record("iceberg.scan.plan.delete_file.skipped_reason", "not_alive"); + metrics::counter!("iceberg.scan.plan.delete_file.skipped", "reason" => "not_alive") + .increment(1); + return Ok(None); } // abort the plan if we encounter a manifest entry that is not for a delete file if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data { + tracing::error!("Encountered an entry for a data file in a delete manifest"); return Err(Error::new( ErrorKind::FeatureUnsupported, "Encountered an entry for a data file in a delete manifest", @@ -539,18 +590,23 @@ impl TableScan { // skip any data file whose partition data indicates that it can't contain // any data that matches this scan's filter if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? 
{ - return Ok(()); + tracing::Span::current().record("iceberg.scan.plan.delete_file.skipped", true); + tracing::Span::current().record( + "iceberg.scan.plan.delete_file.skipped_reason", + "partition_not_matched", + ); + metrics::counter!("iceberg.scan.plan.delete_file.skipped", "reason" => "partition") + .increment(1); + return Ok(None); } } - delete_file_ctx_tx - .send(DeleteFileContext { - manifest_entry: manifest_entry_context.manifest_entry.clone(), - partition_spec_id: manifest_entry_context.partition_spec_id, - }) - .await?; - - Ok(()) + tracing::Span::current().record("iceberg.scan.plan.delete_file.skipped", false); + metrics::counter!("iceberg.scan.plan.delete_file.included").increment(1); + Ok(Some(DeleteFileContext { + manifest_entry: manifest_entry_context.manifest_entry.clone(), + partition_spec_id: manifest_entry_context.partition_spec_id, + })) } } @@ -1332,14 +1388,12 @@ pub mod tests { let batch_stream = reader .clone() .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))]))) - .await .unwrap(); let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap(); let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build(); let batch_stream = reader .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))]))) - .await .unwrap(); let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap(); diff --git a/crates/iceberg/src/traced_stream.rs b/crates/iceberg/src/traced_stream.rs new file mode 100644 index 0000000000..f601db08a2 --- /dev/null +++ b/crates/iceberg/src/traced_stream.rs @@ -0,0 +1,35 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures::Stream; +use tracing::Span; + +pub struct TracedStream<S> { + stream: S, + _spans: Vec<Span>, +} + +impl<S> TracedStream<S> { + pub fn new(stream: S, spans: Vec<Span>) -> Self { + Self { + stream, + _spans: spans, + } + } +} + +impl<S> Stream for TracedStream<S> +where S: Stream + Unpin +{ + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let this = self.get_mut(); + let _entered = this + ._spans + .iter() + .map(|span| span.enter()) + .collect::<Vec<_>>(); + Pin::new(&mut this.stream).poll_next(cx) + } +} diff --git a/crates/integration_tests/Cargo.toml b/crates/integration_tests/Cargo.toml index 07eea5f375..fd59e2f92d 100644 --- a/crates/integration_tests/Cargo.toml +++ b/crates/integration_tests/Cargo.toml @@ -38,3 +38,6 @@ parquet = { workspace = true } tokio = { workspace = true } uuid = { workspace = true } ordered-float = "2.10.1" + +metrics-exporter-prometheus = "0.17" +init-tracing-opentelemetry = { version = "0.28", features = ["tracing_subscriber_ext"] } diff --git a/crates/integration_tests/src/lib.rs b/crates/integration_tests/src/lib.rs index 1d2d5dc1ec..40aa2d0ae8 100644 --- a/crates/integration_tests/src/lib.rs +++ b/crates/integration_tests/src/lib.rs @@ -29,16 +29,26 @@ pub struct TestFixture { pub catalog_config: RestCatalogConfig, } -pub fn set_test_fixture(func: &str) -> TestFixture { - set_up(); - let docker_compose = DockerCompose::new( +pub fn set_test_fixture(func: &str, set_up_tracing_subscriber: bool) -> TestFixture { + if set_up_tracing_subscriber { + set_up(); + } + let mut docker_compose = DockerCompose::new( normalize_test_name(format!("{}_{func}", module_path!())), format!("{}/testdata", env!("CARGO_MANIFEST_DIR")), ); - // Stop any containers from previous runs and start new ones - docker_compose.down(); - docker_compose.up(); + if std::env::var("ICEBERG_INTEG_TEST_PERSISTENT_DOCKER_STACK").is_err() { + // Stop any containers from previous runs and
start new ones + docker_compose.down(); + docker_compose.up(); + } else { + // check if the containers are running and start them if not + if !docker_compose.is_running() { + docker_compose.up(); + } + docker_compose.keep_running(); + } let rest_catalog_ip = docker_compose.get_container_ip("rest"); let minio_ip = docker_compose.get_container_ip("minio"); diff --git a/crates/integration_tests/testdata/docker-compose.yaml b/crates/integration_tests/testdata/docker-compose.yaml index 7b4efebb26..fee194ff0e 100644 --- a/crates/integration_tests/testdata/docker-compose.yaml +++ b/crates/integration_tests/testdata/docker-compose.yaml @@ -80,6 +80,55 @@ services: - AWS_ACCESS_KEY_ID=admin - AWS_SECRET_ACCESS_KEY=password - AWS_REGION=us-east-1 + volumes: + - ./spark/entrypoint.sh:/home/entrypoint.sh:ro + - ./spark/provision.py:/home/provision.py:ro links: - rest:rest - minio:minio + + jaeger: + image: jaegertracing/all-in-one:1.60 + environment: + - COLLECTOR_OTLP_ENABLED=true + - JAEGER_DISABLED=false + networks: + rest_bridge: + ports: + - "16686:16686" # Jaeger UI + - "14268:14268" # Jaeger collector HTTP + - "14250:14250" # Jaeger collector gRPC + - "6831:6831/udp" # Jaeger agent UDP + - "6832:6832/udp" # Jaeger agent UDP + - "4317:4317" # OTLP gRPC receiver + - "4318:4318" # OTLP HTTP receiver + expose: + - 16686 + - 14268 + - 14250 + - 6831 + - 6832 + - 4317 + - 4318 + + prometheus: + image: prom/prometheus:v2.54.1 + command: + - '--config.file=/etc/prometheus/prometheus.yml' + - '--storage.tsdb.path=/prometheus' + - '--web.console.libraries=/etc/prometheus/console_libraries' + - '--web.console.templates=/etc/prometheus/consoles' + - '--storage.tsdb.retention.time=200h' + - '--web.enable-lifecycle' + networks: + rest_bridge: + ports: + - "9090:9090" + expose: + - 9090 + volumes: + - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro + depends_on: + - rest + - minio + - spark-iceberg \ No newline at end of file diff --git a/crates/integration_tests/testdata/prometheus.yml b/crates/integration_tests/testdata/prometheus.yml new file mode 100644 index 0000000000..54d8008484 --- /dev/null +++ b/crates/integration_tests/testdata/prometheus.yml @@ -0,0 +1,26 @@ +global: + scrape_interval: 15s + evaluation_interval: 15s + +scrape_configs: + - job_name: 'prometheus' + static_configs: + - targets: ['localhost:9090'] + + - job_name: 'minio' + static_configs: + - targets: ['minio:9000'] + metrics_path: /minio/v2/metrics/cluster + scrape_interval: 5s + + - job_name: 'iceberg-rest' + static_configs: + - targets: ['rest:8181'] + metrics_path: /metrics + scrape_interval: 5s + + - job_name: 'integration-tests' + static_configs: + - targets: [ 'tests:9000' ] + metrics_path: /metrics + scrape_interval: 1s diff --git a/crates/integration_tests/testdata/spark/Dockerfile b/crates/integration_tests/testdata/spark/Dockerfile index 420edb2318..414c1f2b0e 100644 --- a/crates/integration_tests/testdata/spark/Dockerfile +++ b/crates/integration_tests/testdata/spark/Dockerfile @@ -50,8 +50,10 @@ RUN chmod u+x /opt/spark/sbin/* && \ WORKDIR '/home/' -COPY entrypoint.sh . -COPY provision.py . +CMD ls /tmp -lha + +# COPY entrypoint.sh . +# COPY provision.py . 
HEALTHCHECK --retries=120 --interval=1s \ CMD ls /tmp/ready || exit 1 diff --git a/crates/integration_tests/testdata/spark/provision.py b/crates/integration_tests/testdata/spark/provision.py index 1929266b4a..6f6c793e4b 100755 --- a/crates/integration_tests/testdata/spark/provision.py +++ b/crates/integration_tests/testdata/spark/provision.py @@ -16,20 +16,40 @@ # under the License. from pyspark.sql import SparkSession -from pyspark.sql.functions import current_date, date_add, expr +from pyspark.sql.functions import * +from pyspark.sql.types import * +import random +from datetime import datetime, timedelta +import time -# The configuration is important, otherwise we get many small -# parquet files with a single row. When a positional delete -# hits the Parquet file with one row, the parquet file gets -# dropped instead of having a merge-on-read delete file. +print("doing stuff...") + + +# Configure Spark for better performance with large datasets spark = ( SparkSession .builder - .config("spark.sql.shuffle.partitions", "1") - .config("spark.default.parallelism", "1") + .appName("IcebergDemo") + .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,org.apache.iceberg:iceberg-aws-bundle:1.5.0") + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + # .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog") + # .config("spark.sql.catalog.rest.type", "rest") + # .config("spark.sql.catalog.rest.uri", "http://rest:8181") + # .config("spark.sql.catalog.rest.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") + # .config("spark.sql.catalog.rest.warehouse", "s3://icebergdata/demo") + # .config("spark.sql.catalog.rest.s3.endpoint", "http://minio:9000") + # .config("spark.sql.defaultCatalog", "rest") + .config("spark.sql.shuffle.partitions", "200") + .config("spark.default.parallelism", "200") + .config("spark.sql.adaptive.enabled", "true") + .config("spark.sql.adaptive.coalescePartitions.enabled", "true") + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .getOrCreate() ) +# Set log level to reduce noise +spark.sparkContext.setLogLevel("INFO") + spark.sql( f""" CREATE OR REPLACE TABLE rest.default.test_positional_merge_on_read_deletes ( @@ -149,3 +169,289 @@ SELECT EXPLODE(SEQUENCE(0, 1000)) AS s ); """) + +print("Starting NYC Taxi Dataset Generation...") +start_time = time.time() + +# Create the main taxi trips table +spark.sql( + f""" +CREATE OR REPLACE TABLE rest.default.nyc_taxi_trips ( + trip_id STRING, + vendor_id INTEGER, + pickup_datetime TIMESTAMP, + dropoff_datetime TIMESTAMP, + passenger_count INTEGER, + trip_distance DOUBLE, + pickup_longitude DOUBLE, + pickup_latitude DOUBLE, + dropoff_longitude DOUBLE, + dropoff_latitude DOUBLE, + payment_type STRING, + fare_amount DOUBLE, + extra DOUBLE, + mta_tax DOUBLE, + tip_amount DOUBLE, + tolls_amount DOUBLE, + total_amount DOUBLE, + pickup_location_id INTEGER, + dropoff_location_id INTEGER, + trip_type STRING, + congestion_surcharge DOUBLE, + pickup_date DATE +) +USING iceberg +PARTITIONED BY (pickup_date, vendor_id) +TBLPROPERTIES ( + 'write.delete.mode'='merge-on-read', + 'write.update.mode'='merge-on-read', + 'write.merge.mode'='merge-on-read', + 'format-version'='2', + 'write.parquet.row-group-size-bytes'='33554432', + 'write.target-file-size-bytes'='134217728', + 'write.parquet.page-row-limit'='20000' +) +""" +) + +print("Table created. 
Generating fake taxi trip data...") + +# Generate data in batches to avoid memory issues +BATCH_SIZE = 100_000 +TOTAL_ROWS = 10_000_000 +NUM_BATCHES = TOTAL_ROWS // BATCH_SIZE + +# NYC coordinates boundaries (approximate) +NYC_LAT_MIN, NYC_LAT_MAX = 40.477399, 40.917577 +NYC_LON_MIN, NYC_LON_MAX = -74.259090, -73.700272 + +# Payment types +PAYMENT_TYPES = ["Credit Card", "Cash", "No Charge", "Dispute", "Unknown"] +TRIP_TYPES = ["Street-hail", "Dispatch"] + +# Create UDF for generating random coordinates within NYC bounds +def generate_nyc_coordinates(): + lat = random.uniform(NYC_LAT_MIN, NYC_LAT_MAX) + lon = random.uniform(NYC_LON_MIN, NYC_LON_MAX) + return (lat, lon) + +# Register UDF +spark.udf.register("generate_coords", generate_nyc_coordinates, ArrayType(DoubleType())) + +for batch in range(NUM_BATCHES): + print(f"Processing batch {batch + 1}/{NUM_BATCHES}...") + + # Generate base sequence for this batch + batch_df = spark.range(BATCH_SIZE).select( + # Trip ID + concat(lit("trip_"), col("id").cast("string")).alias("trip_id"), + + # Vendor ID (1 or 2) + (col("id") % 2 + 1).cast("integer").alias("vendor_id"), + + # Pickup datetime (last 2 years) + (to_timestamp(lit("2022-01-01")) + + expr("INTERVAL {} SECONDS".format(random.randint(0, 730 * 24 * 60 * 60)))).alias("pickup_datetime"), + + # Passenger count (1-6, weighted towards 1-2) + when(col("id") % 100 < 60, 1) + .when(col("id") % 100 < 85, 2) + .when(col("id") % 100 < 95, 3) + .when(col("id") % 100 < 98, 4) + .when(col("id") % 100 < 99, 5) + .otherwise(6).alias("passenger_count"), + + # Trip distance (0.1 to 30 miles, log-normal distribution simulation) + round( + when(col("id") % 1000 < 600, rand() * 3 + 0.5) # Short trips + .when(col("id") % 1000 < 900, rand() * 8 + 2) # Medium trips + .otherwise(rand() * 20 + 5), # Long trips + 2 + ).alias("trip_distance"), + + # Location IDs (1-265 for taxi zones) + (col("id") % 265 + 1).cast("integer").alias("pickup_location_id"), + ((col("id") + 17) % 265 + 1).cast("integer").alias("dropoff_location_id"), + + # Payment type + when(col("id") % 100 < 70, "Credit Card") + .when(col("id") % 100 < 95, "Cash") + .when(col("id") % 100 < 98, "No Charge") + .when(col("id") % 100 < 99, "Dispute") + .otherwise("Unknown").alias("payment_type"), + + # Trip type + when(col("id") % 10 < 8, "Street-hail") + .otherwise("Dispatch").alias("trip_type") + ).withColumn( + # Generate pickup coordinates + "pickup_coords", expr("array(rand() * {} + {}, rand() * {} + {})".format( + NYC_LAT_MAX - NYC_LAT_MIN, NYC_LAT_MIN, + NYC_LON_MAX - NYC_LON_MIN, NYC_LON_MIN + )) + ).withColumn( + # Generate dropoff coordinates + "dropoff_coords", expr("array(rand() * {} + {}, rand() * {} + {})".format( + NYC_LAT_MAX - NYC_LAT_MIN, NYC_LAT_MIN, + NYC_LON_MAX - NYC_LON_MIN, NYC_LON_MIN + )) + ).withColumn( + # Calculate trip duration in seconds + "trip_duration_seconds", + (col("trip_distance") * 120 + 300 + (rand() * 600)).cast("integer") + ).withColumn( + # Calculate dropoff time (pickup + trip duration) + "dropoff_datetime", + col("pickup_datetime") + expr("make_interval(0, 0, 0, 0, 0, 0, trip_duration_seconds)") + ).select( + col("trip_id"), + col("vendor_id"), + col("pickup_datetime"), + col("dropoff_datetime"), + col("passenger_count"), + col("trip_distance"), + col("pickup_coords")[0].alias("pickup_latitude"), + col("pickup_coords")[1].alias("pickup_longitude"), + col("dropoff_coords")[0].alias("dropoff_latitude"), + col("dropoff_coords")[1].alias("dropoff_longitude"), + col("payment_type"), + col("pickup_location_id"), + 
col("dropoff_location_id"), + col("trip_type"), + ).withColumn( + # Calculate fare based on distance and time + "fare_amount", + round(lit(2.50) + col("trip_distance") * lit(2.50) + rand() * lit(5.0), 2) +).withColumn( + # Extra charges + "extra", + when(hour(col("pickup_datetime")).between(20, 23) | + hour(col("pickup_datetime")).between(0, 5), lit(0.5)) + .otherwise(lit(0.0)) +).withColumn( + # MTA tax + "mta_tax", lit(0.5) +).withColumn( + # Tip amount (higher for credit cards) + "tip_amount", + when(col("payment_type") == "Credit Card", + round(col("fare_amount") * (rand() * lit(0.25) + lit(0.15)), 2)) + .otherwise(lit(0.0)) +).withColumn( + # Tolls (random, sparse) + "tolls_amount", + when(rand() < lit(0.05), round(rand() * lit(8.0) + lit(2.0), 2)) + .otherwise(lit(0.0)) +).withColumn( + # Congestion surcharge (weekdays in Manhattan) + "congestion_surcharge", + when(col("pickup_location_id") <= 68, lit(2.5)) # Manhattan-ish zones + .otherwise(lit(0.0)) +).withColumn( + # Total amount + "total_amount", + round(col("fare_amount") + col("extra") + col("mta_tax") + + col("tip_amount") + col("tolls_amount") + col("congestion_surcharge"), 2) + ).withColumn( + # Pickup date for partitioning + "pickup_date", + to_date(col("pickup_datetime")) + ) + + batch_df = batch_df.select( + col("trip_id").cast("string"), + col("vendor_id").cast("integer"), + col("pickup_datetime").cast("timestamp"), + col("dropoff_datetime").cast("timestamp"), + col("passenger_count").cast("integer"), + col("trip_distance").cast("double"), + col("pickup_longitude").cast("double"), + col("pickup_latitude").cast("double"), + col("dropoff_longitude").cast("double"), + col("dropoff_latitude").cast("double"), + col("payment_type").cast("string"), + col("fare_amount").cast("double"), + col("extra").cast("double"), + col("mta_tax").cast("double"), + col("tip_amount").cast("double"), + col("tolls_amount").cast("double"), + col("total_amount").cast("double"), + col("pickup_location_id").cast("integer"), + col("dropoff_location_id").cast("integer"), + col("trip_type").cast("string"), + col("congestion_surcharge").cast("double"), + col("pickup_date").cast("date") + ) + + # Insert the batch + batch_df.write.mode("append").insertInto("rest.default.nyc_taxi_trips") + + # Progress update + rows_processed = (batch + 1) * BATCH_SIZE + elapsed_time = time.time() - start_time + rate = rows_processed / elapsed_time if elapsed_time > 0 else 0 + print(f"Inserted {rows_processed:,} rows. Rate: {rate:,.0f} rows/sec") + +print(f"\nCompleted! Generated {TOTAL_ROWS:,} taxi trip records. 
Rewriting data files to apply sort")
+
+print("Checking available system procedures...")
+
+try:
+    # Check if we can see system procedures
+    result = spark.sql("SHOW PROCEDURES").collect()
+    print("Available procedures:")
+    for row in result:
+        print(f"  {row}")
+except Exception as e:
+    print(f"Could not show procedures: {e}")
+
+try:
+    # Try to show procedures from the rest catalog specifically
+    result = spark.sql("SHOW PROCEDURES rest.system").collect()
+    print("Rest catalog procedures:")
+    for row in result:
+        print(f"  {row}")
+except Exception as e:
+    print(f"Could not show rest.system procedures: {e}")
+
+# Enable more verbose logging to see what's happening
+spark.sparkContext.setLogLevel("DEBUG")
+
+print("Attempting rewrite with more verbose error handling...")
+
+try:
+    result = spark.sql("""
+        CALL rest.system.rewrite_data_files(
+            'default.nyc_taxi_trips',
+            'sort',
+            'zorder(pickup_latitude, pickup_longitude)'
+        )
+    """)
+    print("Rewrite successful!")
+    result.show()
+except Exception as e:
+    print(f"Detailed error: {e}")
+    import traceback
+    traceback.print_exc()
+
+# Try alternative syntax variations
+alternative_calls = [
+    "CALL system.rewrite_data_files('default.nyc_taxi_trips')",
+    "CALL rest.system.rewrite_data_files('default.nyc_taxi_trips')",
+    "CALL rest.system.rewrite_data_files(table => 'default.nyc_taxi_trips')",
+]
+
+for call_sql in alternative_calls:
+    try:
+        print(f"Trying: {call_sql}")
+        result = spark.sql(call_sql)
+        print("Success!")
+        result.show()
+        break
+    except Exception as e:
+        print(f"Failed: {e}")
+
+print(f"Total time: {time.time() - start_time:.2f} seconds")
+
+
+spark.stop()
\ No newline at end of file
diff --git a/crates/integration_tests/tests/shared.rs b/crates/integration_tests/tests/shared.rs
index 6bdddaa6cf..4b110662d5 100644
--- a/crates/integration_tests/tests/shared.rs
+++ b/crates/integration_tests/tests/shared.rs
@@ -25,11 +25,19 @@ pub mod shared_tests;

 static DOCKER_CONTAINERS: OnceLock<Arc<TestFixture>> = OnceLock::new();

 pub fn get_shared_containers() -> &'static Arc<TestFixture> {
-    DOCKER_CONTAINERS.get_or_init(|| Arc::new(set_test_fixture("shared_tests")))
+    DOCKER_CONTAINERS.get_or_init(|| Arc::new(set_test_fixture("shared_tests", true)))
+}
+
+pub fn get_shared_containers_no_tracing_sub() -> &'static Arc<TestFixture> {
+    DOCKER_CONTAINERS.get_or_init(|| Arc::new(set_test_fixture("shared_tests", false)))
 }

 #[dtor]
 fn shutdown() {
+    if std::env::var("ICEBERG_INTEG_TEST_PERSISTENT_DOCKER_STACK").is_ok() {
+        return;
+    }
+
     if let Some(fixture) = DOCKER_CONTAINERS.get() {
         fixture._docker_compose.down()
     }
diff --git a/crates/integration_tests/tests/shared_tests/mod.rs b/crates/integration_tests/tests/shared_tests/mod.rs
index feb1c4e585..b231df297e 100644
--- a/crates/integration_tests/tests/shared_tests/mod.rs
+++ b/crates/integration_tests/tests/shared_tests/mod.rs
@@ -27,6 +27,7 @@ mod append_data_file_test;
 mod append_partition_data_file_test;
 mod conflict_commit_test;
 mod datafusion;
+mod observability;
 mod read_evolved_schema;
 mod read_positional_deletes;
 mod scan_all_type;
diff --git a/crates/integration_tests/tests/shared_tests/observability.rs b/crates/integration_tests/tests/shared_tests/observability.rs
new file mode 100644
index 0000000000..5d278b65ac
--- /dev/null
+++ b/crates/integration_tests/tests/shared_tests/observability.rs
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use futures::TryStreamExt;
+use iceberg::expr::Reference;
+use iceberg::spec::Datum;
+use iceberg::{Catalog, Error as IcebergError, TableIdent};
+use iceberg_catalog_rest::RestCatalog;
+use init_tracing_opentelemetry::tracing_subscriber_ext::TracingGuard;
+use metrics_exporter_prometheus::PrometheusBuilder;
+
+use crate::get_shared_containers_no_tracing_sub;
+
+#[tokio::test]
+async fn test_observability() -> Result<(), IcebergError> {
+    let _guard = configure_o11y_exports();
+
+    let fixture = get_shared_containers_no_tracing_sub();
+    let rest_catalog = RestCatalog::new(fixture.catalog_config.clone());
+
+    let table = rest_catalog
+        .load_table(&TableIdent::from_strs(["default", "nyc_taxi_trips"]).unwrap())
+        .await?;
+
+    let predicate = Reference::new("vendor_id")
+        .equal_to(Datum::long(1))
+        .and(Reference::new("pickup_longitude").greater_than(Datum::double(-74.0)))
+        .and(Reference::new("pickup_latitude").less_than(Datum::double(40.7)));
+
+    let scan = table
+        .scan()
+        .with_filter(predicate)
+        .with_row_selection_enabled(true)
+        .build()?;
+
+    let results = scan.to_arrow().await?.try_collect::<Vec<_>>().await?;
+    assert!(!results.is_empty());
+
+    // flush OTel OTLP traces to Jaeger
+    drop(_guard);
+    tokio::time::sleep(std::time::Duration::from_secs(10)).await;
+
+    // TODO: confirm traces are present in Jaeger
+
+    // TODO: check metrics are present in Prometheus
+    Ok(())
+}
+
+fn configure_o11y_exports() -> TracingGuard {
+    // RUST_LOG needs to contain otel::tracing=trace for traces to be exported.
+ unsafe { + std::env::set_var( + "RUST_LOG", + "info,iceberg=trace,otel::tracing=trace,otel=debug", + ) + }; + + // Set OTEL_SERVICE_NAME to identify the service in Jaeger + unsafe { std::env::set_var("OTEL_SERVICE_NAME", "test") }; + + // Set OTEL_EXPORTER_OTLP_ENDPOINT to export traces to Jaeger container + unsafe { std::env::set_var("OTEL_EXPORTER_OTLP_ENDPOINT", "grpc://localhost:4317") }; + + let _guard = init_tracing_opentelemetry::tracing_subscriber_ext::init_subscribers() + .expect("could not set up tracing-otel"); + + PrometheusBuilder::new() + .install() + .expect("could not set up prometheus metrics exporter"); + + _guard +} diff --git a/crates/integration_tests/tests/shared_tests/scan_all_type.rs b/crates/integration_tests/tests/shared_tests/scan_all_type.rs index 51f4093927..81167a851c 100644 --- a/crates/integration_tests/tests/shared_tests/scan_all_type.rs +++ b/crates/integration_tests/tests/shared_tests/scan_all_type.rs @@ -59,7 +59,7 @@ async fn test_scan_all_type() { .with_schema_id(1) .with_identifier_field_ids(vec![2]) .with_fields(vec![ - // test all type + // test all types NestedField::required(1, "int", Type::Primitive(PrimitiveType::Int)).into(), NestedField::required(2, "long", Type::Primitive(PrimitiveType::Long)).into(), NestedField::required(3, "float", Type::Primitive(PrimitiveType::Float)).into(), diff --git a/crates/test_utils/src/docker.rs b/crates/test_utils/src/docker.rs index 832c22240e..e98f3b15a8 100644 --- a/crates/test_utils/src/docker.rs +++ b/crates/test_utils/src/docker.rs @@ -29,6 +29,7 @@ use crate::cmd::{get_cmd_output, get_cmd_output_result, run_command}; pub struct DockerCompose { project_name: String, docker_compose_dir: String, + keep_running: bool, } impl DockerCompose { @@ -36,9 +37,14 @@ impl DockerCompose { Self { project_name: project_name.to_string(), docker_compose_dir: docker_compose_dir.to_string(), + keep_running: false, } } + pub fn keep_running(&mut self) { + self.keep_running = true; + } + pub fn project_name(&self) -> &str { self.project_name.as_str() } @@ -67,7 +73,32 @@ impl DockerCompose { } } + pub fn is_running(&self) -> bool { + let mut cmd = Command::new("docker"); + cmd.args(vec![ + "compose", + "-p", + self.project_name.as_str(), + "ps", + "--status", + "running", + "--quiet", + ]); + cmd.current_dir(&self.docker_compose_dir); + + let result = get_cmd_output_result(cmd, "Check if containers are running".to_string()); + match result { + Ok(output) => !output.trim().is_empty(), + Err(_) => false, + } + } + pub fn up(&self) { + if self.is_running() { + println!("Docker compose stack is already running, reusing existing containers"); + return; + } + let mut cmd = Command::new("docker"); cmd.current_dir(&self.docker_compose_dir); @@ -138,6 +169,8 @@ impl DockerCompose { impl Drop for DockerCompose { fn drop(&mut self) { - self.down() + if !self.keep_running { + self.down() + } } }
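
A note on the span/counter pattern used in the scan-planner hunks above: fields are declared on the span via #[tracing::instrument], then filled in with Span::current().record(...) once the outcome of each check is known, while metrics::counter! records the same outcome as an aggregate. The sketch below is a minimal, self-contained illustration of that pattern, not the crate's actual planner code; the function name, field names, and metric names here are illustrative only.

use tracing::Level;

// Declare span fields up front as empty; record them once the outcome is known.
#[tracing::instrument(
    skip_all,
    level = "debug",
    fields(
        scan.plan.file_path = tracing::field::Empty,
        scan.plan.skipped = tracing::field::Empty,
    )
)]
fn process_entry(file_path: &str, alive: bool) -> Option<String> {
    let span = tracing::Span::current();
    span.record("scan.plan.file_path", file_path);
    if !alive {
        span.record("scan.plan.skipped", true);
        // Counter with a label, mirroring the "reason" label used above.
        metrics::counter!("scan.plan.skipped", "reason" => "not_alive").increment(1);
        return None;
    }
    span.record("scan.plan.skipped", false);
    metrics::counter!("scan.plan.included").increment(1);
    Some(file_path.to_string())
}

fn main() {
    // A plain fmt subscriber is enough to see the recorded fields locally.
    tracing_subscriber::fmt().with_max_level(Level::DEBUG).init();
    process_entry("s3://bucket/data/file-a.parquet", true);
    process_entry("s3://bucket/data/file-b.parquet", false);
}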
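
The TracedStream added in crates/iceberg/src/traced_stream.rs re-enters a set of spans for the duration of every poll, so events emitted while the stream is driven stay attached to the scan's spans even though polling happens outside the original call stack. A usage sketch follows; the wrapper is re-declared locally so the example stands alone, and the span name is made up for illustration.

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{stream, Stream, StreamExt};
use tracing::{info_span, Span};

// Local re-declaration of the wrapper from the diff, so this sketch compiles on its own.
struct TracedStream<S> {
    stream: S,
    _spans: Vec<Span>,
}

impl<S> TracedStream<S> {
    fn new(stream: S, spans: Vec<Span>) -> Self {
        Self { stream, _spans: spans }
    }
}

impl<S> Stream for TracedStream<S>
where
    S: Stream + Unpin,
{
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        // Re-enter every captured span for the duration of this poll.
        let _entered: Vec<_> = this._spans.iter().map(|s| s.enter()).collect();
        Pin::new(&mut this.stream).poll_next(cx)
    }
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();
    let span = info_span!("file_scan_stream");
    let mut wrapped = TracedStream::new(stream::iter(1..=3), vec![span]);
    while let Some(item) = wrapped.next().await {
        // Emitted while `file_scan_stream` is entered.
        tracing::info!(item, "polled item");
    }
}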
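
For a quick look at the new counters without the Jaeger/Prometheus stack from docker-compose.yaml, a Prometheus recorder can be installed in-process and its registry rendered directly. This is a sketch assuming the metrics-exporter-prometheus 0.17 series pinned above; install_recorder()/render() skip the HTTP listener that the `integration-tests` scrape job in prometheus.yml would otherwise target.

use metrics::counter;
use metrics_exporter_prometheus::PrometheusBuilder;

fn main() {
    // Install only the recorder; no HTTP exporter is started.
    let handle = PrometheusBuilder::new()
        .install_recorder()
        .expect("failed to install Prometheus recorder");

    // Counters named like the ones emitted by the scan planner above.
    counter!("iceberg.scan.plan.data_file.included").increment(3);
    counter!("iceberg.scan.plan.delete_file.skipped", "reason" => "not_alive").increment(1);

    // Prometheus exposition format; dots in metric names are sanitized by the exporter.
    println!("{}", handle.render());
}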