diff --git a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java index 0b23ec748a0c..73f1b5a4a555 100644 --- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java +++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java @@ -130,8 +130,13 @@ protected boolean supportsBatchReads() { @TempDir private File tableDir; + // Vortex is intentionally excluded from this TCK for now. Its appender writes through a native + // code path that targets a filesystem URI, so it cannot populate the in-memory OutputFile this + // suite writes to (every write-based case fails converting the unwritten file), and it does not + // yet register position-delete writers or support split-size and name-mapping reads. Re-add + // FileFormat.VORTEX once the appender supports arbitrary FileIO and those capabilities land. private static final FileFormat[] FILE_FORMATS = - new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC, FileFormat.VORTEX}; + new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC}; private static final List FORMAT_AND_GENERATOR = Arrays.stream(FILE_FORMATS) diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java index ed231010b563..fbe837a3eb18 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java @@ -39,6 +39,7 @@ import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.StructVector; import org.apache.iceberg.Schema; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.UUIDUtil; @@ -142,6 +143,25 @@ private static void writeValue( ((TimeStampNanoVector) vector).setSafe(rowIndex, row.getLong(fieldIndex)); } + break; + case STRUCT: + Types.StructType structType = (Types.StructType) type; + StructVector structVector = (StructVector) vector; + List structFields = structType.fields(); + InternalRow structRow = row.getStruct(fieldIndex, structFields.size()); + for (int i = 0; i < structFields.size(); i++) { + Types.NestedField structField = structFields.get(i); + // Bind each Iceberg child to the Arrow child of the same name; the Arrow struct is built + // from the write schema, so names line up even if ordinals were to drift. + FieldVector childVector = (FieldVector) structVector.getChild(structField.name()); + if (structRow.isNullAt(i)) { + childVector.setNull(rowIndex); + } else { + writeValue(childVector, structField.type(), structRow, i, rowIndex); + } + } + // Mark the struct slot itself as non-null for this row. + structVector.setIndexDefined(rowIndex); break; default: throw new UnsupportedOperationException( diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java index ed231010b563..fbe837a3eb18 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java @@ -39,6 +39,7 @@ import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.StructVector; import org.apache.iceberg.Schema; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.UUIDUtil; @@ -142,6 +143,25 @@ private static void writeValue( ((TimeStampNanoVector) vector).setSafe(rowIndex, row.getLong(fieldIndex)); } + break; + case STRUCT: + Types.StructType structType = (Types.StructType) type; + StructVector structVector = (StructVector) vector; + List structFields = structType.fields(); + InternalRow structRow = row.getStruct(fieldIndex, structFields.size()); + for (int i = 0; i < structFields.size(); i++) { + Types.NestedField structField = structFields.get(i); + // Bind each Iceberg child to the Arrow child of the same name; the Arrow struct is built + // from the write schema, so names line up even if ordinals were to drift. + FieldVector childVector = (FieldVector) structVector.getChild(structField.name()); + if (structRow.isNullAt(i)) { + childVector.setNull(rowIndex); + } else { + writeValue(childVector, structField.type(), structRow, i, rowIndex); + } + } + // Mark the struct slot itself as non-null for this row. + structVector.setIndexDefined(rowIndex); break; default: throw new UnsupportedOperationException( diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java index ed231010b563..fbe837a3eb18 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java @@ -39,6 +39,7 @@ import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.StructVector; import org.apache.iceberg.Schema; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.UUIDUtil; @@ -142,6 +143,25 @@ private static void writeValue( ((TimeStampNanoVector) vector).setSafe(rowIndex, row.getLong(fieldIndex)); } + break; + case STRUCT: + Types.StructType structType = (Types.StructType) type; + StructVector structVector = (StructVector) vector; + List structFields = structType.fields(); + InternalRow structRow = row.getStruct(fieldIndex, structFields.size()); + for (int i = 0; i < structFields.size(); i++) { + Types.NestedField structField = structFields.get(i); + // Bind each Iceberg child to the Arrow child of the same name; the Arrow struct is built + // from the write schema, so names line up even if ordinals were to drift. + FieldVector childVector = (FieldVector) structVector.getChild(structField.name()); + if (structRow.isNullAt(i)) { + childVector.setNull(rowIndex); + } else { + writeValue(childVector, structField.type(), structRow, i, rowIndex); + } + } + // Mark the struct slot itself as non-null for this row. + structVector.setIndexDefined(rowIndex); break; default: throw new UnsupportedOperationException( diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java index ba0f5d43c4ee..cab7ca64f792 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.arrow.vector.types.DateUnit; import org.apache.arrow.vector.types.FloatingPointPrecision; import org.apache.arrow.vector.types.TimeUnit; @@ -44,11 +45,18 @@ private VortexSchemas() {} /** Convert a Vortex file's Arrow {@link org.apache.arrow.vector.types.pojo.Schema} to Iceberg. */ public static Schema convert(org.apache.arrow.vector.types.pojo.Schema arrowSchema) { - List fields = arrowSchema.getFields(); + return new Schema(convertFields(arrowSchema.getFields(), new AtomicInteger(0))); + } + + // Arrow/Vortex schemas carry no Iceberg field ids, so ids are synthesized here. A single shared + // counter assigns each field (including nested struct fields and list elements) a unique id in + // pre-order, which is all Iceberg requires for a valid schema; binding/projection happens by + // name. + private static List convertFields(List fields, AtomicInteger nextId) { List columns = Lists.newArrayListWithExpectedSize(fields.size()); - for (int fieldId = 0; fieldId < fields.size(); fieldId++) { - Field field = fields.get(fieldId); - Type icebergType = toIcebergType(field); + for (Field field : fields) { + int fieldId = nextId.getAndIncrement(); + Type icebergType = toIcebergType(field, nextId); if (field.isNullable()) { columns.add(optional(fieldId, field.getName(), icebergType)); } else { @@ -56,18 +64,24 @@ public static Schema convert(org.apache.arrow.vector.types.pojo.Schema arrowSche } } - return new Schema(columns); + return columns; } /** Convert a Vortex file's relocated Arrow schema to Iceberg. */ public static Schema convert( dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Schema arrowSchema) { - List fields = - arrowSchema.getFields(); + return new Schema(convertVortexFields(arrowSchema.getFields(), new AtomicInteger(0))); + } + + // Counterpart of convertFields for relocated Vortex Arrow fields (see that method for details). A + // distinct name is required because both overloads would otherwise erase to convert(List, ...). + private static List convertVortexFields( + List fields, + AtomicInteger nextId) { List columns = Lists.newArrayListWithExpectedSize(fields.size()); - for (int fieldId = 0; fieldId < fields.size(); fieldId++) { - dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field field = fields.get(fieldId); - Type icebergType = toIcebergType(field); + for (dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field field : fields) { + int fieldId = nextId.getAndIncrement(); + Type icebergType = toIcebergType(field, nextId); if (field.isNullable()) { columns.add(optional(fieldId, field.getName(), icebergType)); } else { @@ -75,7 +89,7 @@ public static Schema convert( } } - return new Schema(columns); + return columns; } /** Convert an Iceberg Schema to an Arrow Schema suitable for local Arrow vectors. */ @@ -471,7 +485,7 @@ private static dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field toV children); } - private static Type toIcebergType(Field field) { + private static Type toIcebergType(Field field, AtomicInteger nextId) { // UUID is conveyed as the {@code arrow.uuid} extension over // FixedSizeBinary(16). Check metadata directly so this works whether or not // the extension is registered with ExtensionTypeRegistry. @@ -489,14 +503,18 @@ private static Type toIcebergType(Field field) { return Types.FixedType.ofLength(fixed.getByteWidth()); } else if (arrowType instanceof ArrowType.Timestamp tsType) { return toIcebergTimestamp(tsType); - } else if (arrowType instanceof ArrowType.List) { - return toIcebergList(field); + } else if (arrowType instanceof ArrowType.List + || arrowType instanceof ArrowType.LargeList + || arrowType instanceof ArrowType.FixedSizeList) { + return toIcebergList(field, nextId); + } else if (arrowType instanceof ArrowType.Struct) { + return Types.StructType.of(convertFields(field.getChildren(), nextId)); } return toIcebergSimpleType(arrowType); } private static Type toIcebergType( - dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field field) { + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field field, AtomicInteger nextId) { if (isUuidField(field)) { return Types.UUIDType.get(); } @@ -521,8 +539,16 @@ private static Type toIcebergType( dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Timestamp tsType) { return toIcebergTimestamp(tsType); } else if (arrowType - instanceof dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.List) { - return toIcebergList(field); + instanceof dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.List + || arrowType + instanceof dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.LargeList + || arrowType + instanceof + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.FixedSizeList) { + return toIcebergList(field, nextId); + } else if (arrowType + instanceof dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Struct) { + return Types.StructType.of(convertVortexFields(field.getChildren(), nextId)); } return toIcebergSimpleType(arrowType); } @@ -612,22 +638,24 @@ private static Type toIcebergTimestamp( return isNano ? Types.TimestampNanoType.withZone() : Types.TimestampType.withZone(); } - private static Type toIcebergList(Field field) { + private static Type toIcebergList(Field field, AtomicInteger nextId) { Field elementField = field.getChildren().get(0); - Type innerType = toIcebergType(elementField); + int elementId = nextId.getAndIncrement(); + Type innerType = toIcebergType(elementField, nextId); return elementField.isNullable() - ? Types.ListType.ofOptional(0, innerType) - : Types.ListType.ofRequired(0, innerType); + ? Types.ListType.ofOptional(elementId, innerType) + : Types.ListType.ofRequired(elementId, innerType); } private static Type toIcebergList( - dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field field) { + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field field, AtomicInteger nextId) { dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field elementField = field.getChildren().get(0); - Type innerType = toIcebergType(elementField); + int elementId = nextId.getAndIncrement(); + Type innerType = toIcebergType(elementField, nextId); return elementField.isNullable() - ? Types.ListType.ofOptional(0, innerType) - : Types.ListType.ofRequired(0, innerType); + ? Types.ListType.ofOptional(elementId, innerType) + : Types.ListType.ofRequired(elementId, innerType); } /** diff --git a/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexSchemas.java b/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexSchemas.java new file mode 100644 index 000000000000..9a1e5877ee76 --- /dev/null +++ b/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexSchemas.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.vortex; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +public class TestVortexSchemas { + // A struct nested inside a struct, plus a list, so the conversion exercises every recursive + // branch and id is forced to stay unique across siblings, nested structs, and list elements. + private static final Schema SCHEMA = + new Schema( + required(1, "id", Types.LongType.get()), + optional( + 2, + "location", + Types.StructType.of( + required(3, "lat", Types.DoubleType.get()), + required(4, "long", Types.DoubleType.get()))), + optional(5, "tags", Types.ListType.ofRequired(6, Types.StringType.get())), + optional( + 7, + "nested", + Types.StructType.of( + required( + 8, + "inner", + Types.StructType.of(required(9, "x", Types.IntegerType.get())))))); + + @Test + public void testConvertLocalArrowStructTypes() { + assertStructRoundTrip(VortexSchemas.convert(VortexSchemas.toArrowSchema(SCHEMA))); + } + + @Test + public void testConvertRelocatedArrowStructTypes() { + assertStructRoundTrip(VortexSchemas.convert(VortexSchemas.toVortexArrowSchema(SCHEMA))); + } + + @Test + public void testConvertLargeAndFixedSizeLists() { + // toArrowSchema only emits ArrowType.List for Iceberg lists, so build the Arrow schema directly + // to exercise the LargeList and FixedSizeList branches a real Vortex file can produce. + org.apache.arrow.vector.types.pojo.Schema arrowSchema = + new org.apache.arrow.vector.types.pojo.Schema( + List.of( + listField( + "big", ArrowType.LargeList.INSTANCE, new ArrowType.Int(Integer.SIZE, true)), + listField( + "vec", + new ArrowType.FixedSizeList(3), + new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)))); + + Schema converted = VortexSchemas.convert(arrowSchema); + + assertThat(converted.findField("big").type()).isInstanceOf(Types.ListType.class); + assertThat(converted.findField("big").isOptional()).isTrue(); + assertThat(converted.findType("big.element")).isEqualTo(Types.IntegerType.get()); + assertThat(converted.findField("vec").type()).isInstanceOf(Types.ListType.class); + assertThat(converted.findType("vec.element")).isEqualTo(Types.FloatType.get()); + assertThat(TypeUtil.indexById(converted.asStruct())).hasSize(4); + } + + private static Field listField(String name, ArrowType listType, ArrowType elementType) { + Field element = new Field("element", new FieldType(false, elementType, null), null); + return new Field(name, new FieldType(true, listType, null), List.of(element)); + } + + private static void assertStructRoundTrip(Schema roundTrip) { + // Names and types survive the round trip through Arrow (binding is by name). + assertThat(roundTrip.findField("location").type()).isInstanceOf(Types.StructType.class); + assertThat(roundTrip.findField("location").isOptional()).isTrue(); + assertThat(roundTrip.findType("location.lat")).isEqualTo(Types.DoubleType.get()); + assertThat(roundTrip.findField("location.lat").isRequired()).isTrue(); + assertThat(roundTrip.findType("nested.inner.x")).isEqualTo(Types.IntegerType.get()); + assertThat(roundTrip.findType("tags.element")).isEqualTo(Types.StringType.get()); + + // Every field (including nested struct fields and the list element) gets a unique synthesized + // id. indexById builds an id->field map and throws on a collision, so a successful index with + // one entry per field proves the ids are valid. + assertThat(TypeUtil.indexById(roundTrip.asStruct())).hasSize(9); + } +}