Skip to content

Commit 9a6eff3

Browse files
committed
WIP
1 parent 9d3ca08 commit 9a6eff3

File tree

12 files changed

+564
-353
lines changed

12 files changed

+564
-353
lines changed

crates/catalog/glue/src/catalog.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,9 @@ impl Catalog for GlueCatalog {
355355
}
356356
};
357357

358-
let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
358+
let metadata = TableMetadataBuilder::from_table_creation(creation)?
359+
.build()?
360+
.metadata;
359361
let metadata_location = create_metadata_location(&location, 0)?;
360362

361363
self.file_io

crates/catalog/glue/src/schema.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,9 @@ mod tests {
198198
.location("my_location".to_string())
199199
.schema(schema)
200200
.build();
201-
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
201+
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
202+
.build()?
203+
.metadata;
202204

203205
Ok(metadata)
204206
}

crates/catalog/glue/src/utils.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,9 @@ mod tests {
299299
.location("my_location".to_string())
300300
.schema(schema)
301301
.build();
302-
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
302+
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
303+
.build()?
304+
.metadata;
303305

304306
Ok(metadata)
305307
}

crates/catalog/hms/src/catalog.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,9 @@ impl Catalog for HmsCatalog {
346346
}
347347
};
348348

349-
let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
349+
let metadata = TableMetadataBuilder::from_table_creation(creation)?
350+
.build()?
351+
.metadata;
350352
let metadata_location = create_metadata_location(&location, 0)?;
351353

352354
self.file_io

crates/catalog/memory/src/catalog.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,9 @@ impl Catalog for MemoryCatalog {
194194
}
195195
};
196196

197-
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
197+
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
198+
.build()?
199+
.metadata;
198200
let metadata_location = format!(
199201
"{}/metadata/{}-{}.metadata.json",
200202
&location,
@@ -371,7 +373,7 @@ mod tests {
371373
let expected_sorted_order = SortOrder::builder()
372374
.with_order_id(0)
373375
.with_fields(vec![])
374-
.build(expected_schema.clone())
376+
.build(&expected_schema)
375377
.unwrap();
376378

377379
assert_eq!(

crates/iceberg/src/catalog/mod.rs

+48-5
Original file line numberDiff line numberDiff line change
@@ -433,8 +433,46 @@ impl TableUpdate {
433433
/// Applies the update to the table metadata builder.
434434
pub fn apply(self, builder: TableMetadataBuilder) -> Result<TableMetadataBuilder> {
435435
match self {
436-
TableUpdate::AssignUuid { uuid } => builder.assign_uuid(uuid),
437-
_ => unimplemented!(),
436+
TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)),
437+
TableUpdate::AddSchema {
438+
schema,
439+
last_column_id,
440+
} => {
441+
if let Some(last_column_id) = last_column_id {
442+
if builder.last_column_id() < last_column_id {
443+
return Err(Error::new(
444+
ErrorKind::DataInvalid,
445+
format!(
446+
"Invalid last column ID: {last_column_id} < {} (previous last column ID)",
447+
builder.last_column_id()
448+
),
449+
));
450+
}
451+
};
452+
builder.add_schema(schema)
453+
}
454+
TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id),
455+
TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec),
456+
TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id),
457+
TableUpdate::AddSortOrder { sort_order } => builder.add_sort_order(sort_order),
458+
TableUpdate::SetDefaultSortOrder { sort_order_id } => {
459+
builder.set_default_sort_order(sort_order_id)
460+
}
461+
TableUpdate::AddSnapshot { snapshot } => builder.add_snapshot(snapshot),
462+
TableUpdate::SetSnapshotRef {
463+
ref_name,
464+
reference,
465+
} => builder.set_ref(&ref_name, reference),
466+
TableUpdate::RemoveSnapshots { snapshot_ids } => {
467+
Ok(builder.remove_snapshots(&snapshot_ids))
468+
}
469+
TableUpdate::RemoveSnapshotRef { ref_name } => Ok(builder.remove_ref(&ref_name)),
470+
TableUpdate::SetLocation { location } => Ok(builder.set_location(location)),
471+
TableUpdate::SetProperties { updates } => builder.set_properties(updates),
472+
TableUpdate::RemoveProperties { removals } => Ok(builder.remove_properties(&removals)),
473+
TableUpdate::UpgradeFormatVersion { format_version } => {
474+
builder.upgrade_format_version(format_version)
475+
}
438476
}
439477
}
440478
}
@@ -1102,16 +1140,21 @@ mod tests {
11021140
let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
11031141
.unwrap()
11041142
.build()
1105-
.unwrap();
1106-
let table_metadata_builder = TableMetadataBuilder::new(table_metadata);
1143+
.unwrap()
1144+
.metadata;
1145+
let table_metadata_builder = TableMetadataBuilder::new_from_metadata(
1146+
table_metadata,
1147+
"s3://db/table/metadata/metadata1.gz.json",
1148+
);
11071149

11081150
let uuid = uuid::Uuid::new_v4();
11091151
let update = TableUpdate::AssignUuid { uuid };
11101152
let updated_metadata = update
11111153
.apply(table_metadata_builder)
11121154
.unwrap()
11131155
.build()
1114-
.unwrap();
1156+
.unwrap()
1157+
.metadata;
11151158
assert_eq!(updated_metadata.uuid(), uuid);
11161159
}
11171160
}

crates/iceberg/src/spec/partition.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ impl PartitionSpec {
150150

151151
/// Check if this partition spec has sequential partition ids.
152152
/// Sequential ids start from 1000 and increment by 1 for each field.
153-
/// This is required for spec version
153+
/// This is required for spec version 1
154154
pub fn has_sequential_ids(&self) -> bool {
155155
for (index, field) in self.fields.iter().enumerate() {
156156
let expected_id = (UNPARTITIONED_LAST_ASSIGNED_ID as i64)

crates/iceberg/src/spec/schema.rs

+8-55
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use crate::{ensure_data_valid, Error, ErrorKind};
3939
pub type SchemaId = i32;
4040
/// Reference to [`Schema`].
4141
pub type SchemaRef = Arc<Schema>;
42-
const DEFAULT_SCHEMA_ID: SchemaId = 0;
42+
pub(crate) const DEFAULT_SCHEMA_ID: SchemaId = 0;
4343

4444
/// Defines schema in iceberg.
4545
#[derive(Debug, Serialize, Deserialize, Clone)]
@@ -133,7 +133,7 @@ impl SchemaBuilder {
133133
let r#struct = StructType::new(self.fields);
134134
let id_to_field = index_by_id(&r#struct)?;
135135
let highest_field_id =
136-
highest_field_id.unwrap_or(get_highest_schema_id_for_struct(&r#struct));
136+
highest_field_id.unwrap_or(id_to_field.keys().max().cloned().unwrap_or(0));
137137

138138
Self::validate_identifier_ids(
139139
&r#struct,
@@ -367,13 +367,18 @@ impl Schema {
367367
self.id_to_name.get(&field_id).map(String::as_str)
368368
}
369369

370+
/// Return A HashMap matching field ids to field names.
371+
pub(crate) fn field_id_to_name_map(&self) -> &HashMap<i32, String> {
372+
&self.id_to_name
373+
}
374+
370375
/// Get an accessor for retrieving data in a struct
371376
pub fn accessor_by_field_id(&self, field_id: i32) -> Option<Arc<StructAccessor>> {
372377
self.field_id_to_accessor.get(&field_id).cloned()
373378
}
374379

375380
/// Check if this schema is identical to another schema semantically - excluding schema id.
376-
pub fn is_same_schema(&self, other: &SchemaRef) -> bool {
381+
pub(crate) fn is_same_schema(&self, other: &SchemaRef) -> bool {
377382
self.as_struct().eq(other.as_struct())
378383
&& self.identifier_field_ids().eq(other.identifier_field_ids())
379384
}
@@ -981,58 +986,6 @@ impl SchemaVisitor for PruneColumn {
981986
}
982987
}
983988

984-
/// Get the highest field id in a struct.
985-
/// Recursively visits nested fields.
986-
pub fn get_highest_schema_id_for_struct(s: &StructType) -> i32 {
987-
let mut agg = HighestFieldIdAggregator::new();
988-
visit_struct(s, &mut agg).unwrap_or_default();
989-
agg.highest_field_id
990-
}
991-
992-
struct HighestFieldIdAggregator {
993-
highest_field_id: i32,
994-
}
995-
996-
impl HighestFieldIdAggregator {
997-
fn new() -> Self {
998-
Self {
999-
highest_field_id: 0,
1000-
}
1001-
}
1002-
}
1003-
1004-
impl SchemaVisitor for HighestFieldIdAggregator {
1005-
type T = ();
1006-
1007-
fn schema(&mut self, _schema: &Schema, _value: Self::T) -> Result<Self::T> {
1008-
Ok(())
1009-
}
1010-
1011-
fn field(&mut self, field: &NestedFieldRef, _value: Self::T) -> Result<Self::T> {
1012-
self.highest_field_id = std::cmp::max(self.highest_field_id, field.id);
1013-
Ok(())
1014-
}
1015-
1016-
fn r#struct(&mut self, _struct: &StructType, _results: Vec<Self::T>) -> Result<Self::T> {
1017-
Ok(())
1018-
}
1019-
1020-
fn list(&mut self, list: &ListType, _value: Self::T) -> Result<Self::T> {
1021-
self.highest_field_id = std::cmp::max(self.highest_field_id, list.element_field.id);
1022-
Ok(())
1023-
}
1024-
1025-
fn map(&mut self, map: &MapType, _key_value: Self::T, _value: Self::T) -> Result<Self::T> {
1026-
self.highest_field_id = std::cmp::max(self.highest_field_id, map.key_field.id);
1027-
self.highest_field_id = std::cmp::max(self.highest_field_id, map.value_field.id);
1028-
Ok(())
1029-
}
1030-
1031-
fn primitive(&mut self, _p: &PrimitiveType) -> Result<Self::T> {
1032-
Ok(())
1033-
}
1034-
}
1035-
1036989
struct ReassignFieldIds {
1037990
next_field_id: i32,
1038991
old_to_new_id: HashMap<i32, i32>,

crates/iceberg/src/spec/snapshot.rs

+1-9
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,10 @@ use std::collections::HashMap;
2222
use std::sync::Arc;
2323

2424
use _serde::SnapshotV2;
25-
use chrono::{DateTime, TimeZone, Utc};
25+
use chrono::{DateTime, Utc};
2626
use serde::{Deserialize, Serialize};
2727
use typed_builder::TypedBuilder;
2828

29-
use super::table_metadata::SnapshotLog;
3029
use crate::error::{timestamp_ms_to_utc, Result};
3130
use crate::io::FileIO;
3231
use crate::spec::{ManifestList, SchemaId, SchemaRef, StructType, TableMetadata};
@@ -192,13 +191,6 @@ impl Snapshot {
192191
partition_type_provider,
193192
)
194193
}
195-
196-
pub(crate) fn log(&self) -> SnapshotLog {
197-
SnapshotLog {
198-
timestamp_ms: self.timestamp_ms,
199-
snapshot_id: self.snapshot_id,
200-
}
201-
}
202194
}
203195

204196
pub(super) mod _serde {

crates/iceberg/src/spec/sort.rs

+36-7
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ impl SortOrder {
138138
impl SortOrderBuilder {
139139
/// Creates a new unbound sort order.
140140
pub fn build_unbound(&self) -> Result<SortOrder> {
141-
let fields = self.fields.clone().unwrap_or_default();
141+
let fields: Vec<SortField> = self.fields.clone().unwrap_or_default();
142142
return match (self.order_id, fields.as_slice()) {
143143
(Some(SortOrder::UNSORTED_ORDER_ID) | None, []) => Ok(SortOrder::unsorted_order()),
144144
(_, []) => Err(Error::new(
@@ -160,13 +160,13 @@ impl SortOrderBuilder {
160160
}
161161

162162
/// Creates a new bound sort order.
163-
pub fn build(&self, schema: Schema) -> Result<SortOrder> {
163+
pub fn build(&self, schema: &Schema) -> Result<SortOrder> {
164164
let unbound_sort_order = self.build_unbound()?;
165165
SortOrderBuilder::check_compatibility(unbound_sort_order, schema)
166166
}
167167

168168
/// Returns the given sort order if it is compatible with the given schema
169-
fn check_compatibility(sort_order: SortOrder, schema: Schema) -> Result<SortOrder> {
169+
fn check_compatibility(sort_order: SortOrder, schema: &Schema) -> Result<SortOrder> {
170170
let sort_fields = &sort_order.fields;
171171
for sort_field in sort_fields {
172172
match schema.field_by_id(sort_field.source_id) {
@@ -290,6 +290,35 @@ mod tests {
290290
)
291291
}
292292

293+
#[test]
294+
fn test_build_unbound_returns_correct_default_order_id_for_no_fields() {
295+
assert_eq!(
296+
SortOrder::builder()
297+
.build_unbound()
298+
.expect("Expected an Ok value")
299+
.order_id,
300+
SortOrder::UNSORTED_ORDER_ID
301+
)
302+
}
303+
304+
#[test]
305+
fn test_build_unbound_returns_correct_default_order_id_for_fields() {
306+
let sort_field = SortField::builder()
307+
.source_id(2)
308+
.direction(SortDirection::Ascending)
309+
.null_order(NullOrder::First)
310+
.transform(Transform::Identity)
311+
.build();
312+
assert_ne!(
313+
SortOrder::builder()
314+
.with_sort_field(sort_field.clone())
315+
.build_unbound()
316+
.expect("Expected an Ok value")
317+
.order_id,
318+
SortOrder::UNSORTED_ORDER_ID
319+
)
320+
}
321+
293322
#[test]
294323
fn test_build_unbound_should_return_unsorted_sort_order() {
295324
assert_eq!(
@@ -367,7 +396,7 @@ mod tests {
367396
.transform(Transform::Identity)
368397
.build(),
369398
)
370-
.build(schema);
399+
.build(&schema);
371400

372401
assert_eq!(
373402
sort_order_builder_result
@@ -406,7 +435,7 @@ mod tests {
406435
.transform(Transform::Identity)
407436
.build(),
408437
)
409-
.build(schema);
438+
.build(&schema);
410439

411440
assert_eq!(
412441
sort_order_builder_result
@@ -438,7 +467,7 @@ mod tests {
438467
.transform(Transform::Year)
439468
.build(),
440469
)
441-
.build(schema);
470+
.build(&schema);
442471

443472
assert_eq!(
444473
sort_order_builder_result
@@ -468,7 +497,7 @@ mod tests {
468497

469498
let sort_order_builder_result = SortOrder::builder()
470499
.with_sort_field(sort_field.clone())
471-
.build(schema);
500+
.build(&schema);
472501

473502
assert_eq!(
474503
sort_order_builder_result.expect("Expected an Ok value"),

0 commit comments

Comments
 (0)