From e65bf9835d7431256176adb922e4367ab6cbf503 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Wed, 27 May 2026 12:04:15 +0100 Subject: [PATCH 1/9] Variant support for Iceberg Signed-off-by: Adam Gutglick --- .../data/vortex/GenericVortexWriter.java | 51 +++++++++++++++++++ .../apache/iceberg/vortex/VortexSchemas.java | 37 ++++++++++++++ .../iceberg/vortex/TestGenericVortex.java | 7 ++- 3 files changed, 94 insertions(+), 1 deletion(-) diff --git a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java index 6d9346b960a5..c49e11a28a3c 100644 --- a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java +++ b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java @@ -20,6 +20,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.charset.StandardCharsets; import java.time.LocalDate; import java.time.LocalDateTime; @@ -31,6 +32,7 @@ import java.util.List; import java.util.UUID; import java.util.stream.Stream; + import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.BitVector; import org.apache.arrow.vector.DateDayVector; @@ -57,6 +59,10 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ByteBuffers; import org.apache.iceberg.util.UUIDUtil; +import org.apache.iceberg.variants.Serialized; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantMetadata; +import org.apache.iceberg.variants.VariantValue; import org.apache.iceberg.vortex.VortexValueWriter; /** Writes Iceberg generic {@link Record} objects to Arrow vectors for Vortex file output. */ @@ -226,12 +232,53 @@ private static void writeValue( // Mark the struct slot itself as non-null for this row. 
structVector.setIndexDefined(rowIndex); break; + case VARIANT: + writeVariant((StructVector) vector, (Variant) value, rowIndex); + + break; + default: throw new UnsupportedOperationException( "Unsupported Iceberg type for Vortex write: " + type); } } + private static void writeVariant(StructVector vector, Variant variant, int rowIndex) { + vector.setIndexDefined(rowIndex); + + writeVariantMetadata( + vector.getChild("metadata", VarBinaryVector.class), variant.metadata(), rowIndex); + writeVariantValue(vector.getChild("value", VarBinaryVector.class), variant.value(), rowIndex); + } + + private static void writeVariantMetadata( + VarBinaryVector vector, VariantMetadata metadata, int rowIndex) { + if (metadata instanceof Serialized serialized) { + writeSerialized(vector, serialized, rowIndex); + return; + } + + ByteBuffer buffer = ByteBuffer.allocate(metadata.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN); + int length = metadata.writeTo(buffer, 0); + vector.setSafe(rowIndex, buffer, 0, length); + } + + private static void writeVariantValue(VarBinaryVector vector, VariantValue value, int rowIndex) { + if (value instanceof Serialized serialized) { + writeSerialized(vector, serialized, rowIndex); + return; + } + + ByteBuffer buffer = ByteBuffer.allocate(value.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN); + int length = value.writeTo(buffer, 0); + vector.setSafe(rowIndex, buffer, 0, length); + } + + private static void writeSerialized(VarBinaryVector vector, Serialized serialized, int rowIndex) { + ByteBuffer buffer = serialized.buffer(); + vector.setSafe(rowIndex, buffer, buffer.position(), buffer.remaining()); + } + @SuppressWarnings({"unchecked", "rawtypes"}) private static ColumnMetricsTracker newTracker(Types.NestedField field) { switch (field.type().typeId()) { @@ -296,6 +343,10 @@ static class ColumnMetricsTracker { private T min; private T max; + ColumnMetricsTracker(int fieldId) { + this(fieldId, null, null); + } + ColumnMetricsTracker(int fieldId, Comparator 
comparator) { this(fieldId, comparator, null); } diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java index cab7ca64f792..f2fb13e468f2 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java @@ -41,6 +41,12 @@ public final class VortexSchemas { /** Canonical Arrow extension name for UUIDs (matches {@code arrow.vector.extension.UuidType}). */ static final String UUID_EXTENSION_NAME = "arrow.uuid"; + /** + * Canonical Arrow extension name for Parquet variant (matches {@code + * arrow.vector.extension.ParquetVariant}). + */ + static final String VARIANT_EXTENSION_NAME = "arrow.parquet.variant"; + private VortexSchemas() {} /** Convert a Vortex file's Arrow {@link org.apache.arrow.vector.types.pojo.Schema} to Iceberg. */ @@ -229,6 +235,25 @@ yield new Field( yield new Field( name, new FieldType(nullable, ArrowType.Struct.INSTANCE, null), children.build()); } + case VARIANT -> { + Map extMetadata = + ImmutableMap.of( + ArrowType.ExtensionType.EXTENSION_METADATA_KEY_NAME, + VARIANT_EXTENSION_NAME, + ArrowType.ExtensionType.EXTENSION_METADATA_KEY_METADATA, + ""); + + ImmutableList.Builder children = ImmutableList.builder(); + children.add( + new Field("metadata", new FieldType(false, ArrowType.Binary.INSTANCE, null), null)); + children.add( + new Field("value", new FieldType(true, ArrowType.Binary.INSTANCE, null), null)); + + yield new Field( + name, + new FieldType(nullable, ArrowType.Struct.INSTANCE, null, extMetadata), + children.build()); + } default -> throw new UnsupportedOperationException( "Unsupported Iceberg type for Arrow conversion: " + type); @@ -492,6 +517,11 @@ private static Type toIcebergType(Field field, AtomicInteger nextId) { if (isUuidField(field)) { return Types.UUIDType.get(); } + + if (isVariantField(field)) { + return Types.VariantType.get(); + } + ArrowType 
arrowType = field.getType(); if (arrowType instanceof ArrowType.Int intType) { return intType.getBitWidth() <= Integer.SIZE ? Types.IntegerType.get() : Types.LongType.get(); @@ -683,5 +713,12 @@ public static boolean isUuidField( .get( dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.ExtensionType .EXTENSION_METADATA_KEY_NAME)); + } + public static boolean isVariantField(Field field) { + if (field.getType() instanceof ArrowType.ExtensionType ext) { + return VARIANT_EXTENSION_NAME.equals(ext.extensionName()); + } + return VARIANT_EXTENSION_NAME.equals( + field.getMetadata().get(ArrowType.ExtensionType.EXTENSION_METADATA_KEY_NAME)); } } diff --git a/vortex/src/test/java/org/apache/iceberg/vortex/TestGenericVortex.java b/vortex/src/test/java/org/apache/iceberg/vortex/TestGenericVortex.java index 3722a1b6d0f8..2f6bdbc8ea0a 100644 --- a/vortex/src/test/java/org/apache/iceberg/vortex/TestGenericVortex.java +++ b/vortex/src/test/java/org/apache/iceberg/vortex/TestGenericVortex.java @@ -48,7 +48,12 @@ protected boolean supportsUnknown() { @Override protected boolean supportsVariant() { - return false; + return true; + } + + @Override + protected boolean supportsTimestampNanos() { + return true; } @Override From 12f61dc66797a329188e1b389380f6f8332f8227 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Wed, 27 May 2026 15:13:53 +0100 Subject: [PATCH 2/9] Initial support Signed-off-by: Adam Gutglick --- .../data/vortex/GenericVortexReader.java | 4 + .../data/vortex/GenericVortexReaders.java | 79 +++++++++++ .../data/vortex/GenericVortexWriter.java | 31 ++++- .../vortex/VortexSchemaWithTypeVisitor.java | 4 + .../apache/iceberg/vortex/VortexSchemas.java | 65 +++++++++ .../iceberg/vortex/TestVortexSchemas.java | 128 +++++++++++++++++- 6 files changed, 303 insertions(+), 8 deletions(-) diff --git a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java index 
88d0622f023e..92230f7f5ab2 100644 --- a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java +++ b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java @@ -170,6 +170,10 @@ public VortexValueReader list( @Override public VortexValueReader primitive(Type.PrimitiveType iPrimitive, Field primField) { + if (VortexSchemas.isVariantField(primField)) { + return GenericVortexReaders.variants(); + } + if ((iPrimitive != null && iPrimitive.typeId() == Type.TypeID.UUID) || VortexSchemas.isUuidField(primField)) { return GenericVortexReaders.uuids(); diff --git a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReaders.java b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReaders.java index 26df6752eec7..92989eccf429 100644 --- a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReaders.java +++ b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReaders.java @@ -20,6 +20,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.charset.StandardCharsets; import java.time.Instant; import java.time.LocalDate; @@ -53,6 +54,9 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.DateTimeUtil; import org.apache.iceberg.util.UUIDUtil; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantMetadata; +import org.apache.iceberg.variants.VariantValue; import org.apache.iceberg.vortex.VortexValueReader; public class GenericVortexReaders { @@ -94,6 +98,10 @@ public static VortexValueReader uuids() { return UuidReader.INSTANCE; } + public static VortexValueReader variants() { + return VariantReader.INSTANCE; + } + public static VortexValueReader date(boolean isMillis) { return new DateReader(isMillis); } @@ -314,6 +322,77 @@ static FixedSizeBinaryVector uuidStorage(FieldVector vector) { return (FixedSizeBinaryVector) vector; } + private static class VariantReader implements 
VortexValueReader { + static final VariantReader INSTANCE = new VariantReader(); + + private VariantReader() {} + + @Override + public Variant read(FieldVector vector, int row) { + StructVector storage = variantStorage(vector); + VarBinaryVector valueVector = storage.getChild("value", VarBinaryVector.class); + if (vector.isNull(row) || isMissingBinary(valueVector, row)) { + FieldVector typedValueVector = (FieldVector) storage.getChild("typed_value"); + if (typedValueVector != null && !typedValueVector.isNull(row)) { + throw new UnsupportedOperationException( + "Reading shredded Variant values from Vortex is not supported yet"); + } + + return null; + } + + return readVariant(storage, valueVector, row); + } + + @Override + public Variant readNonNull(FieldVector vector, int row) { + StructVector storage = variantStorage(vector); + VarBinaryVector valueVector = storage.getChild("value", VarBinaryVector.class); + if (isMissingBinary(valueVector, row)) { + throw new UnsupportedOperationException( + "Reading shredded Variant values from Vortex is not supported yet"); + } + + return readVariant(storage, valueVector, row); + } + + private Variant readVariant(StructVector storage, VarBinaryVector valueVector, int row) { + VarBinaryVector metadataVector = storage.getChild("metadata", VarBinaryVector.class); + + if (metadataVector == null || metadataVector.isNull(row)) { + throw new IllegalStateException("Invalid Vortex variant: metadata is null"); + } + + byte[] metadataBytes = metadataVector.get(row); + byte[] valueBytes = valueVector.get(row); + if (metadataBytes.length == 0 || valueBytes.length == 0) { + throw new IllegalStateException( + "Invalid Vortex variant: serialized value is empty (metadata=" + + metadataBytes.length + + ", value=" + + valueBytes.length + + ")"); + } + + VariantMetadata metadata = + VariantMetadata.from(ByteBuffer.wrap(metadataBytes).order(ByteOrder.LITTLE_ENDIAN)); + VariantValue value = + VariantValue.from(metadata, 
ByteBuffer.wrap(valueBytes).order(ByteOrder.LITTLE_ENDIAN)); + return Variant.of(metadata, value); + } + } + + private static boolean isMissingBinary(VarBinaryVector vector, int row) { + return vector == null || vector.isNull(row) || vector.get(row).length == 0; + } + + private static StructVector variantStorage(FieldVector vector) { + if (vector instanceof ExtensionTypeVector ext) { + return (StructVector) ext.getUnderlyingVector(); + } + return (StructVector) vector; + } + private static class DateReader implements VortexValueReader { private final boolean isMillis; diff --git a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java index c49e11a28a3c..55566f555802 100644 --- a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java +++ b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java @@ -56,6 +56,7 @@ import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.Schema; import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ByteBuffers; import org.apache.iceberg.util.UUIDUtil; @@ -96,7 +97,12 @@ public void write(Record datum, VectorSchemaRoot root, int rowIndex) { ColumnMetricsTracker tracker = (ColumnMetricsTracker) trackers[fieldIndex]; if (value == null) { - vector.setNull(rowIndex); + if (field.isRequired()) { + throw new IllegalArgumentException( + "Cannot write null value for required field: " + field); + } + + writeNull(vector, field.type(), rowIndex); tracker.addNull(); continue; } @@ -236,13 +242,31 @@ private static void writeValue( writeVariant((StructVector) vector, (Variant) value, rowIndex); break; - default: throw new UnsupportedOperationException( "Unsupported Iceberg type for Vortex write: " + type); } } + private static void writeNull(FieldVector vector, Type type, int rowIndex) { + if 
(type.isVariantType()) { + writeNullVariant((StructVector) vector, rowIndex); + } else { + vector.setNull(rowIndex); + } + } + + private static void writeNullVariant(StructVector vector, int rowIndex) { + vector.setNull(rowIndex); + writeVariantMetadata( + vector.getChild("metadata", VarBinaryVector.class), VariantMetadata.empty(), rowIndex); + + VarBinaryVector valueVector = vector.getChild("value", VarBinaryVector.class); + if (valueVector != null) { + valueVector.setNull(rowIndex); + } + } + private static void writeVariant(StructVector vector, Variant variant, int rowIndex) { vector.setIndexDefined(rowIndex); @@ -275,8 +299,7 @@ private static void writeVariantValue(VarBinaryVector vector, VariantValue value } private static void writeSerialized(VarBinaryVector vector, Serialized serialized, int rowIndex) { - ByteBuffer buffer = serialized.buffer(); - vector.setSafe(rowIndex, buffer, buffer.position(), buffer.remaining()); + vector.setSafe(rowIndex, ByteBuffers.toByteArray(serialized.buffer())); } @SuppressWarnings({"unchecked", "rawtypes"}) diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java index 590557e4e8c7..8f7e8c285419 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java @@ -48,6 +48,10 @@ public static T visit( } public static T visit(Type iType, Field field, VortexSchemaWithTypeVisitor visitor) { + if ((iType != null && iType.isVariantType()) || VortexSchemas.isVariantField(field)) { + return visitor.primitive(null, field); + } + ArrowType arrowType = field.getType(); if (arrowType instanceof ArrowType.Struct) { return visitStruct(iType != null ? 
iType.asStructType() : null, field.getChildren(), visitor); diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java index f2fb13e468f2..47cf17625351 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java @@ -31,6 +31,7 @@ import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.iceberg.Schema; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -519,6 +520,7 @@ private static Type toIcebergType(Field field, AtomicInteger nextId) { } if (isVariantField(field)) { + validateVariantField(field); return Types.VariantType.get(); } @@ -649,6 +651,69 @@ private static Type toIcebergFloatingPoint( throw new UnsupportedOperationException("Half-precision floats are not supported"); }; } + + private static void validateVariantField(Field field) { + Preconditions.checkArgument( + field.getType() instanceof ArrowType.Struct, + "Invalid Arrow variant field %s: expected struct storage type, found %s", + field.getName(), + field.getType()); + + Field metadata = findChild(field, "metadata"); + Preconditions.checkArgument( + metadata != null, + "Invalid Arrow variant field %s: missing metadata child", + field.getName()); + Preconditions.checkArgument( + !metadata.isNullable(), + "Invalid Arrow variant field %s: metadata child must be non-nullable", + field.getName()); + Preconditions.checkArgument( + isBinaryLike(metadata.getType()), + "Invalid Arrow variant field %s: metadata child must be binary, found %s", + field.getName(), + metadata.getType()); + + Field value = findChild(field, "value"); 
+ if (value != null) { + Preconditions.checkArgument( + value.isNullable(), + "Invalid Arrow variant field %s: value child must be nullable", + field.getName()); + Preconditions.checkArgument( + isBinaryLike(value.getType()), + "Invalid Arrow variant field %s: value child must be binary, found %s", + field.getName(), + value.getType()); + } + + Field typedValue = findChild(field, "typed_value"); + if (typedValue != null) { + Preconditions.checkArgument( + typedValue.isNullable(), + "Invalid Arrow variant field %s: typed_value child must be nullable", + field.getName()); + } + + Preconditions.checkArgument( + value != null || typedValue != null, + "Invalid Arrow variant field %s: expected value or typed_value child", + field.getName()); + } + + private static Field findChild(Field field, String name) { + for (Field child : field.getChildren()) { + if (name.equals(child.getName())) { + return child; + } + } + + return null; + } + + private static boolean isBinaryLike(ArrowType arrowType) { + return arrowType instanceof ArrowType.Binary || arrowType instanceof ArrowType.LargeBinary; + } private static Type toIcebergTimestamp(ArrowType.Timestamp tsType) { boolean isNano = tsType.getUnit() == TimeUnit.NANOSECOND; diff --git a/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexSchemas.java b/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexSchemas.java index 9a1e5877ee76..5ca9854848d1 100644 --- a/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexSchemas.java +++ b/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexSchemas.java @@ -21,8 +21,10 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.List; +import java.util.Map; import org.apache.arrow.vector.types.FloatingPointPrecision; import 
org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; @@ -32,7 +34,14 @@ import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Test; -public class TestVortexSchemas { +class TestVortexSchemas { + private static final Map VARIANT_METADATA = + Map.of( + ArrowType.ExtensionType.EXTENSION_METADATA_KEY_NAME, + VortexSchemas.VARIANT_EXTENSION_NAME, + ArrowType.ExtensionType.EXTENSION_METADATA_KEY_METADATA, + ""); + // A struct nested inside a struct, plus a list, so the conversion exercises every recursive // branch and id is forced to stay unique across siblings, nested structs, and list elements. private static final Schema SCHEMA = @@ -55,17 +64,17 @@ public class TestVortexSchemas { Types.StructType.of(required(9, "x", Types.IntegerType.get())))))); @Test - public void testConvertLocalArrowStructTypes() { + void convertLocalArrowStructTypes() { assertStructRoundTrip(VortexSchemas.convert(VortexSchemas.toArrowSchema(SCHEMA))); } @Test - public void testConvertRelocatedArrowStructTypes() { + void convertRelocatedArrowStructTypes() { assertStructRoundTrip(VortexSchemas.convert(VortexSchemas.toVortexArrowSchema(SCHEMA))); } @Test - public void testConvertLargeAndFixedSizeLists() { + void convertLargeAndFixedSizeLists() { // toArrowSchema only emits ArrowType.List for Iceberg lists, so build the Arrow schema directly // to exercise the LargeList and FixedSizeList branches a real Vortex file can produce. 
org.apache.arrow.vector.types.pojo.Schema arrowSchema = @@ -88,11 +97,122 @@ public void testConvertLargeAndFixedSizeLists() { assertThat(TypeUtil.indexById(converted.asStruct())).hasSize(4); } + @Test + void variantToArrowUsesCanonicalUnshreddedStorage() { + Schema icebergSchema = + new Schema( + required(1, "id", Types.LongType.get()), optional(2, "v", Types.VariantType.get())); + + Field variant = VortexSchemas.toArrowSchema(icebergSchema).findField("v"); + + assertThat(VortexSchemas.isVariantField(variant)).isTrue(); + assertThat(variant.isNullable()).isTrue(); + assertThat(variant.getType()).isEqualTo(ArrowType.Struct.INSTANCE); + assertThat(variant.getChildren()).hasSize(2); + + Field metadata = variant.getChildren().get(0); + assertThat(metadata.getName()).isEqualTo("metadata"); + assertThat(metadata.isNullable()).isFalse(); + assertThat(metadata.getType()).isEqualTo(ArrowType.Binary.INSTANCE); + + Field value = variant.getChildren().get(1); + assertThat(value.getName()).isEqualTo("value"); + assertThat(value.isNullable()).isTrue(); + assertThat(value.getType()).isEqualTo(ArrowType.Binary.INSTANCE); + } + + @Test + void variantFromArrowAcceptsTypedValueOnlyStorage() { + Field variant = + variantField( + "v", + true, + List.of( + binaryField("metadata", false), + new Field( + "typed_value", + new FieldType(true, new ArrowType.Int(Integer.SIZE, true), null), + null))); + + Schema converted = + VortexSchemas.convert(new org.apache.arrow.vector.types.pojo.Schema(List.of(variant))); + + assertThat(converted.columns()).containsExactly(optional(0, "v", Types.VariantType.get())); + } + + @Test + void variantFromArrowRequiresMetadataChild() { + Field variant = variantField("v", true, List.of(binaryField("value", true))); + + assertThatThrownBy( + () -> + VortexSchemas.convert( + new org.apache.arrow.vector.types.pojo.Schema(List.of(variant)))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("metadata"); + } + + @Test + void 
variantFromArrowRequiresValueOrTypedValueChild() { + Field variant = variantField("v", true, List.of(binaryField("metadata", false))); + + assertThatThrownBy( + () -> + VortexSchemas.convert( + new org.apache.arrow.vector.types.pojo.Schema(List.of(variant)))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("value or typed_value"); + } + + @Test + void variantFromArrowRequiresNullableValueChild() { + Field variant = + variantField( + "v", true, List.of(binaryField("metadata", false), binaryField("value", false))); + + assertThatThrownBy( + () -> + VortexSchemas.convert( + new org.apache.arrow.vector.types.pojo.Schema(List.of(variant)))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("value child must be nullable"); + } + + @Test + void variantFromArrowRequiresNullableTypedValueChild() { + Field variant = + variantField( + "v", + true, + List.of( + binaryField("metadata", false), + new Field( + "typed_value", + new FieldType(false, new ArrowType.Int(Integer.SIZE, true), null), + null))); + + assertThatThrownBy( + () -> + VortexSchemas.convert( + new org.apache.arrow.vector.types.pojo.Schema(List.of(variant)))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("typed_value child must be nullable"); + } + private static Field listField(String name, ArrowType listType, ArrowType elementType) { Field element = new Field("element", new FieldType(false, elementType, null), null); return new Field(name, new FieldType(true, listType, null), List.of(element)); } + private static Field variantField(String name, boolean nullable, List children) { + return new Field( + name, new FieldType(nullable, ArrowType.Struct.INSTANCE, null, VARIANT_METADATA), children); + } + + private static Field binaryField(String name, boolean nullable) { + return new Field(name, new FieldType(nullable, ArrowType.Binary.INSTANCE, null), null); + } + private static void assertStructRoundTrip(Schema roundTrip) { // Names and types 
survive the round trip through Arrow (binding is by name). assertThat(roundTrip.findField("location").type()).isInstanceOf(Types.StructType.class); From 04923d5291513e7771d8ef1482fe3bbedf1b0216 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 4 Jun 2026 13:58:26 +0100 Subject: [PATCH 3/9] Fix + spotless Signed-off-by: Adam Gutglick --- .../data/vortex/GenericVortexWriter.java | 1 - .../apache/iceberg/vortex/VortexSchemas.java | 35 +++++++++++++++++-- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java index 55566f555802..78076e16812a 100644 --- a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java +++ b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java @@ -32,7 +32,6 @@ import java.util.List; import java.util.UUID; import java.util.stream.Stream; - import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.BitVector; import org.apache.arrow.vector.DateDayVector; diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java index 47cf17625351..2d5d28867f0a 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java @@ -38,6 +38,8 @@ import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; + + public final class VortexSchemas { /** Canonical Arrow extension name for UUIDs (matches {@code arrow.vector.extension.UuidType}). 
*/ static final String UUID_EXTENSION_NAME = "arrow.uuid"; @@ -485,6 +487,34 @@ yield toVortexArrowField( null, children.build()); } + case VARIANT -> { + Map extMetadata = + ImmutableMap.of( + ArrowType.ExtensionType.EXTENSION_METADATA_KEY_NAME, + VARIANT_EXTENSION_NAME, + ArrowType.ExtensionType.EXTENSION_METADATA_KEY_METADATA, + ""); + + ImmutableList.Builder + children = ImmutableList.builder(); + children.add( + toVortexArrowField( + "metadata", + new dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Binary(), + false)); + children.add( + toVortexArrowField( + "value", + new dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Binary(), + nullable)); + + yield toVortexArrowField( + name, + new dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Struct(), + nullable, + extMetadata, + children.build()); + } default -> throw new UnsupportedOperationException( "Unsupported Iceberg type for Arrow conversion: " + type); @@ -651,7 +681,7 @@ private static Type toIcebergFloatingPoint( throw new UnsupportedOperationException("Half-precision floats are not supported"); }; } - + private static void validateVariantField(Field field) { Preconditions.checkArgument( field.getType() instanceof ArrowType.Struct, @@ -778,7 +808,8 @@ public static boolean isUuidField( .get( dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.ExtensionType .EXTENSION_METADATA_KEY_NAME)); - } + } + public static boolean isVariantField(Field field) { if (field.getType() instanceof ArrowType.ExtensionType ext) { return VARIANT_EXTENSION_NAME.equals(ext.extensionName()); From fafb0e00ac0c8f97b6d61fbda2167cb590d023a0 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 4 Jun 2026 14:37:26 +0100 Subject: [PATCH 4/9] Fix some schema exports and include tests Signed-off-by: Adam Gutglick --- .../data/TestMetricsRowGroupFilter.java | 1 + .../apache/iceberg/vortex/VortexSchemas.java | 22 ++++++++++++++++++- 
.../iceberg/vortex/TestVortexSchemas.java | 16 ++++++++++++++ 3 files changed, 38 insertions(+), 1 deletion(-) diff --git a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java index 3cb46b309d82..0cc391311f48 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java +++ b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java @@ -48,6 +48,7 @@ import java.util.Arrays; import java.util.List; import java.util.UUID; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.iceberg.FileFormat; diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java index 2d5d28867f0a..5e9bf8cd3d79 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java @@ -506,7 +506,7 @@ yield toVortexArrowField( toVortexArrowField( "value", new dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Binary(), - nullable)); + true)); yield toVortexArrowField( name, @@ -580,6 +580,11 @@ private static Type toIcebergType( if (isUuidField(field)) { return Types.UUIDType.get(); } + + if (isVariantField(field)) { + return Types.VariantType.get(); + } + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType arrowType = field.getType(); if (arrowType instanceof dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Int intType) { @@ -817,4 +822,19 @@ public static boolean isVariantField(Field field) { return VARIANT_EXTENSION_NAME.equals( field.getMetadata().get(ArrowType.ExtensionType.EXTENSION_METADATA_KEY_NAME)); } + + public static boolean isVariantField( + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field field) { + if (field.getType() + instanceof + 
dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.ExtensionType ext) { + return VARIANT_EXTENSION_NAME.equals(ext.extensionName()); + } + return VARIANT_EXTENSION_NAME.equals( + field + .getMetadata() + .get( + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.ExtensionType + .EXTENSION_METADATA_KEY_NAME)); + } } diff --git a/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexSchemas.java b/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexSchemas.java index 5ca9854848d1..9fef85b660a7 100644 --- a/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexSchemas.java +++ b/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexSchemas.java @@ -121,6 +121,22 @@ void variantToArrowUsesCanonicalUnshreddedStorage() { assertThat(value.getType()).isEqualTo(ArrowType.Binary.INSTANCE); } + @Test + void requiredVariantToVortexArrowKeepsValueChildNullable() { + Schema icebergSchema = + new Schema( + required(1, "id", Types.LongType.get()), required(2, "v", Types.VariantType.get())); + + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field variant = + VortexSchemas.toVortexArrowSchema(icebergSchema).findField("v"); + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field value = + variant.getChildren().get(1); + + assertThat(variant.isNullable()).isFalse(); + assertThat(value.getName()).isEqualTo("value"); + assertThat(value.isNullable()).isTrue(); + } + @Test void variantFromArrowAcceptsTypedValueOnlyStorage() { Field variant = From 521c5afc4975a49bad5f42ad9e02f56273cc1653 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 5 Jun 2026 11:01:46 +0100 Subject: [PATCH 5/9] row count stat for variant Signed-off-by: Adam Gutglick --- .../apache/iceberg/vortex/VortexMetrics.java | 15 +++++++++++++++ .../iceberg/vortex/TestVortexMetrics.java | 18 ++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexMetrics.java 
b/vortex/src/main/java/org/apache/iceberg/vortex/VortexMetrics.java index e098e56377eb..3a4eb7eb0a4c 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexMetrics.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexMetrics.java @@ -90,6 +90,8 @@ static Metrics buildMetrics( } }); + addVariantValueCounts(rowCount, schema, metricsConfig, valueCounts); + return new Metrics( rowCount, null, // columnSizes not available without Vortex JNI support @@ -101,6 +103,19 @@ static Metrics buildMetrics( originalTypes.isEmpty() ? null : originalTypes); } + private static void addVariantValueCounts( + long rowCount, Schema schema, MetricsConfig metricsConfig, Map valueCounts) { + for (Types.NestedField column : schema.columns()) { + int id = column.fieldId(); + MetricsModes.MetricsMode mode = MetricsUtil.metricsMode(schema, metricsConfig, id); + if (column.type().isVariantType() + && mode != MetricsModes.None.get() + && !valueCounts.containsKey(id)) { + valueCounts.put(id, rowCount); + } + } + } + private static int truncateLength(MetricsModes.MetricsMode mode) { if (mode instanceof MetricsModes.Truncate truncate) { return truncate.length(); diff --git a/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexMetrics.java b/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexMetrics.java index 948dbad34e4e..a9dd0a9526fe 100644 --- a/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexMetrics.java +++ b/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexMetrics.java @@ -147,6 +147,24 @@ public void testMetricsCountsMode() { assertThat(metrics.upperBounds()).isNull(); } + @Test + public void testVariantColumnReportsRowCountWithoutBounds() { + Schema variantSchema = + new Schema( + required(1, "id", Types.LongType.get()), + optional(2, "payload", Types.VariantType.get())); + FieldMetrics idMetrics = new FieldMetrics<>(1, 3, 0, 10L, 12L); + + Metrics metrics = + VortexMetrics.buildMetrics( + 3L, variantSchema, MetricsConfig.getDefault(), 
Stream.of(idMetrics)); + + assertThat(metrics.valueCounts()).containsEntry(2, 3L); + assertThat(metrics.nullValueCounts()).doesNotContainKey(2); + assertThat(metrics.lowerBounds()).doesNotContainKey(2); + assertThat(metrics.upperBounds()).doesNotContainKey(2); + } + @Test public void testMetricsNoneMode() { MetricsConfig noneConfig = From 86898c514bc701301bc68844e44a13c5772112e2 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 9 Jun 2026 16:50:12 +0100 Subject: [PATCH 6/9] Fix visitor variant Signed-off-by: Adam Gutglick --- .../apache/iceberg/data/vortex/GenericVortexReader.java | 9 +++++---- .../iceberg/vortex/VortexSchemaWithTypeVisitor.java | 4 +++- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java index 92230f7f5ab2..b2adcab91f91 100644 --- a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java +++ b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java @@ -170,10 +170,6 @@ public VortexValueReader list( @Override public VortexValueReader primitive(Type.PrimitiveType iPrimitive, Field primField) { - if (VortexSchemas.isVariantField(primField)) { - return GenericVortexReaders.variants(); - } - if ((iPrimitive != null && iPrimitive.typeId() == Type.TypeID.UUID) || VortexSchemas.isUuidField(primField)) { return GenericVortexReaders.uuids(); @@ -195,6 +191,11 @@ public VortexValueReader primitive(Type.PrimitiveType iPrimitive, Field primF return simpleReader(arrowType); } + @Override + public VortexValueReader variant(Types.VariantType variantType, Field variantField) { + return GenericVortexReaders.variants(); + } + private static VortexValueReader simpleReader(ArrowType arrowType) { if (arrowType instanceof ArrowType.Bool) { return GenericVortexReaders.bools(); diff --git 
a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java index 8f7e8c285419..9c4bdfcf351c 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java @@ -40,6 +40,8 @@ public abstract class VortexSchemaWithTypeVisitor { public abstract T primitive(Type.PrimitiveType iPrimitive, Field primField); + public abstract T variant(Types.VariantType variantType, Field variantField); + public static T visit( Schema expectedSchema, org.apache.arrow.vector.types.pojo.Schema fileSchema, @@ -49,7 +51,7 @@ public static T visit( public static T visit(Type iType, Field field, VortexSchemaWithTypeVisitor visitor) { if ((iType != null && iType.isVariantType()) || VortexSchemas.isVariantField(field)) { - return visitor.primitive(null, field); + return visitor.variant(iType != null ? iType.asVariantType(): null, field); } ArrowType arrowType = field.getType(); From 393deb96926f4e86578b7feaff78120ae3e8713b Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 9 Jun 2026 17:22:32 +0100 Subject: [PATCH 7/9] post rebase fix Signed-off-by: Adam Gutglick --- .../org/apache/iceberg/data/vortex/GenericVortexWriter.java | 2 +- .../org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java | 2 +- .../src/main/java/org/apache/iceberg/vortex/VortexSchemas.java | 2 -- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java index 78076e16812a..6cb27deeb0de 100644 --- a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java +++ b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java @@ -343,7 +343,7 @@ private static ColumnMetricsTracker newTracker(Types.NestedField field) 
{ v -> ChronoUnit.NANOS.between(LOCAL_EPOCH, (LocalDateTime) v)); } default: - if (field.type().isNestedType()) { + if (field.type().isNestedType() || field.type().isVariantType()) { // Lists, maps, and structs have no natural ordering — track counts only. return new ColumnMetricsTracker<>(field.fieldId(), null); } diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java index 9c4bdfcf351c..435daab5c657 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java @@ -51,7 +51,7 @@ public static T visit( public static T visit(Type iType, Field field, VortexSchemaWithTypeVisitor visitor) { if ((iType != null && iType.isVariantType()) || VortexSchemas.isVariantField(field)) { - return visitor.variant(iType != null ? iType.asVariantType(): null, field); + return visitor.variant(iType != null ? iType.asVariantType() : null, field); } ArrowType arrowType = field.getType(); diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java index 5e9bf8cd3d79..8db5125e4951 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java @@ -38,8 +38,6 @@ import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; - - public final class VortexSchemas { /** Canonical Arrow extension name for UUIDs (matches {@code arrow.vector.extension.UuidType}). 
*/ static final String UUID_EXTENSION_NAME = "arrow.uuid"; From 4c25490b0436af5560f2fd0b2b35a3a7451d5096 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 16 Jun 2026 12:32:14 +0100 Subject: [PATCH 8/9] spotless --- .../java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java index 0cc391311f48..3cb46b309d82 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java +++ b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java @@ -48,7 +48,6 @@ import java.util.Arrays; import java.util.List; import java.util.UUID; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.iceberg.FileFormat; From 7049cc4c475b148ac9f04f7aaa7c892ed4c06905 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 16 Jun 2026 17:11:47 +0100 Subject: [PATCH 9/9] Add support for Variant for Spark with Vortex (#34) --- .../iceberg/spark/data/SparkVortexReader.java | 6 + .../iceberg/spark/source/SparkBatch.java | 7 +- .../iceberg/spark/data/SparkVortexReader.java | 5 + .../spark/data/SparkVortexValueReaders.java | 40 ++++++ .../iceberg/spark/data/SparkVortexWriter.java | 41 ++++++- .../iceberg/spark/source/SparkBatch.java | 7 +- .../spark/sql/TestSparkVortexVariantRead.java | 114 ++++++++++++++++++ .../iceberg/spark/data/SparkVortexReader.java | 5 + .../spark/data/SparkVortexValueReaders.java | 40 ++++++ .../iceberg/spark/data/SparkVortexWriter.java | 41 ++++++- .../iceberg/spark/source/SparkBatch.java | 7 +- .../spark/sql/TestSparkVortexVariantRead.java | 114 ++++++++++++++++++ .../vortex/VortexSchemaWithTypeVisitor.java | 6 +- .../apache/iceberg/vortex/VortexSchemas.java | 96 +++++++++------ 14 files changed, 488 insertions(+), 41 deletions(-) create mode 100644 
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVortexVariantRead.java create mode 100644 spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVortexVariantRead.java diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java index bf5c97964716..6d6f6fe79707 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java @@ -147,6 +147,12 @@ public VortexValueReader list( throw new UnsupportedOperationException("Vortex LIST types are not supported yet"); } + @Override + public VortexValueReader variant(Types.VariantType variantType, Field variantField) { + // Spark 3.5 has no VariantType/VariantVal, so variant columns cannot be read here. + throw new UnsupportedOperationException("Variant is not supported for Spark 3.5"); + } + @Override public VortexValueReader primitive(Type.PrimitiveType icebergType, Field primField) { return switch (icebergType.typeId()) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java index 90bf52e4ce11..2a3a2a4670c3 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java @@ -38,6 +38,8 @@ import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.spark.VortexBatchReadConf; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.broadcast.Broadcast; @@ -191,9 +193,12 @@ private boolean useOrcBatchReads() { } // conditions for 
using Vortex batch reads: + // - no variant is projected (ArrowColumnVector cannot surface a variant as Spark's VariantVal, so + // variant projections fall back to the row-based reader, which does support variant) // - all tasks are of FileScanTask type and read only Vortex files private boolean useVortexBatchReads() { - return taskGroups.stream().allMatch(this::supportsVortexBatchReads); + return TypeUtil.find(expectedSchema, Type::isVariantType) == null + && taskGroups.stream().allMatch(this::supportsVortexBatchReads); } private boolean supportsVortexBatchReads(ScanTask task) { diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java index bf5c97964716..0df9f70822ce 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java @@ -147,6 +147,11 @@ public VortexValueReader list( throw new UnsupportedOperationException("Vortex LIST types are not supported yet"); } + @Override + public VortexValueReader variant(Types.VariantType variantType, Field variantField) { + return SparkVortexValueReaders.variants(); + } + @Override public VortexValueReader primitive(Type.PrimitiveType icebergType, Field primField) { return switch (icebergType.typeId()) { diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexValueReaders.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexValueReaders.java index 8ce5ce6d20c2..c6362892b7d8 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexValueReaders.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexValueReaders.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.spark.data; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import 
java.nio.charset.StandardCharsets; import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.DateDayVector; @@ -31,10 +33,13 @@ import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.types.TimeUnit; import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.iceberg.data.vortex.GenericVortexReaders; import org.apache.iceberg.util.DateTimeUtil; import org.apache.iceberg.util.UUIDUtil; +import org.apache.iceberg.variants.Variant; import org.apache.iceberg.vortex.VortexValueReader; import org.apache.spark.unsafe.types.UTF8String; +import org.apache.spark.unsafe.types.VariantVal; public class SparkVortexValueReaders { private SparkVortexValueReaders() {} @@ -62,6 +67,10 @@ public static VortexValueReader time(TimeUnit timeUnit) { return new TimeReader(timeUnit); } + public static VortexValueReader variants() { + return VariantReader.INSTANCE; + } + static class UTF8Reader implements VortexValueReader { static final UTF8Reader INSTANCE = new UTF8Reader(); @@ -156,4 +165,35 @@ public Long readNonNull(FieldVector vector, int row) { }; } } + + // Converts the Iceberg Variant produced by the shared Vortex reader into Spark's VariantVal by + // re-serializing metadata and value to little-endian buffers (mirrors SparkParquetReaders). + static class VariantReader implements VortexValueReader { + static final VariantReader INSTANCE = new VariantReader(); + + private final VortexValueReader delegate = GenericVortexReaders.variants(); + + private VariantReader() {} + + @Override + public VariantVal read(FieldVector vector, int row) { + Variant variant = delegate.read(vector, row); + return variant == null ? 
null : toVariantVal(variant); + } + + @Override + public VariantVal readNonNull(FieldVector vector, int row) { + return toVariantVal(delegate.readNonNull(vector, row)); + } + + private static VariantVal toVariantVal(Variant variant) { + byte[] metadataBytes = new byte[variant.metadata().sizeInBytes()]; + variant.metadata().writeTo(ByteBuffer.wrap(metadataBytes).order(ByteOrder.LITTLE_ENDIAN), 0); + + byte[] valueBytes = new byte[variant.value().sizeInBytes()]; + variant.value().writeTo(ByteBuffer.wrap(valueBytes).order(ByteOrder.LITTLE_ENDIAN), 0); + + return new VariantVal(valueBytes, metadataBytes); + } + } } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java index fbe837a3eb18..43cb1858d83f 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java @@ -19,6 +19,8 @@ package org.apache.iceberg.spark.data; import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.List; import java.util.UUID; import org.apache.arrow.vector.BigIntVector; @@ -43,9 +45,11 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.UUIDUtil; +import org.apache.iceberg.variants.VariantMetadata; import org.apache.iceberg.vortex.VortexValueWriter; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.unsafe.types.UTF8String; +import org.apache.spark.unsafe.types.VariantVal; /** Writes Spark {@link InternalRow} objects to Arrow vectors for Vortex file output. 
*/ public class SparkVortexWriter implements VortexValueWriter { @@ -66,7 +70,7 @@ public void write(InternalRow datum, VectorSchemaRoot root, int rowIndex) { FieldVector vector = root.getVector(fieldIndex); if (field.isOptional() && datum.isNullAt(fieldIndex)) { - vector.setNull(rowIndex); + writeNull(vector, field.type(), rowIndex); continue; } @@ -163,9 +167,44 @@ private static void writeValue( // Mark the struct slot itself as non-null for this row. structVector.setIndexDefined(rowIndex); break; + case VARIANT: + writeVariant((StructVector) vector, row.getVariant(fieldIndex), rowIndex); + break; default: throw new UnsupportedOperationException( "Unsupported Iceberg type for Vortex write: " + type); } } + + private static void writeNull( + FieldVector vector, org.apache.iceberg.types.Type type, int rowIndex) { + if (type.isVariantType()) { + writeNullVariant((StructVector) vector, rowIndex); + } else { + vector.setNull(rowIndex); + } + } + + // Variant storage is an Arrow struct of { required metadata, optional value }. Spark's VariantVal + // already holds the serialized little-endian metadata and value, so copy them straight across. + private static void writeVariant(StructVector vector, VariantVal variant, int rowIndex) { + vector.getChild("metadata", VarBinaryVector.class).setSafe(rowIndex, variant.getMetadata()); + vector.getChild("value", VarBinaryVector.class).setSafe(rowIndex, variant.getValue()); + vector.setIndexDefined(rowIndex); + } + + // The metadata child is required, so a null variant still needs a valid (empty) metadata entry. 
+ private static void writeNullVariant(StructVector vector, int rowIndex) { + vector.setNull(rowIndex); + + VariantMetadata empty = VariantMetadata.empty(); + byte[] metadataBytes = new byte[empty.sizeInBytes()]; + empty.writeTo(ByteBuffer.wrap(metadataBytes).order(ByteOrder.LITTLE_ENDIAN), 0); + vector.getChild("metadata", VarBinaryVector.class).setSafe(rowIndex, metadataBytes); + + VarBinaryVector valueVector = vector.getChild("value", VarBinaryVector.class); + if (valueVector != null) { + valueVector.setNull(rowIndex); + } + } } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java index 90bf52e4ce11..2a3a2a4670c3 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java @@ -38,6 +38,8 @@ import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.spark.VortexBatchReadConf; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.broadcast.Broadcast; @@ -191,9 +193,12 @@ private boolean useOrcBatchReads() { } // conditions for using Vortex batch reads: + // - no variant is projected (ArrowColumnVector cannot surface a variant as Spark's VariantVal, so + // variant projections fall back to the row-based reader, which does support variant) // - all tasks are of FileScanTask type and read only Vortex files private boolean useVortexBatchReads() { - return taskGroups.stream().allMatch(this::supportsVortexBatchReads); + return TypeUtil.find(expectedSchema, Type::isVariantType) == null + && taskGroups.stream().allMatch(this::supportsVortexBatchReads); } private boolean supportsVortexBatchReads(ScanTask task) { diff --git 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVortexVariantRead.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVortexVariantRead.java new file mode 100644 index 000000000000..a70823312a16 --- /dev/null +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVortexVariantRead.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.sql; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import org.apache.iceberg.spark.SparkCatalog; +import org.apache.iceberg.spark.TestBase; +import org.apache.spark.sql.Row; +import org.apache.spark.types.variant.Variant; +import org.apache.spark.unsafe.types.VariantVal; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Round-trips VARIANT columns through Vortex-format tables via Spark SQL. + * + *

Vortex reads default to the columnar ({@code ColumnarBatch}) path, but that path cannot + * surface a variant as Spark's {@link VariantVal}, so {@code SparkBatch} falls back to the + * row-based reader whenever a variant is projected. Projecting only {@code id} keeps the columnar + * path; projecting a variant column exercises the row-based reader. + */ +public class TestSparkVortexVariantRead extends TestBase { + + private static final String CATALOG = "local"; + private static final String TABLE = CATALOG + ".default.var_vortex"; + + @BeforeAll + public static void setupCatalog() { + // Use a Hadoop catalog to avoid Hive schema conversion (Hive doesn't support VARIANT yet) + spark.conf().set("spark.sql.catalog." + CATALOG, SparkCatalog.class.getName()); + spark.conf().set("spark.sql.catalog." + CATALOG + ".type", "hadoop"); + spark.conf().set("spark.sql.catalog." + CATALOG + ".default-namespace", "default"); + spark.conf().set("spark.sql.catalog." + CATALOG + ".cache-enabled", "false"); + String temp = System.getProperty("java.io.tmpdir") + "/iceberg_spark_vortex_variant_warehouse"; + spark.conf().set("spark.sql.catalog." 
+ CATALOG + ".warehouse", temp); + } + + @BeforeEach + public void setupTable() { + sql("DROP TABLE IF EXISTS %s", TABLE); + sql( + "CREATE TABLE %s (id BIGINT, v1 VARIANT, v2 VARIANT) USING iceberg " + + "TBLPROPERTIES ('format-version'='3', 'write.format.default'='vortex')", + TABLE); + + sql("INSERT INTO %s SELECT 1, parse_json('{\"a\":1}'), parse_json('{\"x\":10}')", TABLE); + sql("INSERT INTO %s SELECT 2, parse_json('{\"b\":2}'), parse_json('{\"y\":20}')", TABLE); + } + + @AfterEach + public void cleanup() { + sql("DROP TABLE IF EXISTS %s", TABLE); + } + + @Test + public void readVariantColumns() { + List rows = spark.table(TABLE).select("id", "v1", "v2").orderBy("id").collectAsList(); + assertThat(rows).hasSize(2); + + assertThat(rows.get(0).getLong(0)).isEqualTo(1L); + assertVariantField(rows.get(0).get(1), "a", 1L); + assertVariantField(rows.get(0).get(2), "x", 10L); + + assertThat(rows.get(1).getLong(0)).isEqualTo(2L); + assertVariantField(rows.get(1).get(1), "b", 2L); + assertVariantField(rows.get(1).get(2), "y", 20L); + } + + @Test + public void readNonVariantProjectionUsesColumnarPath() { + // Projecting only non-variant columns keeps the columnar Vortex path enabled and must still + // return correct data after the variant fallback guard was added. 
+ List rows = spark.table(TABLE).select("id").orderBy("id").collectAsList(); + assertThat(rows).extracting(row -> row.getLong(0)).containsExactly(1L, 2L); + } + + @Test + public void readNullVariant() { + sql("INSERT INTO %s SELECT 3, NULL, NULL", TABLE); + + List rows = spark.table(TABLE).select("id", "v1").where("id = 3").collectAsList(); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).get(1)).isNull(); + } + + private static void assertVariantField(Object value, String key, long expected) { + assertThat(value).isInstanceOf(VariantVal.class); + VariantVal variantVal = (VariantVal) value; + Variant variant = new Variant(variantVal.getValue(), variantVal.getMetadata()); + Variant field = variant.getFieldByKey(key); + assertThat(field).isNotNull(); + assertThat(field.getLong()).isEqualTo(expected); + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java index bf5c97964716..0df9f70822ce 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java @@ -147,6 +147,11 @@ public VortexValueReader list( throw new UnsupportedOperationException("Vortex LIST types are not supported yet"); } + @Override + public VortexValueReader variant(Types.VariantType variantType, Field variantField) { + return SparkVortexValueReaders.variants(); + } + @Override public VortexValueReader primitive(Type.PrimitiveType icebergType, Field primField) { return switch (icebergType.typeId()) { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexValueReaders.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexValueReaders.java index 8ce5ce6d20c2..c6362892b7d8 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexValueReaders.java +++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexValueReaders.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.spark.data; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.charset.StandardCharsets; import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.DateDayVector; @@ -31,10 +33,13 @@ import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.types.TimeUnit; import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.iceberg.data.vortex.GenericVortexReaders; import org.apache.iceberg.util.DateTimeUtil; import org.apache.iceberg.util.UUIDUtil; +import org.apache.iceberg.variants.Variant; import org.apache.iceberg.vortex.VortexValueReader; import org.apache.spark.unsafe.types.UTF8String; +import org.apache.spark.unsafe.types.VariantVal; public class SparkVortexValueReaders { private SparkVortexValueReaders() {} @@ -62,6 +67,10 @@ public static VortexValueReader time(TimeUnit timeUnit) { return new TimeReader(timeUnit); } + public static VortexValueReader variants() { + return VariantReader.INSTANCE; + } + static class UTF8Reader implements VortexValueReader { static final UTF8Reader INSTANCE = new UTF8Reader(); @@ -156,4 +165,35 @@ public Long readNonNull(FieldVector vector, int row) { }; } } + + // Converts the Iceberg Variant produced by the shared Vortex reader into Spark's VariantVal by + // re-serializing metadata and value to little-endian buffers (mirrors SparkParquetReaders). + static class VariantReader implements VortexValueReader { + static final VariantReader INSTANCE = new VariantReader(); + + private final VortexValueReader delegate = GenericVortexReaders.variants(); + + private VariantReader() {} + + @Override + public VariantVal read(FieldVector vector, int row) { + Variant variant = delegate.read(vector, row); + return variant == null ? 
null : toVariantVal(variant); + } + + @Override + public VariantVal readNonNull(FieldVector vector, int row) { + return toVariantVal(delegate.readNonNull(vector, row)); + } + + private static VariantVal toVariantVal(Variant variant) { + byte[] metadataBytes = new byte[variant.metadata().sizeInBytes()]; + variant.metadata().writeTo(ByteBuffer.wrap(metadataBytes).order(ByteOrder.LITTLE_ENDIAN), 0); + + byte[] valueBytes = new byte[variant.value().sizeInBytes()]; + variant.value().writeTo(ByteBuffer.wrap(valueBytes).order(ByteOrder.LITTLE_ENDIAN), 0); + + return new VariantVal(valueBytes, metadataBytes); + } + } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java index fbe837a3eb18..43cb1858d83f 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java @@ -19,6 +19,8 @@ package org.apache.iceberg.spark.data; import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.List; import java.util.UUID; import org.apache.arrow.vector.BigIntVector; @@ -43,9 +45,11 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.UUIDUtil; +import org.apache.iceberg.variants.VariantMetadata; import org.apache.iceberg.vortex.VortexValueWriter; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.unsafe.types.UTF8String; +import org.apache.spark.unsafe.types.VariantVal; /** Writes Spark {@link InternalRow} objects to Arrow vectors for Vortex file output. 
*/ public class SparkVortexWriter implements VortexValueWriter { @@ -66,7 +70,7 @@ public void write(InternalRow datum, VectorSchemaRoot root, int rowIndex) { FieldVector vector = root.getVector(fieldIndex); if (field.isOptional() && datum.isNullAt(fieldIndex)) { - vector.setNull(rowIndex); + writeNull(vector, field.type(), rowIndex); continue; } @@ -163,9 +167,44 @@ private static void writeValue( // Mark the struct slot itself as non-null for this row. structVector.setIndexDefined(rowIndex); break; + case VARIANT: + writeVariant((StructVector) vector, row.getVariant(fieldIndex), rowIndex); + break; default: throw new UnsupportedOperationException( "Unsupported Iceberg type for Vortex write: " + type); } } + + private static void writeNull( + FieldVector vector, org.apache.iceberg.types.Type type, int rowIndex) { + if (type.isVariantType()) { + writeNullVariant((StructVector) vector, rowIndex); + } else { + vector.setNull(rowIndex); + } + } + + // Variant storage is an Arrow struct of { required metadata, optional value }. Spark's VariantVal + // already holds the serialized little-endian metadata and value, so copy them straight across. + private static void writeVariant(StructVector vector, VariantVal variant, int rowIndex) { + vector.getChild("metadata", VarBinaryVector.class).setSafe(rowIndex, variant.getMetadata()); + vector.getChild("value", VarBinaryVector.class).setSafe(rowIndex, variant.getValue()); + vector.setIndexDefined(rowIndex); + } + + // The metadata child is required, so a null variant still needs a valid (empty) metadata entry. 
+ private static void writeNullVariant(StructVector vector, int rowIndex) { + vector.setNull(rowIndex); + + VariantMetadata empty = VariantMetadata.empty(); + byte[] metadataBytes = new byte[empty.sizeInBytes()]; + empty.writeTo(ByteBuffer.wrap(metadataBytes).order(ByteOrder.LITTLE_ENDIAN), 0); + vector.getChild("metadata", VarBinaryVector.class).setSafe(rowIndex, metadataBytes); + + VarBinaryVector valueVector = vector.getChild("value", VarBinaryVector.class); + if (valueVector != null) { + valueVector.setNull(rowIndex); + } + } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java index 23bd14d03bb5..d52a3f80b209 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java @@ -38,6 +38,8 @@ import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.spark.VortexBatchReadConf; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.broadcast.Broadcast; @@ -188,9 +190,12 @@ private boolean useOrcBatchReads() { } // conditions for using Vortex batch reads: + // - no variant is projected (ArrowColumnVector cannot surface a variant as Spark's VariantVal, so + // variant projections fall back to the row-based reader, which does support variant) // - all tasks are of FileScanTask type and read only Vortex files private boolean useVortexBatchReads() { - return taskGroups.stream().allMatch(this::supportsVortexBatchReads); + return TypeUtil.find(projection, Type::isVariantType) == null + && taskGroups.stream().allMatch(this::supportsVortexBatchReads); } private boolean supportsVortexBatchReads(ScanTask task) { diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVortexVariantRead.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVortexVariantRead.java new file mode 100644 index 000000000000..a70823312a16 --- /dev/null +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVortexVariantRead.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.sql; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import org.apache.iceberg.spark.SparkCatalog; +import org.apache.iceberg.spark.TestBase; +import org.apache.spark.sql.Row; +import org.apache.spark.types.variant.Variant; +import org.apache.spark.unsafe.types.VariantVal; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Round-trips VARIANT columns through Vortex-format tables via Spark SQL. + * + *

<p>Vortex reads default to the columnar ({@code ColumnarBatch}) path, but that path cannot + * surface a variant as Spark's {@link VariantVal}, so {@code SparkBatch} falls back to the + * row-based reader whenever a variant is projected. Projecting only {@code id} keeps the columnar + * path; projecting a variant column exercises the row-based reader. + */ +public class TestSparkVortexVariantRead extends TestBase { + + private static final String CATALOG = "local"; + private static final String TABLE = CATALOG + ".default.var_vortex"; + + @BeforeAll + public static void setupCatalog() { + // Use a Hadoop catalog to avoid Hive schema conversion (Hive doesn't support VARIANT yet) + spark.conf().set("spark.sql.catalog." + CATALOG, SparkCatalog.class.getName()); + spark.conf().set("spark.sql.catalog." + CATALOG + ".type", "hadoop"); + spark.conf().set("spark.sql.catalog." + CATALOG + ".default-namespace", "default"); + spark.conf().set("spark.sql.catalog." + CATALOG + ".cache-enabled", "false"); + String temp = System.getProperty("java.io.tmpdir") + "/iceberg_spark_vortex_variant_warehouse"; + spark.conf().set("spark.sql.catalog."
+ CATALOG + ".warehouse", temp); + } + + @BeforeEach + public void setupTable() { + sql("DROP TABLE IF EXISTS %s", TABLE); + sql( + "CREATE TABLE %s (id BIGINT, v1 VARIANT, v2 VARIANT) USING iceberg " + + "TBLPROPERTIES ('format-version'='3', 'write.format.default'='vortex')", + TABLE); + + sql("INSERT INTO %s SELECT 1, parse_json('{\"a\":1}'), parse_json('{\"x\":10}')", TABLE); + sql("INSERT INTO %s SELECT 2, parse_json('{\"b\":2}'), parse_json('{\"y\":20}')", TABLE); + } + + @AfterEach + public void cleanup() { + sql("DROP TABLE IF EXISTS %s", TABLE); + } + + @Test + public void readVariantColumns() { + List rows = spark.table(TABLE).select("id", "v1", "v2").orderBy("id").collectAsList(); + assertThat(rows).hasSize(2); + + assertThat(rows.get(0).getLong(0)).isEqualTo(1L); + assertVariantField(rows.get(0).get(1), "a", 1L); + assertVariantField(rows.get(0).get(2), "x", 10L); + + assertThat(rows.get(1).getLong(0)).isEqualTo(2L); + assertVariantField(rows.get(1).get(1), "b", 2L); + assertVariantField(rows.get(1).get(2), "y", 20L); + } + + @Test + public void readNonVariantProjectionUsesColumnarPath() { + // Projecting only non-variant columns keeps the columnar Vortex path enabled and must still + // return correct data after the variant fallback guard was added. 
+ List rows = spark.table(TABLE).select("id").orderBy("id").collectAsList(); + assertThat(rows).extracting(row -> row.getLong(0)).containsExactly(1L, 2L); + } + + @Test + public void readNullVariant() { + sql("INSERT INTO %s SELECT 3, NULL, NULL", TABLE); + + List rows = spark.table(TABLE).select("id", "v1").where("id = 3").collectAsList(); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).get(1)).isNull(); + } + + private static void assertVariantField(Object value, String key, long expected) { + assertThat(value).isInstanceOf(VariantVal.class); + VariantVal variantVal = (VariantVal) value; + Variant variant = new Variant(variantVal.getValue(), variantVal.getMetadata()); + Variant field = variant.getFieldByKey(key); + assertThat(field).isNotNull(); + assertThat(field.getLong()).isEqualTo(expected); + } +} diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java index 435daab5c657..0b17b623f223 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java @@ -50,7 +50,7 @@ public static T visit( } public static T visit(Type iType, Field field, VortexSchemaWithTypeVisitor visitor) { - if ((iType != null && iType.isVariantType()) || VortexSchemas.isVariantField(field)) { + if (isVariant(iType, field)) { return visitor.variant(iType != null ? 
iType.asVariantType() : null, field); } @@ -69,6 +69,10 @@ public static T visit(Type iType, Field field, VortexSchemaWithTypeVisitor T visitStruct( Types.StructType struct, List fields, VortexSchemaWithTypeVisitor visitor) { if (struct == null) { diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java index 8db5125e4951..03fcacc07702 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java @@ -486,32 +486,7 @@ yield toVortexArrowField( children.build()); } case VARIANT -> { - Map extMetadata = - ImmutableMap.of( - ArrowType.ExtensionType.EXTENSION_METADATA_KEY_NAME, - VARIANT_EXTENSION_NAME, - ArrowType.ExtensionType.EXTENSION_METADATA_KEY_METADATA, - ""); - - ImmutableList.Builder - children = ImmutableList.builder(); - children.add( - toVortexArrowField( - "metadata", - new dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Binary(), - false)); - children.add( - toVortexArrowField( - "value", - new dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Binary(), - true)); - - yield toVortexArrowField( - name, - new dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Struct(), - nullable, - extMetadata, - children.build()); + yield toVortexVariantArrowField(name, nullable); } default -> throw new UnsupportedOperationException( @@ -539,17 +514,45 @@ private static dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field toV children); } + private static dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field + toVortexVariantArrowField(String name, boolean nullable) { + Map extMetadata = + ImmutableMap.of( + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.ExtensionType + .EXTENSION_METADATA_KEY_NAME, + VARIANT_EXTENSION_NAME, + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.ExtensionType + 
.EXTENSION_METADATA_KEY_METADATA, + ""); + + ImmutableList.Builder children = + ImmutableList.builder(); + children.add( + toVortexArrowField( + "metadata", + new dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Binary(), + false)); + children.add( + toVortexArrowField( + "value", + new dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Binary(), + true)); + + return toVortexArrowField( + name, + new dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Struct(), + nullable, + extMetadata, + children.build()); + } + private static Type toIcebergType(Field field, AtomicInteger nextId) { // UUID is conveyed as the {@code arrow.uuid} extension over // FixedSizeBinary(16). Check metadata directly so this works whether or not // the extension is registered with ExtensionTypeRegistry. - if (isUuidField(field)) { - return Types.UUIDType.get(); - } - - if (isVariantField(field)) { - validateVariantField(field); - return Types.VariantType.get(); + Type extensionType = toIcebergExtensionType(field); + if (extensionType != null) { + return extensionType; } ArrowType arrowType = field.getType(); @@ -573,16 +576,26 @@ private static Type toIcebergType(Field field, AtomicInteger nextId) { return toIcebergSimpleType(arrowType); } - private static Type toIcebergType( - dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field field, AtomicInteger nextId) { + private static Type toIcebergExtensionType(Field field) { if (isUuidField(field)) { return Types.UUIDType.get(); } if (isVariantField(field)) { + validateVariantField(field); return Types.VariantType.get(); } + return null; + } + + private static Type toIcebergType( + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field field, AtomicInteger nextId) { + Type extensionType = toIcebergExtensionType(field); + if (extensionType != null) { + return extensionType; + } + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType arrowType = field.getType(); if (arrowType 
instanceof dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Int intType) { @@ -618,6 +631,19 @@ private static Type toIcebergType( return toIcebergSimpleType(arrowType); } + private static Type toIcebergExtensionType( + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field field) { + if (isUuidField(field)) { + return Types.UUIDType.get(); + } + + if (isVariantField(field)) { + return Types.VariantType.get(); + } + + return null; + } + private static Type toIcebergSimpleType(ArrowType arrowType) { if (arrowType instanceof ArrowType.Null) { return Types.UnknownType.get();