Skip to content

Commit 8d76183

Browse files
committed
Support metadata_log_entries metadata table
1 parent 8ca0ab6 commit 8d76183

File tree

2 files changed

+152
-8
lines changed

2 files changed

+152
-8
lines changed

crates/iceberg/src/metadata_scan.rs

Lines changed: 144 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020
use std::sync::Arc;
2121

2222
use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder};
23-
use arrow_array::types::{Int64Type, TimestampMillisecondType};
23+
use arrow_array::types::{Int32Type, Int64Type, TimestampMillisecondType};
2424
use arrow_array::RecordBatch;
2525
use arrow_schema::{DataType, Field, Schema, TimeUnit};
2626

27-
use crate::spec::TableMetadataRef;
27+
use crate::spec::{SnapshotRef, TableMetadataRef};
2828
use crate::table::Table;
2929
use crate::Result;
3030

@@ -36,20 +36,27 @@ use crate::Result;
3636
#[derive(Debug)]
3737
pub struct MetadataScan {
3838
metadata_ref: TableMetadataRef,
39+
metadata_location: Option<String>,
3940
}
4041

4142
impl MetadataScan {
4243
/// Creates a new metadata scan.
4344
pub fn new(table: &Table) -> Self {
4445
Self {
4546
metadata_ref: table.metadata_ref(),
47+
metadata_location: table.metadata_location().map(String::from),
4648
}
4749
}
4850

4951
/// Returns the snapshots of the table.
5052
pub fn snapshots(&self) -> Result<RecordBatch> {
5153
SnapshotsTable::scan(self)
5254
}
55+
56+
/// Return the metadata log entries of the table.
57+
pub fn metadata_log_entries(&self) -> Result<RecordBatch> {
58+
MetadataLogEntriesTable::scan(self)
59+
}
5360
}
5461

5562
/// Table metadata scan.
@@ -137,6 +144,90 @@ impl MetadataTable for SnapshotsTable {
137144
}
138145
}
139146

147+
/// Metadata log entries table.
148+
///
149+
/// Used to inspect the current and historical metadata files in the table.
150+
/// Contains every metadata file and the time it was added. For each metadata
151+
/// file, the table contains information about the latest snapshot at the time.
152+
pub struct MetadataLogEntriesTable;
153+
154+
impl MetadataLogEntriesTable {
155+
fn snapshot_id_as_of_time(
156+
table_metadata: &TableMetadataRef,
157+
timestamp_ms_inclusive: i64,
158+
) -> Option<&SnapshotRef> {
159+
let mut snapshot_id = None;
160+
// The table metadata snapshot log is chronological
161+
for log_entry in table_metadata.history() {
162+
if log_entry.timestamp_ms <= timestamp_ms_inclusive {
163+
snapshot_id = Some(log_entry.snapshot_id);
164+
}
165+
}
166+
snapshot_id.and_then(|id| table_metadata.snapshot_by_id(id))
167+
}
168+
}
169+
170+
impl MetadataTable for MetadataLogEntriesTable {
171+
fn schema() -> Schema {
172+
Schema::new(vec![
173+
Field::new(
174+
"timestamp",
175+
DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
176+
false,
177+
),
178+
Field::new("file", DataType::Utf8, false),
179+
Field::new("latest_snapshot_id", DataType::Int64, true),
180+
Field::new("latest_schema_id", DataType::Int32, true),
181+
Field::new("latest_sequence_number", DataType::Int64, true),
182+
])
183+
}
184+
185+
fn scan(scan: &MetadataScan) -> Result<RecordBatch> {
186+
let mut timestamp =
187+
PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
188+
let mut file = StringBuilder::new();
189+
let mut latest_snapshot_id = PrimitiveBuilder::<Int64Type>::new();
190+
let mut latest_schema_id = PrimitiveBuilder::<Int32Type>::new();
191+
let mut latest_sequence_number = PrimitiveBuilder::<Int64Type>::new();
192+
193+
let mut append_metadata_log_entry = |timestamp_ms: i64, metadata_file: &str| {
194+
timestamp.append_value(timestamp_ms);
195+
file.append_value(metadata_file);
196+
197+
let snapshot =
198+
MetadataLogEntriesTable::snapshot_id_as_of_time(&scan.metadata_ref, timestamp_ms);
199+
latest_snapshot_id.append_option(snapshot.map(|s| s.snapshot_id()));
200+
latest_schema_id.append_option(snapshot.and_then(|s| s.schema_id()));
201+
latest_sequence_number.append_option(snapshot.map(|s| s.sequence_number()));
202+
};
203+
204+
for metadata_log_entry in scan.metadata_ref.metadata_log() {
205+
append_metadata_log_entry(
206+
metadata_log_entry.timestamp_ms,
207+
&metadata_log_entry.metadata_file,
208+
);
209+
}
210+
211+
// Include the current metadata locaction and modification time in the table. This matches
212+
// the Java implementation. Unlike the Java implementation, a current metadata location is
213+
// optional here. In that case, we omit current metadata from the metadata log table.
214+
if let Some(current_metadata_location) = &scan.metadata_location {
215+
append_metadata_log_entry(
216+
scan.metadata_ref.last_updated_ms(),
217+
current_metadata_location,
218+
);
219+
}
220+
221+
Ok(RecordBatch::try_new(Arc::new(Self::schema()), vec![
222+
Arc::new(timestamp.finish()),
223+
Arc::new(file.finish()),
224+
Arc::new(latest_snapshot_id.finish()),
225+
Arc::new(latest_schema_id.finish()),
226+
Arc::new(latest_sequence_number.finish()),
227+
])?)
228+
}
229+
}
230+
140231
#[cfg(test)]
141232
mod tests {
142233
use expect_test::{expect, Expect};
@@ -262,4 +353,55 @@ mod tests {
262353
Some("committed_at"),
263354
);
264355
}
356+
357+
// Scans the fixture table's metadata log entries and checks that the current
// metadata location is present, then snapshot-compares the schema and rows.
#[test]
358+
fn test_metadata_log_entries_table() {
359+
let table = TableTestFixture::new().table;
360+
let record_batch = table.metadata_scan().metadata_log_entries().unwrap();
361+
362+
// Check the current metadata location is included
363+
let current_metadata_location = table.metadata_location().unwrap();
364+
assert!(record_batch
365+
.column_by_name("file")
366+
.unwrap()
367+
.as_any()
368+
.downcast_ref::<arrow_array::StringArray>()
369+
.unwrap()
370+
.iter()
371+
.any(|location| location.is_some_and(|l| l.eq(current_metadata_location))));
372+
373+
// The first expectation lists the schema fields, the second the column data.
// NOTE(review): the trailing arguments look like the columns to skip (`file`
// prints as `(skipped)` below) and a column to sort by — confirm against
// `check_record_batch`.
check_record_batch(
374+
record_batch,
375+
expect![[r#"
376+
Field { name: "timestamp", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
377+
Field { name: "file", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
378+
Field { name: "latest_snapshot_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
379+
Field { name: "latest_schema_id", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
380+
Field { name: "latest_sequence_number", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
381+
expect![[r#"
382+
timestamp: PrimitiveArray<Timestamp(Millisecond, Some("+00:00"))>
383+
[
384+
1970-01-01T00:25:15.100+00:00,
385+
2020-10-14T01:22:53.590+00:00,
386+
],
387+
file: (skipped),
388+
latest_snapshot_id: PrimitiveArray<Int64>
389+
[
390+
null,
391+
3055729675574597004,
392+
],
393+
latest_schema_id: PrimitiveArray<Int32>
394+
[
395+
null,
396+
1,
397+
],
398+
latest_sequence_number: PrimitiveArray<Int64>
399+
[
400+
null,
401+
1,
402+
]"#]],
403+
&["file"],
404+
Some("timestamp"),
405+
);
406+
}
265407
}

crates/iceberg/src/scan.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -993,19 +993,21 @@ pub mod tests {
993993
let table_location = tmp_dir.path().join("table1");
994994
let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
995995
let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
996+
// This is a past metadata location in the metadata log
996997
let table_metadata1_location = table_location.join("metadata/v1.json");
998+
// This is the actual location of current metadata
999+
let template_json_location = format!(
1000+
"{}/testdata/example_table_metadata_v2.json",
1001+
env!("CARGO_MANIFEST_DIR")
1002+
);
9971003

9981004
let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
9991005
.unwrap()
10001006
.build()
10011007
.unwrap();
10021008

10031009
let table_metadata = {
1004-
let template_json_str = fs::read_to_string(format!(
1005-
"{}/testdata/example_table_metadata_v2.json",
1006-
env!("CARGO_MANIFEST_DIR")
1007-
))
1008-
.unwrap();
1010+
let template_json_str = fs::read_to_string(&template_json_location).unwrap();
10091011
let mut context = Context::new();
10101012
context.insert("table_location", &table_location);
10111013
context.insert("manifest_list_1_location", &manifest_list1_location);
@@ -1020,7 +1022,7 @@ pub mod tests {
10201022
.metadata(table_metadata)
10211023
.identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
10221024
.file_io(file_io.clone())
1023-
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
1025+
.metadata_location(template_json_location)
10241026
.build()
10251027
.unwrap();
10261028

0 commit comments

Comments
 (0)