From 9fba486a67aaed9b0e7c5af55d3b2ab3c149af20 Mon Sep 17 00:00:00 2001
From: Willi Raschkowski
Date: Wed, 25 Dec 2024 14:12:14 +0100
Subject: [PATCH 1/4] Support metadata_log_entries metadata table

---
 crates/iceberg/src/metadata_scan.rs | 152 +++++++++++++++++++++++++++-
 crates/iceberg/src/scan.rs          |  14 +--
 crates/iceberg/src/table.rs         |   2 +-
 3 files changed, 157 insertions(+), 11 deletions(-)

diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs
index 16604d781..8be0ddfc4 100644
--- a/crates/iceberg/src/metadata_scan.rs
+++ b/crates/iceberg/src/metadata_scan.rs
@@ -26,6 +26,7 @@ use arrow_array::types::{Int32Type, Int64Type, Int8Type, TimestampMillisecondTyp
 use arrow_array::RecordBatch;
 use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
 
+use crate::spec::Snapshot;
 use crate::table::Table;
 use crate::Result;
 
@@ -36,11 +37,11 @@
 /// -
 /// -
 #[derive(Debug)]
-pub struct MetadataTable(Table);
+pub struct MetadataTable<'a>(&'a Table);
 
-impl MetadataTable {
+impl<'a> MetadataTable<'a> {
     /// Creates a new metadata scan.
-    pub(super) fn new(table: Table) -> Self {
+    pub(super) fn new(table: &'a Table) -> Self {
         Self(table)
     }
 
@@ -53,6 +54,11 @@ impl MetadataTable {
     pub fn manifests(&self) -> ManifestsTable {
         ManifestsTable { table: &self.0 }
     }
+
+    /// Return the metadata log entries of the table.
+    pub fn metadata_log_entries(&self) -> MetadataLogEntriesTable {
+        MetadataLogEntriesTable { table: &self.0 }
+    }
 }
 
 /// Snapshots table.
@@ -255,6 +261,89 @@ impl<'a> ManifestsTable<'a> {
     }
 }
 
+/// Metadata log entries table.
+///
+/// Used to inspect the current and historical metadata files in the table.
+/// Contains every metadata file and the time it was added. For each metadata
+/// file, the table contains information about the latest snapshot at the time.
+pub struct MetadataLogEntriesTable<'a> {
+    table: &'a Table,
+}
+
+impl<'a> MetadataLogEntriesTable<'a> {
+    /// Return the schema of the metadata log entries table.
+    pub fn schema(&self) -> Schema {
+        Schema::new(vec![
+            Field::new(
+                "timestamp",
+                DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
+                false,
+            ),
+            Field::new("file", DataType::Utf8, false),
+            Field::new("latest_snapshot_id", DataType::Int64, true),
+            Field::new("latest_schema_id", DataType::Int32, true),
+            Field::new("latest_sequence_number", DataType::Int64, true),
+        ])
+    }
+
+    /// Scan the metadata log entries table.
+    pub fn scan(&self) -> Result<RecordBatch> {
+        let mut timestamp =
+            PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
+        let mut file = StringBuilder::new();
+        let mut latest_snapshot_id = PrimitiveBuilder::<Int64Type>::new();
+        let mut latest_schema_id = PrimitiveBuilder::<Int32Type>::new();
+        let mut latest_sequence_number = PrimitiveBuilder::<Int64Type>::new();
+
+        let mut append_metadata_log_entry = |timestamp_ms: i64, metadata_file: &str| {
+            timestamp.append_value(timestamp_ms);
+            file.append_value(metadata_file);
+
+            let snapshot = self.snapshot_id_as_of_time(timestamp_ms);
+            latest_snapshot_id.append_option(snapshot.map(|s| s.snapshot_id()));
+            latest_schema_id.append_option(snapshot.and_then(|s| s.schema_id()));
+            latest_sequence_number.append_option(snapshot.map(|s| s.sequence_number()));
+        };
+
+        for metadata_log_entry in self.table.metadata().metadata_log() {
+            append_metadata_log_entry(
+                metadata_log_entry.timestamp_ms,
+                &metadata_log_entry.metadata_file,
+            );
+        }
+
+        // Include the current metadata location and modification time in the table. This matches
+        // the Java implementation. Unlike the Java implementation, a current metadata location is
+        // optional here. In that case, we omit current metadata from the metadata log table.
+        if let Some(current_metadata_location) = &self.table.metadata_location() {
+            append_metadata_log_entry(
+                self.table.metadata().last_updated_ms(),
+                current_metadata_location,
+            );
+        }
+
+        Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![
+            Arc::new(timestamp.finish()),
+            Arc::new(file.finish()),
+            Arc::new(latest_snapshot_id.finish()),
+            Arc::new(latest_schema_id.finish()),
+            Arc::new(latest_sequence_number.finish()),
+        ])?)
+    }
+
+    fn snapshot_id_as_of_time(&self, timestamp_ms_inclusive: i64) -> Option<&Snapshot> {
+        let table_metadata = self.table.metadata();
+        let mut snapshot_id = None;
+        // The table metadata snapshot log is chronological
+        for log_entry in table_metadata.history() {
+            if log_entry.timestamp_ms <= timestamp_ms_inclusive {
+                snapshot_id = Some(log_entry.snapshot_id);
+            }
+        }
+        snapshot_id.and_then(|id| table_metadata.snapshot_by_id(id).map(|s| s.as_ref()))
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use expect_test::{expect, Expect};
@@ -451,7 +540,7 @@ mod tests {
             partition_summaries: ListArray
             [
               StructArray
-            -- validity: 
+            -- validity:
             [
               valid,
             ]
@@ -482,4 +571,59 @@ mod tests {
             Some("path"),
         );
     }
+
+    #[test]
+    fn test_metadata_log_entries_table() {
+        let table = TableTestFixture::new().table;
+        let record_batch = table
+            .metadata_table()
+            .metadata_log_entries()
+            .scan()
+            .unwrap();
+
+        // Check the current metadata location is included
+        let current_metadata_location = table.metadata_location().unwrap();
+        assert!(record_batch
+            .column_by_name("file")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap()
+            .iter()
+            .any(|location| location.is_some_and(|l| l.eq(current_metadata_location))));
+
+        check_record_batch(
+            record_batch,
+            expect![[r#"
+                Field { name: "timestamp", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+                Field { name: "file", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+                Field { name: "latest_snapshot_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
+                Field { name: "latest_schema_id", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
+                Field { name: "latest_sequence_number", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
+            expect![[r#"
+                timestamp: PrimitiveArray<Timestamp(Millisecond, Some("+00:00"))>
+                [
+                  1970-01-01T00:25:15.100+00:00,
+                  2020-10-14T01:22:53.590+00:00,
+                ],
+                file: (skipped),
+                latest_snapshot_id: PrimitiveArray<Int64>
+                [
+                  null,
+                  3055729675574597004,
+                ],
+                latest_schema_id: PrimitiveArray<Int32>
+                [
+                  null,
+                  1,
+                ],
+                latest_sequence_number: PrimitiveArray<Int64>
+                [
+                  null,
+                  1,
+                ]"#]],
+            &["file"],
+            Some("timestamp"),
+        );
+    }
 }
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 5a97e74e7..ff3034d52 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -1002,7 +1002,13 @@ pub mod tests {
         let table_location = tmp_dir.path().join("table1");
         let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
         let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
+        // This is a past metadata location in the metadata log
         let table_metadata1_location = table_location.join("metadata/v1.json");
+        // This is the actual location of current metadata
+        let template_json_location = format!(
"{}/testdata/example_table_metadata_v2.json", + env!("CARGO_MANIFEST_DIR") + ); let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap()) .unwrap() @@ -1010,11 +1016,7 @@ pub mod tests { .unwrap(); let table_metadata = { - let template_json_str = fs::read_to_string(format!( - "{}/testdata/example_table_metadata_v2.json", - env!("CARGO_MANIFEST_DIR") - )) - .unwrap(); + let template_json_str = fs::read_to_string(&template_json_location).unwrap(); let mut context = Context::new(); context.insert("table_location", &table_location); context.insert("manifest_list_1_location", &manifest_list1_location); @@ -1029,7 +1031,7 @@ pub mod tests { .metadata(table_metadata) .identifier(TableIdent::from_strs(["db", "table1"]).unwrap()) .file_io(file_io.clone()) - .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap()) + .metadata_location(template_json_location) .build() .unwrap(); diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index fa5304855..0c93a92fe 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -203,7 +203,7 @@ impl Table { /// Creates a metadata table which provides table-like APIs for inspecting metadata. /// See [`MetadataTable`] for more details. - pub fn metadata_table(self) -> MetadataTable { + pub fn metadata_table(&self) -> MetadataTable<'_> { MetadataTable::new(self) } From 439c5290446c455b8a08f6d256d44adefff522a7 Mon Sep 17 00:00:00 2001 From: Willi Raschkowski Date: Mon, 30 Dec 2024 18:00:11 +0100 Subject: [PATCH 2/4] Rename 'metadata_scan' to 'metadata_table' --- crates/iceberg/src/lib.rs | 2 +- crates/iceberg/src/{metadata_scan.rs => metadata_table.rs} | 0 crates/iceberg/src/table.rs | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) rename crates/iceberg/src/{metadata_scan.rs => metadata_table.rs} (100%) diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 1946f35f3..111b8a12f 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -73,7 +73,7 @@ mod avro; pub mod io; pub mod spec; -pub mod metadata_scan; +pub mod metadata_table; pub mod scan; pub mod expr; diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_table.rs similarity index 100% rename from crates/iceberg/src/metadata_scan.rs rename to crates/iceberg/src/metadata_table.rs diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index 0c93a92fe..2a94a8ba3 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use crate::arrow::ArrowReaderBuilder; use crate::io::object_cache::ObjectCache; use crate::io::FileIO; -use crate::metadata_scan::MetadataTable; +use crate::metadata_table::MetadataTable; use crate::scan::TableScanBuilder; use crate::spec::{TableMetadata, TableMetadataRef}; use crate::{Error, ErrorKind, Result, TableIdent}; From 1c754ae007e8a37883bc2f9bfae671f4c04ba5fe Mon Sep 17 00:00:00 2001 From: Willi Raschkowski Date: Thu, 2 Jan 2025 13:57:27 +0100 Subject: [PATCH 3/4] Remove unnecessary borrowing --- crates/iceberg/src/metadata_table.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/iceberg/src/metadata_table.rs b/crates/iceberg/src/metadata_table.rs index 8be0ddfc4..3577d9e01 100644 --- a/crates/iceberg/src/metadata_table.rs +++ b/crates/iceberg/src/metadata_table.rs @@ -47,17 +47,17 @@ impl<'a> MetadataTable<'a> { /// Get the snapshots table. 
     pub fn snapshots(&self) -> SnapshotsTable {
-        SnapshotsTable { table: &self.0 }
+        SnapshotsTable { table: self.0 }
     }
 
     /// Get the manifests table.
     pub fn manifests(&self) -> ManifestsTable {
-        ManifestsTable { table: &self.0 }
+        ManifestsTable { table: self.0 }
     }
 
     /// Return the metadata log entries of the table.
     pub fn metadata_log_entries(&self) -> MetadataLogEntriesTable {
-        MetadataLogEntriesTable { table: &self.0 }
+        MetadataLogEntriesTable { table: self.0 }
     }
 }
 

From 85b385d1ccf93abdaaec67c3bae24739f2f09a16 Mon Sep 17 00:00:00 2001
From: Willi Raschkowski
Date: Thu, 2 Jan 2025 14:17:39 +0100
Subject: [PATCH 4/4] Make metadata table order match docs

---
 crates/iceberg/src/metadata_table.rs | 122 +++++++++++++--------
 1 file changed, 61 insertions(+), 61 deletions(-)

diff --git a/crates/iceberg/src/metadata_table.rs b/crates/iceberg/src/metadata_table.rs
index 3577d9e01..d20a18cd5 100644
--- a/crates/iceberg/src/metadata_table.rs
+++ b/crates/iceberg/src/metadata_table.rs
@@ -45,6 +45,11 @@ impl<'a> MetadataTable<'a> {
         Self(table)
     }
 
+    /// Return the metadata log entries of the table.
+    pub fn metadata_log_entries(&self) -> MetadataLogEntriesTable {
+        MetadataLogEntriesTable { table: self.0 }
+    }
+
     /// Get the snapshots table.
     pub fn snapshots(&self) -> SnapshotsTable {
         SnapshotsTable { table: self.0 }
@@ -54,11 +59,6 @@ impl<'a> MetadataTable<'a> {
     pub fn manifests(&self) -> ManifestsTable {
         ManifestsTable { table: self.0 }
     }
-
-    /// Return the metadata log entries of the table.
-    pub fn metadata_log_entries(&self) -> MetadataLogEntriesTable {
-        MetadataLogEntriesTable { table: self.0 }
-    }
 }
 
 /// Snapshots table.
@@ -399,6 +399,61 @@ mod tests {
         ));
     }
 
+    #[test]
+    fn test_metadata_log_entries_table() {
+        let table = TableTestFixture::new().table;
+        let record_batch = table
+            .metadata_table()
+            .metadata_log_entries()
+            .scan()
+            .unwrap();
+
+        // Check the current metadata location is included
+        let current_metadata_location = table.metadata_location().unwrap();
+        assert!(record_batch
+            .column_by_name("file")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap()
+            .iter()
+            .any(|location| location.is_some_and(|l| l.eq(current_metadata_location))));
+
+        check_record_batch(
+            record_batch,
+            expect![[r#"
+                Field { name: "timestamp", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+                Field { name: "file", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+                Field { name: "latest_snapshot_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
+                Field { name: "latest_schema_id", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
+                Field { name: "latest_sequence_number", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
+            expect![[r#"
+                timestamp: PrimitiveArray<Timestamp(Millisecond, Some("+00:00"))>
+                [
+                  1970-01-01T00:25:15.100+00:00,
+                  2020-10-14T01:22:53.590+00:00,
+                ],
+                file: (skipped),
+                latest_snapshot_id: PrimitiveArray<Int64>
+                [
+                  null,
+                  3055729675574597004,
+                ],
+                latest_schema_id: PrimitiveArray<Int32>
+                [
+                  null,
+                  1,
+                ],
+                latest_sequence_number: PrimitiveArray<Int64>
+                [
+                  null,
+                  1,
+                ]"#]],
+            &["file"],
+            Some("timestamp"),
+        );
+    }
+
     #[test]
     fn test_snapshots_table() {
         let table = TableTestFixture::new().table;
@@ -540,7 +595,7 @@ mod tests {
             partition_summaries: ListArray
             [
              StructArray
-            -- validity: 
+            -- validity:
             [
              valid,
             ]
@@ -571,59 +626,4 @@ mod tests {
             Some("path"),
         );
     }
-
-    #[test]
-    fn test_metadata_log_entries_table() {
-        let table = TableTestFixture::new().table;
-        let record_batch = table
-            .metadata_table()
-            .metadata_log_entries()
-            .scan()
-            .unwrap();
-
-        // Check the current metadata location is included
-        let current_metadata_location = table.metadata_location().unwrap();
-        assert!(record_batch
-            .column_by_name("file")
-            .unwrap()
-            .as_any()
-            .downcast_ref::<StringArray>()
-            .unwrap()
-            .iter()
-            .any(|location| location.is_some_and(|l| l.eq(current_metadata_location))));
-
-        check_record_batch(
-            record_batch,
-            expect![[r#"
-                Field { name: "timestamp", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
-                Field { name: "file", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
-                Field { name: "latest_snapshot_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
-                Field { name: "latest_schema_id", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
-                Field { name: "latest_sequence_number", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
-            expect![[r#"
-                timestamp: PrimitiveArray<Timestamp(Millisecond, Some("+00:00"))>
-                [
-                  1970-01-01T00:25:15.100+00:00,
-                  2020-10-14T01:22:53.590+00:00,
-                ],
-                file: (skipped),
-                latest_snapshot_id: PrimitiveArray<Int64>
-                [
-                  null,
-                  3055729675574597004,
-                ],
-                latest_schema_id: PrimitiveArray<Int32>
-                [
-                  null,
-                  1,
-                ],
-                latest_sequence_number: PrimitiveArray<Int64>
-                [
-                  null,
-                  1,
-                ]"#]],
-            &["file"],
-            Some("timestamp"),
-        );
-    }
 }
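
A minimal usage sketch of the API these patches add. The helper name inspect_metadata_log is hypothetical; the sketch is written as crate-internal code, mirroring the new test, and assumes an already-constructed Table:

// Sketch only: `table` could come from a catalog or, as in the tests above,
// from `TableTestFixture::new().table`.
use arrow_array::RecordBatch;

use crate::table::Table;
use crate::Result;

fn inspect_metadata_log(table: &Table) -> Result<RecordBatch> {
    // After PATCH 1, `metadata_table()` borrows the table, so `table` stays usable afterwards.
    let batch = table.metadata_table().metadata_log_entries().scan()?;
    // Columns: timestamp, file, latest_snapshot_id, latest_schema_id, latest_sequence_number.
    Ok(batch)
}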