Skip to content

Commit 013362e

Browse files
manuzhangcodex
andcommitted
feat(spec): add unknown datatype support
Add Iceberg unknown type parsing and conversions across schema, Arrow, Avro, HMS, and Glue paths. Reject non-null defaults for unknown fields and make set predicate formatting deterministic for stable tests. Co-authored-by: Codex <codex@openai.com>
1 parent 5eb342f commit 013362e

11 files changed

Lines changed: 246 additions & 54 deletions

File tree

crates/catalog/glue/src/schema.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,12 @@ impl SchemaVisitor for GlueSchemaBuilder {
157157

158158
fn primitive(&mut self, p: &iceberg::spec::PrimitiveType) -> iceberg::Result<Self::T> {
159159
let glue_type = match p {
160+
PrimitiveType::Unknown => {
161+
return Err(Error::new(
162+
ErrorKind::FeatureUnsupported,
163+
format!("Conversion from {p:?} is not supported"),
164+
));
165+
}
160166
PrimitiveType::Boolean => "boolean".to_string(),
161167
PrimitiveType::Int => "int".to_string(),
162168
PrimitiveType::Long => "bigint".to_string(),

crates/catalog/hms/src/schema.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,12 @@ impl SchemaVisitor for HiveSchemaBuilder {
114114

115115
fn primitive(&mut self, p: &iceberg::spec::PrimitiveType) -> iceberg::Result<String> {
116116
let hive_type = match p {
117+
PrimitiveType::Unknown => {
118+
return Err(Error::new(
119+
ErrorKind::FeatureUnsupported,
120+
format!("Conversion from {p:?} is not supported"),
121+
));
122+
}
117123
PrimitiveType::Boolean => "boolean".to_string(),
118124
PrimitiveType::Int => "int".to_string(),
119125
PrimitiveType::Long => "bigint".to_string(),

crates/iceberg/src/arrow/reader/projection.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ impl ArrowReader {
6161
/// Nested types (struct/list/map) are flattened in Parquet's columnar format.
6262
fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
6363
match field.field_type.as_ref() {
64+
Type::Primitive(PrimitiveType::Unknown) => {}
6465
Type::Primitive(_) => {
6566
field_ids.push(field.id);
6667
}
@@ -94,6 +95,7 @@ impl ArrowReader {
9495
(Some(lhs), Some(rhs)) if lhs == rhs => true,
9596
(Some(PrimitiveType::Int), Some(PrimitiveType::Long)) => true,
9697
(Some(PrimitiveType::Float), Some(PrimitiveType::Double)) => true,
98+
(Some(PrimitiveType::Unknown), Some(_)) => true,
9799
(
98100
Some(PrimitiveType::Decimal {
99101
precision: file_precision,

crates/iceberg/src/arrow/schema.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ fn visit_type<V: ArrowSchemaVisitor>(r#type: &DataType, visitor: &mut V) -> Resu
120120
| DataType::Utf8
121121
| DataType::LargeUtf8
122122
| DataType::Utf8View
123+
| DataType::Null
123124
| DataType::Binary
124125
| DataType::LargeBinary
125126
| DataType::BinaryView
@@ -428,6 +429,7 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter {
428429

429430
fn primitive(&mut self, p: &DataType) -> Result<Self::T> {
430431
match p {
432+
DataType::Null => Ok(Type::Primitive(PrimitiveType::Unknown)),
431433
DataType::Boolean => Ok(Type::Primitive(PrimitiveType::Boolean)),
432434
DataType::Int8 | DataType::Int16 | DataType::Int32 => {
433435
Ok(Type::Primitive(PrimitiveType::Int))
@@ -613,6 +615,9 @@ impl SchemaVisitor for ToArrowSchemaConverter {
613615
p: &crate::spec::PrimitiveType,
614616
) -> crate::Result<ArrowSchemaOrFieldOrType> {
615617
match p {
618+
crate::spec::PrimitiveType::Unknown => {
619+
Ok(ArrowSchemaOrFieldOrType::Type(DataType::Null))
620+
}
616621
crate::spec::PrimitiveType::Boolean => {
617622
Ok(ArrowSchemaOrFieldOrType::Type(DataType::Boolean))
618623
}
@@ -1116,6 +1121,7 @@ pub fn datum_to_arrow_type_with_ree(datum: &Datum) -> DataType {
11161121

11171122
// Match on the PrimitiveType from the Datum to determine the Arrow type
11181123
match datum.data_type() {
1124+
PrimitiveType::Unknown => make_ree(DataType::Null),
11191125
PrimitiveType::Boolean => make_ree(DataType::Boolean),
11201126
PrimitiveType::Int => make_ree(DataType::Int32),
11211127
PrimitiveType::Long => make_ree(DataType::Int64),
@@ -1915,6 +1921,13 @@ mod tests {
19151921
assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
19161922
}
19171923

1924+
{
1925+
let arrow_type = DataType::Null;
1926+
let iceberg_type = Type::Primitive(PrimitiveType::Unknown);
1927+
assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
1928+
assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
1929+
}
1930+
19181931
// test struct type
19191932
{
19201933
// no metadata will cause error

crates/iceberg/src/arrow/value.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ impl SchemaWithPartnerVisitor<ArrayRef> for ArrowArrayToIcebergStructConverter {
206206

207207
fn primitive(&mut self, p: &PrimitiveType, partner: &ArrayRef) -> Result<Vec<Option<Literal>>> {
208208
match p {
209+
PrimitiveType::Unknown => Ok(vec![None; partner.len()]),
209210
PrimitiveType::Boolean => {
210211
let array = partner
211212
.as_any()
@@ -629,6 +630,7 @@ pub(crate) fn create_primitive_array_single_element(
629630
prim_lit: &Option<PrimitiveLiteral>,
630631
) -> Result<ArrayRef> {
631632
match (data_type, prim_lit) {
633+
(DataType::Null, _) => Ok(Arc::new(arrow_array::NullArray::new(1))),
632634
(DataType::Boolean, Some(PrimitiveLiteral::Boolean(v))) => {
633635
Ok(Arc::new(BooleanArray::from(vec![*v])))
634636
}

crates/iceberg/src/avro/schema.rs

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ impl SchemaVisitor for SchemaToAvroSchema {
7474
record.name = Name::from(format!("r{}", field.id).as_str());
7575
}
7676

77-
if !field.required {
77+
if !field.required && !matches!(field_schema, AvroSchema::Null) {
7878
field_schema = avro_optional(field_schema)?;
7979
}
8080

@@ -126,7 +126,7 @@ impl SchemaVisitor for SchemaToAvroSchema {
126126
record.name = Name::from(format!("r{}", list.element_field.id).as_str());
127127
}
128128

129-
if !list.element_field.required {
129+
if !list.element_field.required && !matches!(field_schema, AvroSchema::Null) {
130130
field_schema = avro_optional(field_schema)?;
131131
}
132132

@@ -147,7 +147,7 @@ impl SchemaVisitor for SchemaToAvroSchema {
147147
) -> Result<AvroSchemaOrField> {
148148
let key_field_schema = key_value.unwrap_left();
149149
let mut value_field_schema = value.unwrap_left();
150-
if !map.value_field.required {
150+
if !map.value_field.required && !matches!(value_field_schema, AvroSchema::Null) {
151151
value_field_schema = avro_optional(value_field_schema)?;
152152
}
153153

@@ -222,6 +222,7 @@ impl SchemaVisitor for SchemaToAvroSchema {
222222

223223
fn primitive(&mut self, p: &PrimitiveType) -> Result<AvroSchemaOrField> {
224224
let avro_schema = match p {
225+
PrimitiveType::Unknown => AvroSchema::Null,
225226
PrimitiveType::Boolean => AvroSchema::Boolean,
226227
PrimitiveType::Int => AvroSchema::Int,
227228
PrimitiveType::Long => AvroSchema::Long,
@@ -304,6 +305,10 @@ pub(crate) fn avro_decimal_schema(precision: usize, scale: usize) -> Result<Avro
304305
}
305306

306307
fn avro_optional(avro_schema: AvroSchema) -> Result<AvroSchema> {
308+
if matches!(avro_schema, AvroSchema::Null) {
309+
return Ok(AvroSchema::Null);
310+
}
311+
307312
Ok(AvroSchema::Union(UnionSchema::new(vec![
308313
AvroSchema::Null,
309314
avro_schema,
@@ -440,10 +445,11 @@ impl AvroSchemaVisitor for AvroSchemaToSchema {
440445
let field_id =
441446
Self::get_element_id_from_attributes(&avro_field.custom_attributes, FIELD_ID_PROP)?;
442447

443-
let optional = is_avro_optional(&avro_field.schema);
448+
let optional = is_avro_optional(&avro_field.schema)
449+
|| matches!(&avro_field.schema, AvroSchema::Null);
444450

445-
let mut field =
446-
NestedField::new(field_id, &avro_field.name, field_type.unwrap(), !optional);
451+
let field_type = field_type.unwrap_or(Type::Primitive(PrimitiveType::Unknown));
452+
let mut field = NestedField::new(field_id, &avro_field.name, field_type, !optional);
447453

448454
if let Some(doc) = &avro_field.doc {
449455
field = field.with_doc(doc);
@@ -475,18 +481,21 @@ impl AvroSchemaVisitor for AvroSchemaToSchema {
475481
}
476482

477483
if options.len() == 1 {
478-
Ok(Some(options.remove(0).unwrap()))
484+
Ok(options
485+
.remove(0)
486+
.or(Some(Type::Primitive(PrimitiveType::Unknown))))
479487
} else {
480488
Ok(Some(options.remove(1).unwrap()))
481489
}
482490
}
483491

484492
fn array(&mut self, array: &ArraySchema, item: Option<Type>) -> Result<Self::T> {
485493
let element_field_id = Self::get_element_id_from_attributes(&array.attributes, ELEMENT_ID)?;
494+
let item = item.unwrap_or(Type::Primitive(PrimitiveType::Unknown));
486495
let element_field = NestedField::list_element(
487496
element_field_id,
488-
item.unwrap(),
489-
!is_avro_optional(&array.items),
497+
item,
498+
!is_avro_optional(&array.items) && !matches!(array.items.as_ref(), AvroSchema::Null),
490499
)
491500
.into();
492501
Ok(Some(Type::List(ListType { element_field })))
@@ -497,10 +506,11 @@ impl AvroSchemaVisitor for AvroSchemaToSchema {
497506
let key_field =
498507
NestedField::map_key_element(key_field_id, Type::Primitive(PrimitiveType::String));
499508
let value_field_id = Self::get_element_id_from_attributes(&map.attributes, VALUE_ID)?;
509+
let value = value.unwrap_or(Type::Primitive(PrimitiveType::Unknown));
500510
let value_field = NestedField::map_value_element(
501511
value_field_id,
502-
value.unwrap(),
503-
!is_avro_optional(&map.types),
512+
value,
513+
!is_avro_optional(&map.types) && !matches!(map.types.as_ref(), AvroSchema::Null),
504514
);
505515
Ok(Some(Type::Map(MapType {
506516
key_field: key_field.into(),
@@ -550,12 +560,7 @@ impl AvroSchemaVisitor for AvroSchemaToSchema {
550560
"Can't convert avro map schema, missing key schema.",
551561
)
552562
})?;
553-
let value = value.ok_or_else(|| {
554-
Error::new(
555-
ErrorKind::DataInvalid,
556-
"Can't convert avro map schema, missing value schema.",
557-
)
558-
})?;
563+
let value = value.unwrap_or(Type::Primitive(PrimitiveType::Unknown));
559564
let key_id = Self::get_element_id_from_attributes(
560565
&array.fields[0].custom_attributes,
561566
FIELD_ID_PROP,
@@ -568,7 +573,8 @@ impl AvroSchemaVisitor for AvroSchemaToSchema {
568573
let value_field = NestedField::map_value_element(
569574
value_id,
570575
value,
571-
!is_avro_optional(&array.fields[1].schema),
576+
!is_avro_optional(&array.fields[1].schema)
577+
&& !matches!(&array.fields[1].schema, AvroSchema::Null),
572578
);
573579
Ok(Some(Type::Map(MapType {
574580
key_field: key_field.into(),
@@ -650,6 +656,25 @@ mod tests {
650656
assert_eq!(iceberg_schema, converted_avro_converted_iceberg_schema);
651657
}
652658

659+
#[test]
660+
fn test_unknown_type_schema_conversion() {
661+
let schema = Schema::builder()
662+
.with_fields(vec![
663+
NestedField::optional(1, "empty", PrimitiveType::Unknown.into()).into(),
664+
])
665+
.build()
666+
.unwrap();
667+
668+
let avro_schema = schema_to_avro_schema("table", &schema).unwrap();
669+
let AvroSchema::Record(record) = &avro_schema else {
670+
panic!("expected avro record schema");
671+
};
672+
assert!(matches!(record.fields[0].schema, AvroSchema::Null));
673+
assert_eq!(record.fields[0].default, Some(Value::Null));
674+
675+
assert_eq!(schema, avro_schema_to_schema(&avro_schema).unwrap());
676+
}
677+
653678
#[test]
654679
fn test_manifest_file_v1_schema() {
655680
let fields = vec![

crates/iceberg/src/expr/predicate.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ impl<T: Bind> Bind for SetExpression<T> {
310310

311311
impl<T: Display + Debug> Display for SetExpression<T> {
312312
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
313-
let mut literal_strs = self.literals.iter().map(|l| format!("{l}"));
313+
let mut literal_strs = self.literals.iter().map(ToString::to_string).sorted();
314314

315315
write!(f, "{} {} ({})", self.term, self.op, literal_strs.join(", "))
316316
}
@@ -1363,7 +1363,7 @@ mod tests {
13631363
let schema = table_schema_simple();
13641364
let expr = Reference::new("bar").is_in([Datum::int(10), Datum::int(20)]);
13651365
let bound_expr = expr.bind(schema, true).unwrap();
1366-
assert_eq!(&format!("{bound_expr}"), "bar IN (20, 10)");
1366+
assert_eq!(&format!("{bound_expr}"), "bar IN (10, 20)");
13671367
test_bound_predicate_serialize_diserialize(bound_expr);
13681368
}
13691369

@@ -1398,7 +1398,7 @@ mod tests {
13981398
let schema = table_schema_simple();
13991399
let expr = Reference::new("bar").is_not_in([Datum::int(10), Datum::int(20)]);
14001400
let bound_expr = expr.bind(schema, true).unwrap();
1401-
assert_eq!(&format!("{bound_expr}"), "bar NOT IN (20, 10)");
1401+
assert_eq!(&format!("{bound_expr}"), "bar NOT IN (10, 20)");
14021402
test_bound_predicate_serialize_diserialize(bound_expr);
14031403
}
14041404

@@ -1571,13 +1571,7 @@ mod tests {
15711571
let expected_bound = expected_predicate.bind(schema, true).unwrap();
15721572

15731573
assert_eq!(result, expected_bound);
1574-
// Note: HashSet order may vary, so we check that it contains the expected format
1575-
let result_str = format!("{result}");
1576-
assert!(
1577-
result_str.contains("bar NOT IN")
1578-
&& result_str.contains("10")
1579-
&& result_str.contains("20")
1580-
);
1574+
assert_eq!(&format!("{result}"), "bar NOT IN (10, 20)");
15811575
}
15821576

15831577
#[test]

crates/iceberg/src/expr/visitors/strict_projection.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ mod tests {
424424
.bind(schema.clone(), false)
425425
.unwrap();
426426
let result = strict_projection.strict_project(&predicate).unwrap();
427-
assert_eq!(result.to_string(), "((((pcol1 NOT IN (0, -1)) AND (pcol2 NOT IN (0, -1))) AND (pcol3 NOT IN (0, -1))) AND (pcol4 NOT IN (0, -1))) AND (pcol5 NOT IN (0, -1))".to_string());
427+
assert_eq!(result.to_string(), "((((pcol1 NOT IN (-1, 0)) AND (pcol2 NOT IN (-1, 0))) AND (pcol3 NOT IN (-1, 0))) AND (pcol4 NOT IN (-1, 0))) AND (pcol5 NOT IN (-1, 0))".to_string());
428428

429429
// test in
430430
let predicate =
@@ -658,7 +658,7 @@ mod tests {
658658
.bind(schema.clone(), false)
659659
.unwrap();
660660
let result = strict_projection.strict_project(&predicate).unwrap();
661-
assert_eq!(result.to_string(), "((((pcol1 NOT IN (575, 564)) AND (pcol2 NOT IN (575, 564))) AND (pcol3 NOT IN (575, 564))) AND (pcol4 NOT IN (575, 564))) AND (pcol5 NOT IN (575, 564))".to_string());
661+
assert_eq!(result.to_string(), "((((pcol1 NOT IN (564, 575)) AND (pcol2 NOT IN (564, 575))) AND (pcol3 NOT IN (564, 575))) AND (pcol4 NOT IN (564, 575))) AND (pcol5 NOT IN (564, 575))".to_string());
662662

663663
// test in
664664
let predicate = Reference::new("col1")
@@ -1114,7 +1114,7 @@ mod tests {
11141114
.bind(schema.clone(), false)
11151115
.unwrap();
11161116
let result = strict_projection.strict_project(&predicate).unwrap();
1117-
assert_eq!(result.to_string(), "((((pcol1 NOT IN (575, 564)) AND (pcol2 NOT IN (575, 564))) AND (pcol3 NOT IN (575, 564))) AND (pcol4 NOT IN (575, 564))) AND (pcol5 NOT IN (575, 564))".to_string());
1117+
assert_eq!(result.to_string(), "((((pcol1 NOT IN (564, 575)) AND (pcol2 NOT IN (564, 575))) AND (pcol3 NOT IN (564, 575))) AND (pcol4 NOT IN (564, 575))) AND (pcol5 NOT IN (564, 575))".to_string());
11181118

11191119
// test in
11201120
let predicate = Reference::new("col1")
@@ -1724,7 +1724,7 @@ mod tests {
17241724
.bind(schema.clone(), false)
17251725
.unwrap();
17261726
let result = strict_projection.strict_project(&predicate).unwrap();
1727-
assert_eq!(result.to_string(), "((((pcol1 NOT IN (1969-12-31, 1969-12-30)) AND (pcol2 NOT IN (1969-12-31, 1969-12-30))) AND (pcol3 NOT IN (1969-12-31, 1969-12-30))) AND (pcol4 NOT IN (1969-12-31, 1969-12-30))) AND (pcol5 NOT IN (1969-12-31, 1969-12-30))".to_string());
1727+
assert_eq!(result.to_string(), "((((pcol1 NOT IN (1969-12-30, 1969-12-31)) AND (pcol2 NOT IN (1969-12-30, 1969-12-31))) AND (pcol3 NOT IN (1969-12-30, 1969-12-31))) AND (pcol4 NOT IN (1969-12-30, 1969-12-31))) AND (pcol5 NOT IN (1969-12-30, 1969-12-31))".to_string());
17281728

17291729
// test in
17301730
let predicate = Reference::new("col1")
@@ -1952,7 +1952,7 @@ mod tests {
19521952
.bind(schema.clone(), false)
19531953
.unwrap();
19541954
let result = strict_projection.strict_project(&predicate).unwrap();
1955-
assert_eq!(result.to_string(), "((((pcol1 NOT IN (47, 46)) AND (pcol2 NOT IN (47, 46))) AND (pcol3 NOT IN (47, 46))) AND (pcol4 NOT IN (47, 46))) AND (pcol5 NOT IN (47, 46))".to_string());
1955+
assert_eq!(result.to_string(), "((((pcol1 NOT IN (46, 47)) AND (pcol2 NOT IN (46, 47))) AND (pcol3 NOT IN (46, 47))) AND (pcol4 NOT IN (46, 47))) AND (pcol5 NOT IN (46, 47))".to_string());
19561956

19571957
// test in
19581958
let predicate = Reference::new("col1")
@@ -2384,7 +2384,7 @@ mod tests {
23842384
.bind(schema.clone(), false)
23852385
.unwrap();
23862386
let result = strict_projection.strict_project(&predicate).unwrap();
2387-
assert_eq!(result.to_string(), "((((pcol1 NOT IN (47, 46)) AND (pcol2 NOT IN (47, 46))) AND (pcol3 NOT IN (47, 46))) AND (pcol4 NOT IN (47, 46))) AND (pcol5 NOT IN (47, 46))".to_string());
2387+
assert_eq!(result.to_string(), "((((pcol1 NOT IN (46, 47)) AND (pcol2 NOT IN (46, 47))) AND (pcol3 NOT IN (46, 47))) AND (pcol4 NOT IN (46, 47))) AND (pcol5 NOT IN (46, 47))".to_string());
23882388

23892389
// test in
23902390
let predicate = Reference::new("col1")
@@ -2593,7 +2593,7 @@ mod tests {
25932593
.bind(schema.clone(), false)
25942594
.unwrap();
25952595
let result = strict_projection.strict_project(&predicate).unwrap();
2596-
assert_eq!(result.to_string(), "(((((pcol1 NOT IN (8, 7, 6)) AND (pcol2 NOT IN (8, 7, 6))) AND (pcol3 NOT IN (6, 2))) AND (pcol4 NOT IN (9, 4))) AND (pcol5 NOT IN (4, 6))) AND (pcol6 NOT IN (4, 6))".to_string());
2596+
assert_eq!(result.to_string(), "(((((pcol1 NOT IN (6, 7, 8)) AND (pcol2 NOT IN (6, 7, 8))) AND (pcol3 NOT IN (2, 6))) AND (pcol4 NOT IN (4, 9))) AND (pcol5 NOT IN (4, 6))) AND (pcol6 NOT IN (4, 6))".to_string());
25972597
}
25982598

25992599
#[tokio::test]
@@ -2690,15 +2690,15 @@ mod tests {
26902690
.bind(schema.clone(), false)
26912691
.unwrap();
26922692
let result = strict_projection.strict_project(&predicate).unwrap();
2693-
assert_eq!(result.to_string(), "pcol1 IN (101, 100)".to_string());
2693+
assert_eq!(result.to_string(), "pcol1 IN (100, 101)".to_string());
26942694

26952695
// test not in
26962696
let predicate = Reference::new("col1")
26972697
.is_not_in(vec![Datum::long(100), Datum::long(101)].into_iter())
26982698
.bind(schema.clone(), false)
26992699
.unwrap();
27002700
let result = strict_projection.strict_project(&predicate).unwrap();
2701-
assert_eq!(result.to_string(), "pcol1 NOT IN (101, 100)".to_string());
2701+
assert_eq!(result.to_string(), "pcol1 NOT IN (100, 101)".to_string());
27022702
}
27032703

27042704
#[tokio::test]
@@ -3103,7 +3103,7 @@ mod tests {
31033103
let result = strict_projection.strict_project(&predicate).unwrap();
31043104
assert_eq!(
31053105
result.to_string(),
3106-
"((pcol1 NOT IN (100, 90)) AND (pcol2 NOT IN (100, 90))) AND (pcol3 NOT IN (9890, 9990, 10090))"
3106+
"((pcol1 NOT IN (100, 90)) AND (pcol2 NOT IN (100, 90))) AND (pcol3 NOT IN (10090, 9890, 9990))"
31073107
.to_string()
31083108
);
31093109

0 commit comments

Comments
 (0)