Skip to content

Commit cb2d9e8

Browse files
manuzhang and codex committed
fix(spec): validate schema types by format version
Reject schema types that require a newer table format and update the DataFusion catalog registration path to request v3 only when the converted schema needs it. This keeps ordinary CREATE TABLE defaults on v2 while allowing timestamp_ns schemas to pass validation. Co-authored-by: Codex <codex@openai.com>
1 parent 9c70b97 commit cb2d9e8

3 files changed

Lines changed: 233 additions & 15 deletions

File tree

crates/iceberg/src/spec/table_metadata.rs

Lines changed: 142 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,7 @@ use std::cmp::Ordering;
2222
use std::collections::HashMap;
2323
use std::fmt::{Display, Formatter};
2424
use std::hash::Hash;
25-
use std::sync::Arc;
25+
use std::sync::{Arc, LazyLock};
2626

2727
use _serde::TableMetadataEnum;
2828
use chrono::{DateTime, Utc};
@@ -33,9 +33,9 @@ use uuid::Uuid;
3333
use super::snapshot::SnapshotReference;
3434
pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder};
3535
use super::{
36-
DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef,
37-
SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
38-
TableProperties, parse_metadata_file_compression,
36+
DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, PrimitiveType, Schema,
37+
SchemaId, SchemaRef, SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile,
38+
StructType, TableProperties, parse_metadata_file_compression,
3939
};
4040
use crate::catalog::MetadataLocation;
4141
use crate::compression::CompressionCodec;
@@ -60,6 +60,14 @@ pub const MIN_FORMAT_VERSION_ROW_LINEAGE: FormatVersion = FormatVersion::V3;
6060
/// Reference to [`TableMetadata`].
6161
pub type TableMetadataRef = Arc<TableMetadata>;
6262

63+
static PRIMITIVE_TYPE_MIN_FORMAT_VERSION: LazyLock<HashMap<PrimitiveType, FormatVersion>> =
64+
LazyLock::new(|| {
65+
HashMap::from([
66+
(PrimitiveType::TimestampNs, FormatVersion::V3),
67+
(PrimitiveType::TimestamptzNs, FormatVersion::V3),
68+
])
69+
});
70+
6371
#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
6472
#[serde(try_from = "TableMetadataEnum")]
6573
/// Fields for the version 2 of the table metadata.
@@ -1565,6 +1573,25 @@ impl Display for FormatVersion {
15651573
}
15661574
}
15671575

1576+
/// Returns the minimum table format version required by any type in a schema.
1577+
///
1578+
/// Returns [`None`] when the schema contains no type with a specific minimum
1579+
/// table format version requirement.
1580+
pub fn min_format_version_for_schema(schema: &Schema) -> Option<FormatVersion> {
1581+
schema
1582+
.field_id_to_fields()
1583+
.values()
1584+
.filter_map(|field| field.field_type.as_primitive_type())
1585+
.filter_map(min_format_version_for_primitive_type)
1586+
.max()
1587+
}
1588+
1589+
pub(crate) fn min_format_version_for_primitive_type(
1590+
primitive: &PrimitiveType,
1591+
) -> Option<FormatVersion> {
1592+
PRIMITIVE_TYPE_MIN_FORMAT_VERSION.get(primitive).copied()
1593+
}
1594+
15681595
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
15691596
#[serde(rename_all = "kebab-case")]
15701597
/// Encodes changes to the previous metadata files for the table
@@ -1616,10 +1643,10 @@ mod tests {
16161643
use crate::io::FileIO;
16171644
use crate::spec::table_metadata::TableMetadata;
16181645
use crate::spec::{
1619-
BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation,
1620-
PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot,
1621-
SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile,
1622-
Summary, TableProperties, Transform, Type, UnboundPartitionField,
1646+
BlobMetadata, EncryptedKey, INITIAL_ROW_ID, ListType, Literal, NestedField, NullOrder,
1647+
Operation, PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema,
1648+
Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder,
1649+
StatisticsFile, Summary, TableProperties, Transform, Type, UnboundPartitionField,
16231650
};
16241651
use crate::{ErrorKind, TableCreation};
16251652

@@ -3541,6 +3568,28 @@ mod tests {
35413568
)
35423569
}
35433570

3571+
fn schema_with_primitive_field(field_type: PrimitiveType) -> Schema {
3572+
Schema::builder()
3573+
.with_fields(vec![
3574+
NestedField::required(1, "ts", Type::Primitive(field_type)).into(),
3575+
])
3576+
.build()
3577+
.unwrap()
3578+
}
3579+
3580+
fn table_creation_with_format_version(
3581+
schema: Schema,
3582+
format_version: FormatVersion,
3583+
) -> TableCreation {
3584+
TableCreation::builder()
3585+
.location("s3://db/table".to_string())
3586+
.name("table".to_string())
3587+
.properties(HashMap::new())
3588+
.schema(schema)
3589+
.format_version(format_version)
3590+
.build()
3591+
}
3592+
35443593
#[test]
35453594
fn test_table_metadata_builder_from_table_creation() {
35463595
let table_creation = TableCreation::builder()
@@ -3591,6 +3640,91 @@ mod tests {
35913640
);
35923641
}
35933642

3643+
#[test]
fn test_table_metadata_builder_rejects_v1_v2_nanosecond_timestamp_tables() {
    // Every pre-v3 format version must refuse both nanosecond timestamp flavours.
    for format_version in [FormatVersion::V1, FormatVersion::V2] {
        for primitive_type in [PrimitiveType::TimestampNs, PrimitiveType::TimestamptzNs] {
            let schema = schema_with_primitive_field(primitive_type);
            let table_creation = table_creation_with_format_version(schema, format_version);

            let err = TableMetadataBuilder::from_table_creation(table_creation).unwrap_err();

            assert_eq!(err.kind(), ErrorKind::DataInvalid);

            let names_column = err.message().contains("Invalid type for ts:");
            assert!(
                names_column,
                "expected error message to name the invalid column, got {}",
                err.message()
            );

            let explains_v3 = err.message().contains("is not supported until v3");
            assert!(
                explains_v3,
                "expected error message to explain v3 requirement, got {}",
                err.message()
            );
        }
    }
}
3671+
3672+
#[test]
fn test_table_metadata_builder_rejects_v2_list_element_requiring_v3() {
    // The offending type is the element of a list, not a top-level column.
    let element =
        NestedField::list_element(2, Type::Primitive(PrimitiveType::TimestampNs), false);
    let list_column =
        NestedField::required(1, "ts_values", Type::List(ListType::new(element.into())));
    let schema = Schema::builder()
        .with_fields(vec![list_column.into()])
        .build()
        .unwrap();

    let err = TableMetadataBuilder::from_table_creation(table_creation_with_format_version(
        schema,
        FormatVersion::V2,
    ))
    .unwrap_err();

    assert_eq!(err.kind(), ErrorKind::DataInvalid);

    let reports_nested_column = err
        .message()
        .contains("Invalid type for ts_values.element: timestamp_ns is not supported until v3");
    assert!(
        reports_nested_column,
        "expected error message to explain nested v3 requirement with column name, got {}",
        err.message()
    );
}
3705+
3706+
#[test]
fn test_table_metadata_builder_allows_v3_nanosecond_timestamp_tables() {
    // One column per nanosecond timestamp flavour.
    let ts_ns = NestedField::required(1, "ts_ns", Type::Primitive(PrimitiveType::TimestampNs));
    let tstz_ns =
        NestedField::required(2, "tstz_ns", Type::Primitive(PrimitiveType::TimestamptzNs));
    let schema = Schema::builder()
        .with_fields(vec![ts_ns.into(), tstz_ns.into()])
        .build()
        .unwrap();

    let table_creation = table_creation_with_format_version(schema, FormatVersion::V3);
    let build_result = TableMetadataBuilder::from_table_creation(table_creation)
        .unwrap()
        .build()
        .unwrap();
    let table_metadata = build_result.metadata;

    assert_eq!(table_metadata.format_version, FormatVersion::V3);
}
3727+
35943728
#[tokio::test]
35953729
async fn test_table_metadata_read_write() {
35963730
// Create a temporary directory for our test

crates/iceberg/src/spec/table_metadata_builder.rs

Lines changed: 45 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::collections::{HashMap, HashSet};
18+
use std::collections::{BTreeMap, HashMap, HashSet};
1919
use std::sync::Arc;
2020

2121
use uuid::Uuid;
@@ -28,7 +28,10 @@ use super::{
2828
UnboundPartitionSpec,
2929
};
3030
use crate::error::{Error, ErrorKind, Result};
31-
use crate::spec::{EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE};
31+
use crate::spec::{
32+
EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE,
33+
min_format_version_for_primitive_type,
34+
};
3235
use crate::{TableCreation, TableUpdate};
3336

3437
pub(crate) const FIRST_FIELD_ID: i32 = 1;
@@ -196,6 +199,41 @@ impl TableMetadataBuilder {
196199
)
197200
}
198201

202+
fn validate_schema_compatible_with_format_version(
203+
format_version: FormatVersion,
204+
schema: &Schema,
205+
) -> Result<()> {
206+
let problems = schema
207+
.field_id_to_fields()
208+
.values()
209+
.filter_map(|field| {
210+
let field_type = field.field_type.as_ref();
211+
let primitive = field_type.as_primitive_type()?;
212+
let min_format_version = min_format_version_for_primitive_type(primitive)?;
213+
(format_version < min_format_version).then(|| {
214+
let column_name = schema
215+
.name_by_field_id(field.id)
216+
.unwrap_or(field.name.as_str());
217+
(
218+
field.id,
219+
format!(
220+
"Invalid type for {column_name}: {field_type} is not supported until {min_format_version}"
221+
),
222+
)
223+
})
224+
})
225+
.collect::<BTreeMap<_, _>>();
226+
227+
if !problems.is_empty() {
228+
return Err(Error::new(
229+
ErrorKind::DataInvalid,
230+
problems.into_values().collect::<Vec<_>>().join("; "),
231+
));
232+
}
233+
234+
Ok(())
235+
}
236+
199237
/// Changes uuid of table metadata.
200238
pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
201239
if self.metadata.table_uuid != uuid {
@@ -638,6 +676,11 @@ impl TableMetadataBuilder {
638676
/// Important: Use this method with caution. The builder does not check
639677
/// if the added schema is compatible with the current schema.
640678
pub fn add_schema(mut self, schema: Schema) -> Result<Self> {
679+
Self::validate_schema_compatible_with_format_version(
680+
self.metadata.format_version,
681+
&schema,
682+
)?;
683+
641684
// Validate that new schema fields don't conflict with existing partition field names
642685
self.validate_schema_field_names(&schema)?;
643686

crates/integrations/datafusion/src/schema.rs

Lines changed: 46 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@ use futures::StreamExt;
2929
use futures::future::try_join_all;
3030
use iceberg::arrow::arrow_schema_to_schema_auto_assign_ids;
3131
use iceberg::inspect::MetadataTableType;
32+
use iceberg::spec::min_format_version_for_schema;
3233
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableCreation, TableIdent};
3334

3435
use crate::table::IcebergTableProvider;
@@ -162,12 +163,20 @@ impl SchemaProvider for IcebergSchemaProvider {
162163
let df_schema = table.schema();
163164
let iceberg_schema = arrow_schema_to_schema_auto_assign_ids(df_schema.as_ref())
164165
.map_err(to_datafusion_error)?;
166+
let format_version = min_format_version_for_schema(&iceberg_schema);
165167

166168
// Create the table in the Iceberg catalog
167-
let table_creation = TableCreation::builder()
168-
.name(name.clone())
169-
.schema(iceberg_schema)
170-
.build();
169+
let table_creation = match format_version {
170+
Some(format_version) => TableCreation::builder()
171+
.name(name.clone())
172+
.format_version(format_version)
173+
.schema(iceberg_schema)
174+
.build(),
175+
None => TableCreation::builder()
176+
.name(name.clone())
177+
.schema(iceberg_schema)
178+
.build(),
179+
};
171180

172181
let catalog = self.catalog.clone();
173182
let namespace = self.namespace.clone();
@@ -288,10 +297,11 @@ mod tests {
288297
use std::sync::Arc;
289298

290299
use datafusion::arrow::array::{Int32Array, StringArray};
291-
use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
300+
use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
292301
use datafusion::arrow::record_batch::RecordBatch;
293302
use datafusion::datasource::MemTable;
294303
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
304+
use iceberg::spec::FormatVersion;
295305
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent};
296306
use tempfile::TempDir;
297307

@@ -375,6 +385,37 @@ mod tests {
375385
assert!(schema_provider.table_exist("empty_table"));
376386
}
377387

388+
#[tokio::test]
389+
async fn test_register_timestamp_ns_table_uses_v3() {
390+
let (schema_provider, _temp_dir) = create_test_schema_provider().await;
391+
392+
let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
393+
"ts",
394+
DataType::Timestamp(TimeUnit::Nanosecond, None),
395+
true,
396+
)]));
397+
398+
let empty_batch = RecordBatch::new_empty(arrow_schema.clone());
399+
let mem_table = MemTable::try_new(arrow_schema, vec![vec![empty_batch]]).unwrap();
400+
401+
let result =
402+
schema_provider.register_table("timestamp_ns_table".to_string(), Arc::new(mem_table));
403+
404+
assert!(result.is_ok(), "Expected success, got: {result:?}");
405+
406+
let table_ident = TableIdent::new(
407+
schema_provider.namespace.clone(),
408+
"timestamp_ns_table".to_string(),
409+
);
410+
let table = schema_provider
411+
.catalog
412+
.load_table(&table_ident)
413+
.await
414+
.unwrap();
415+
416+
assert_eq!(FormatVersion::V3, table.metadata().format_version());
417+
}
418+
378419
#[tokio::test]
379420
async fn test_register_duplicate_table_fails() {
380421
let (schema_provider, _temp_dir) = create_test_schema_provider().await;

0 commit comments

Comments
 (0)