Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,13 @@ protected boolean supportsBatchReads() {

@TempDir private File tableDir;

// Vortex is intentionally excluded from this TCK for now. Its appender writes through a native
// code path that targets a filesystem URI, so it cannot populate the in-memory OutputFile this
// suite writes to (every write-based case fails converting the unwritten file), and it does not
// yet register position-delete writers or support split-size and name-mapping reads. Re-add
// FileFormat.VORTEX once the appender supports arbitrary FileIO and those capabilities land.
private static final FileFormat[] FILE_FORMATS =
new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC, FileFormat.VORTEX};
new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC};

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to implement some way of integrating with iceberg io since right now all test are failing since we don't write to the correct output


private static final List<Arguments> FORMAT_AND_GENERATOR =
Arrays.stream(FILE_FORMATS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.UUIDUtil;
Expand Down Expand Up @@ -142,6 +143,25 @@ private static void writeValue(
((TimeStampNanoVector) vector).setSafe(rowIndex, row.getLong(fieldIndex));
}

break;
case STRUCT:
Types.StructType structType = (Types.StructType) type;
StructVector structVector = (StructVector) vector;
List<Types.NestedField> structFields = structType.fields();
InternalRow structRow = row.getStruct(fieldIndex, structFields.size());
for (int i = 0; i < structFields.size(); i++) {
Types.NestedField structField = structFields.get(i);
// Bind each Iceberg child to the Arrow child of the same name; the Arrow struct is built
// from the write schema, so names line up even if ordinals were to drift.
FieldVector childVector = (FieldVector) structVector.getChild(structField.name());
if (structRow.isNullAt(i)) {
childVector.setNull(rowIndex);
} else {
writeValue(childVector, structField.type(), structRow, i, rowIndex);
}
}
// Mark the struct slot itself as non-null for this row.
structVector.setIndexDefined(rowIndex);
break;
default:
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.UUIDUtil;
Expand Down Expand Up @@ -142,6 +143,25 @@ private static void writeValue(
((TimeStampNanoVector) vector).setSafe(rowIndex, row.getLong(fieldIndex));
}

break;
case STRUCT:
Types.StructType structType = (Types.StructType) type;
StructVector structVector = (StructVector) vector;
List<Types.NestedField> structFields = structType.fields();
InternalRow structRow = row.getStruct(fieldIndex, structFields.size());
for (int i = 0; i < structFields.size(); i++) {
Types.NestedField structField = structFields.get(i);
// Bind each Iceberg child to the Arrow child of the same name; the Arrow struct is built
// from the write schema, so names line up even if ordinals were to drift.
FieldVector childVector = (FieldVector) structVector.getChild(structField.name());
if (structRow.isNullAt(i)) {
childVector.setNull(rowIndex);
} else {
writeValue(childVector, structField.type(), structRow, i, rowIndex);
}
}
// Mark the struct slot itself as non-null for this row.
structVector.setIndexDefined(rowIndex);
break;
default:
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.UUIDUtil;
Expand Down Expand Up @@ -142,6 +143,25 @@ private static void writeValue(
((TimeStampNanoVector) vector).setSafe(rowIndex, row.getLong(fieldIndex));
}

break;
case STRUCT:
Types.StructType structType = (Types.StructType) type;
StructVector structVector = (StructVector) vector;
List<Types.NestedField> structFields = structType.fields();
InternalRow structRow = row.getStruct(fieldIndex, structFields.size());
for (int i = 0; i < structFields.size(); i++) {
Types.NestedField structField = structFields.get(i);
// Bind each Iceberg child to the Arrow child of the same name; the Arrow struct is built
// from the write schema, so names line up even if ordinals were to drift.
FieldVector childVector = (FieldVector) structVector.getChild(structField.name());
if (structRow.isNullAt(i)) {
childVector.setNull(rowIndex);
} else {
writeValue(childVector, structField.type(), structRow, i, rowIndex);
}
}
// Mark the struct slot itself as non-null for this row.
structVector.setIndexDefined(rowIndex);
break;
default:
throw new UnsupportedOperationException(
Expand Down
78 changes: 53 additions & 25 deletions vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.arrow.vector.types.DateUnit;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.TimeUnit;
Expand All @@ -44,38 +45,51 @@ private VortexSchemas() {}

/** Convert a Vortex file's Arrow {@link org.apache.arrow.vector.types.pojo.Schema} to Iceberg. */
public static Schema convert(org.apache.arrow.vector.types.pojo.Schema arrowSchema) {
List<Field> fields = arrowSchema.getFields();
return new Schema(convertFields(arrowSchema.getFields(), new AtomicInteger(0)));
}

// Arrow/Vortex schemas carry no Iceberg field ids, so ids are synthesized here. A single shared
// counter assigns each field (including nested struct fields and list elements) a unique id in
// pre-order, which is all Iceberg requires for a valid schema; binding/projection happens by
// name.
private static List<Types.NestedField> convertFields(List<Field> fields, AtomicInteger nextId) {
List<Types.NestedField> columns = Lists.newArrayListWithExpectedSize(fields.size());
for (int fieldId = 0; fieldId < fields.size(); fieldId++) {
Field field = fields.get(fieldId);
Type icebergType = toIcebergType(field);
for (Field field : fields) {
int fieldId = nextId.getAndIncrement();
Type icebergType = toIcebergType(field, nextId);
if (field.isNullable()) {
columns.add(optional(fieldId, field.getName(), icebergType));
} else {
columns.add(required(fieldId, field.getName(), icebergType));
}
}

return new Schema(columns);
return columns;
}

/** Convert a Vortex file's relocated Arrow schema to Iceberg. */
public static Schema convert(
dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Schema arrowSchema) {
List<dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field> fields =
arrowSchema.getFields();
return new Schema(convertVortexFields(arrowSchema.getFields(), new AtomicInteger(0)));
}

// Counterpart of convertFields for relocated Vortex Arrow fields (see that method for details). A
// distinct name is required because both overloads would otherwise erase to convert(List, ...).
private static List<Types.NestedField> convertVortexFields(
List<dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field> fields,
AtomicInteger nextId) {
List<Types.NestedField> columns = Lists.newArrayListWithExpectedSize(fields.size());
for (int fieldId = 0; fieldId < fields.size(); fieldId++) {
dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field field = fields.get(fieldId);
Type icebergType = toIcebergType(field);
for (dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field field : fields) {
int fieldId = nextId.getAndIncrement();
Type icebergType = toIcebergType(field, nextId);
if (field.isNullable()) {
columns.add(optional(fieldId, field.getName(), icebergType));
} else {
columns.add(required(fieldId, field.getName(), icebergType));
}
}

return new Schema(columns);
return columns;
}

/** Convert an Iceberg Schema to an Arrow Schema suitable for local Arrow vectors. */
Expand Down Expand Up @@ -471,7 +485,7 @@ private static dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field toV
children);
}

private static Type toIcebergType(Field field) {
private static Type toIcebergType(Field field, AtomicInteger nextId) {
// UUID is conveyed as the {@code arrow.uuid} extension over
// FixedSizeBinary(16). Check metadata directly so this works whether or not
// the extension is registered with ExtensionTypeRegistry.
Expand All @@ -489,14 +503,18 @@ private static Type toIcebergType(Field field) {
return Types.FixedType.ofLength(fixed.getByteWidth());
} else if (arrowType instanceof ArrowType.Timestamp tsType) {
return toIcebergTimestamp(tsType);
} else if (arrowType instanceof ArrowType.List) {
return toIcebergList(field);
} else if (arrowType instanceof ArrowType.List
|| arrowType instanceof ArrowType.LargeList
|| arrowType instanceof ArrowType.FixedSizeList) {
return toIcebergList(field, nextId);
} else if (arrowType instanceof ArrowType.Struct) {
return Types.StructType.of(convertFields(field.getChildren(), nextId));
}
return toIcebergSimpleType(arrowType);
}

private static Type toIcebergType(
dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field field) {
dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field field, AtomicInteger nextId) {
if (isUuidField(field)) {
return Types.UUIDType.get();
}
Expand All @@ -521,8 +539,16 @@ private static Type toIcebergType(
dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Timestamp tsType) {
return toIcebergTimestamp(tsType);
} else if (arrowType
instanceof dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.List) {
return toIcebergList(field);
instanceof dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.List
|| arrowType
instanceof dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.LargeList
|| arrowType
instanceof
dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.FixedSizeList) {
return toIcebergList(field, nextId);
} else if (arrowType
instanceof dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Struct) {
return Types.StructType.of(convertVortexFields(field.getChildren(), nextId));
}
return toIcebergSimpleType(arrowType);
}
Expand Down Expand Up @@ -612,22 +638,24 @@ private static Type toIcebergTimestamp(
return isNano ? Types.TimestampNanoType.withZone() : Types.TimestampType.withZone();
}

private static Type toIcebergList(Field field) {
private static Type toIcebergList(Field field, AtomicInteger nextId) {
Field elementField = field.getChildren().get(0);
Type innerType = toIcebergType(elementField);
int elementId = nextId.getAndIncrement();
Type innerType = toIcebergType(elementField, nextId);
return elementField.isNullable()
? Types.ListType.ofOptional(0, innerType)
: Types.ListType.ofRequired(0, innerType);
? Types.ListType.ofOptional(elementId, innerType)
: Types.ListType.ofRequired(elementId, innerType);
}

private static Type toIcebergList(
dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field field) {
dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field field, AtomicInteger nextId) {
dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field elementField =
field.getChildren().get(0);
Type innerType = toIcebergType(elementField);
int elementId = nextId.getAndIncrement();
Type innerType = toIcebergType(elementField, nextId);
return elementField.isNullable()
? Types.ListType.ofOptional(0, innerType)
: Types.ListType.ofRequired(0, innerType);
? Types.ListType.ofOptional(elementId, innerType)
: Types.ListType.ofRequired(elementId, innerType);
}

/**
Expand Down
110 changes: 110 additions & 0 deletions vortex/src/test/java/org/apache/iceberg/vortex/TestVortexSchemas.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.vortex;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Test;

public class TestVortexSchemas {
// A struct nested inside a struct, plus a list, so the conversion exercises every recursive
// branch and id is forced to stay unique across siblings, nested structs, and list elements.
private static final Schema SCHEMA =
new Schema(
required(1, "id", Types.LongType.get()),
optional(
2,
"location",
Types.StructType.of(
required(3, "lat", Types.DoubleType.get()),
required(4, "long", Types.DoubleType.get()))),
optional(5, "tags", Types.ListType.ofRequired(6, Types.StringType.get())),
optional(
7,
"nested",
Types.StructType.of(
required(
8,
"inner",
Types.StructType.of(required(9, "x", Types.IntegerType.get()))))));

@Test
public void testConvertLocalArrowStructTypes() {
assertStructRoundTrip(VortexSchemas.convert(VortexSchemas.toArrowSchema(SCHEMA)));
}

@Test
public void testConvertRelocatedArrowStructTypes() {
assertStructRoundTrip(VortexSchemas.convert(VortexSchemas.toVortexArrowSchema(SCHEMA)));
}

@Test
public void testConvertLargeAndFixedSizeLists() {
// toArrowSchema only emits ArrowType.List for Iceberg lists, so build the Arrow schema directly
// to exercise the LargeList and FixedSizeList branches a real Vortex file can produce.
org.apache.arrow.vector.types.pojo.Schema arrowSchema =
new org.apache.arrow.vector.types.pojo.Schema(
List.of(
listField(
"big", ArrowType.LargeList.INSTANCE, new ArrowType.Int(Integer.SIZE, true)),
listField(
"vec",
new ArrowType.FixedSizeList(3),
new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE))));

Schema converted = VortexSchemas.convert(arrowSchema);

assertThat(converted.findField("big").type()).isInstanceOf(Types.ListType.class);
assertThat(converted.findField("big").isOptional()).isTrue();
assertThat(converted.findType("big.element")).isEqualTo(Types.IntegerType.get());
assertThat(converted.findField("vec").type()).isInstanceOf(Types.ListType.class);
assertThat(converted.findType("vec.element")).isEqualTo(Types.FloatType.get());
assertThat(TypeUtil.indexById(converted.asStruct())).hasSize(4);
}

private static Field listField(String name, ArrowType listType, ArrowType elementType) {
Field element = new Field("element", new FieldType(false, elementType, null), null);
return new Field(name, new FieldType(true, listType, null), List.of(element));
}

private static void assertStructRoundTrip(Schema roundTrip) {
// Names and types survive the round trip through Arrow (binding is by name).
assertThat(roundTrip.findField("location").type()).isInstanceOf(Types.StructType.class);
assertThat(roundTrip.findField("location").isOptional()).isTrue();
assertThat(roundTrip.findType("location.lat")).isEqualTo(Types.DoubleType.get());
assertThat(roundTrip.findField("location.lat").isRequired()).isTrue();
assertThat(roundTrip.findType("nested.inner.x")).isEqualTo(Types.IntegerType.get());
assertThat(roundTrip.findType("tags.element")).isEqualTo(Types.StringType.get());

// Every field (including nested struct fields and the list element) gets a unique synthesized
// id. indexById builds an id->field map and throws on a collision, so a successful index with
// one entry per field proves the ids are valid.
assertThat(TypeUtil.indexById(roundTrip.asStruct())).hasSize(9);
}
}
Loading