From 12a6edb7eff4878931624dc3598e4525be8cee5d Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 16 Jun 2026 13:48:16 +0100 Subject: [PATCH] Add support for Variant for Spark with Vortex Signed-off-by: Adam Gutglick --- .../iceberg/spark/data/SparkVortexReader.java | 6 + .../iceberg/spark/source/SparkBatch.java | 7 +- .../iceberg/spark/data/SparkVortexReader.java | 5 + .../spark/data/SparkVortexValueReaders.java | 40 ++++++ .../iceberg/spark/data/SparkVortexWriter.java | 41 ++++++- .../iceberg/spark/source/SparkBatch.java | 7 +- .../spark/sql/TestSparkVortexVariantRead.java | 114 ++++++++++++++++++ .../iceberg/spark/data/SparkVortexReader.java | 5 + .../spark/data/SparkVortexValueReaders.java | 40 ++++++ .../iceberg/spark/data/SparkVortexWriter.java | 41 ++++++- .../iceberg/spark/source/SparkBatch.java | 7 +- .../spark/sql/TestSparkVortexVariantRead.java | 114 ++++++++++++++++++ .../vortex/VortexSchemaWithTypeVisitor.java | 6 +- .../apache/iceberg/vortex/VortexSchemas.java | 96 +++++++++------ 14 files changed, 488 insertions(+), 41 deletions(-) create mode 100644 spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVortexVariantRead.java create mode 100644 spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVortexVariantRead.java diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java index bf5c97964716..6d6f6fe79707 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java @@ -147,6 +147,12 @@ public VortexValueReader list( throw new UnsupportedOperationException("Vortex LIST types are not supported yet"); } + @Override + public VortexValueReader variant(Types.VariantType variantType, Field variantField) { + // Spark 3.5 has no VariantType/VariantVal, so variant columns cannot be read here. + throw new UnsupportedOperationException("Variant is not supported for Spark 3.5"); + } + @Override public VortexValueReader primitive(Type.PrimitiveType icebergType, Field primField) { return switch (icebergType.typeId()) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java index 90bf52e4ce11..2a3a2a4670c3 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java @@ -38,6 +38,8 @@ import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.spark.VortexBatchReadConf; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.broadcast.Broadcast; @@ -191,9 +193,12 @@ private boolean useOrcBatchReads() { } // conditions for using Vortex batch reads: + // - no variant is projected (ArrowColumnVector cannot surface a variant as Spark's VariantVal, so + // variant projections fall back to the row-based reader, which does support variant) // - all tasks are of FileScanTask type and read only Vortex files private boolean useVortexBatchReads() { - return taskGroups.stream().allMatch(this::supportsVortexBatchReads); + return TypeUtil.find(expectedSchema, Type::isVariantType) == null + && taskGroups.stream().allMatch(this::supportsVortexBatchReads); } private boolean supportsVortexBatchReads(ScanTask task) { diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java index bf5c97964716..0df9f70822ce 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java @@ -147,6 +147,11 @@ public VortexValueReader list( throw new UnsupportedOperationException("Vortex LIST types are not supported yet"); } + @Override + public VortexValueReader variant(Types.VariantType variantType, Field variantField) { + return SparkVortexValueReaders.variants(); + } + @Override public VortexValueReader primitive(Type.PrimitiveType icebergType, Field primField) { return switch (icebergType.typeId()) { diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexValueReaders.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexValueReaders.java index 8ce5ce6d20c2..c6362892b7d8 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexValueReaders.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexValueReaders.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.spark.data; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.charset.StandardCharsets; import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.DateDayVector; @@ -31,10 +33,13 @@ import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.types.TimeUnit; import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.iceberg.data.vortex.GenericVortexReaders; import org.apache.iceberg.util.DateTimeUtil; import org.apache.iceberg.util.UUIDUtil; +import org.apache.iceberg.variants.Variant; import org.apache.iceberg.vortex.VortexValueReader; import org.apache.spark.unsafe.types.UTF8String; +import org.apache.spark.unsafe.types.VariantVal; public class SparkVortexValueReaders { private SparkVortexValueReaders() {} @@ -62,6 +67,10 @@ public static VortexValueReader time(TimeUnit timeUnit) { return new TimeReader(timeUnit); } + public static VortexValueReader variants() { + return VariantReader.INSTANCE; + } + static class UTF8Reader implements VortexValueReader { static final UTF8Reader INSTANCE = new UTF8Reader(); @@ -156,4 +165,35 @@ public Long readNonNull(FieldVector vector, int row) { }; } } + + // Converts the Iceberg Variant produced by the shared Vortex reader into Spark's VariantVal by + // re-serializing metadata and value to little-endian buffers (mirrors SparkParquetReaders). + static class VariantReader implements VortexValueReader { + static final VariantReader INSTANCE = new VariantReader(); + + private final VortexValueReader delegate = GenericVortexReaders.variants(); + + private VariantReader() {} + + @Override + public VariantVal read(FieldVector vector, int row) { + Variant variant = delegate.read(vector, row); + return variant == null ? null : toVariantVal(variant); + } + + @Override + public VariantVal readNonNull(FieldVector vector, int row) { + return toVariantVal(delegate.readNonNull(vector, row)); + } + + private static VariantVal toVariantVal(Variant variant) { + byte[] metadataBytes = new byte[variant.metadata().sizeInBytes()]; + variant.metadata().writeTo(ByteBuffer.wrap(metadataBytes).order(ByteOrder.LITTLE_ENDIAN), 0); + + byte[] valueBytes = new byte[variant.value().sizeInBytes()]; + variant.value().writeTo(ByteBuffer.wrap(valueBytes).order(ByteOrder.LITTLE_ENDIAN), 0); + + return new VariantVal(valueBytes, metadataBytes); + } + } } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java index fbe837a3eb18..43cb1858d83f 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java @@ -19,6 +19,8 @@ package org.apache.iceberg.spark.data; import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.List; import java.util.UUID; import org.apache.arrow.vector.BigIntVector; @@ -43,9 +45,11 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.UUIDUtil; +import org.apache.iceberg.variants.VariantMetadata; import org.apache.iceberg.vortex.VortexValueWriter; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.unsafe.types.UTF8String; +import org.apache.spark.unsafe.types.VariantVal; /** Writes Spark {@link InternalRow} objects to Arrow vectors for Vortex file output. */ public class SparkVortexWriter implements VortexValueWriter { @@ -66,7 +70,7 @@ public void write(InternalRow datum, VectorSchemaRoot root, int rowIndex) { FieldVector vector = root.getVector(fieldIndex); if (field.isOptional() && datum.isNullAt(fieldIndex)) { - vector.setNull(rowIndex); + writeNull(vector, field.type(), rowIndex); continue; } @@ -163,9 +167,44 @@ private static void writeValue( // Mark the struct slot itself as non-null for this row. structVector.setIndexDefined(rowIndex); break; + case VARIANT: + writeVariant((StructVector) vector, row.getVariant(fieldIndex), rowIndex); + break; default: throw new UnsupportedOperationException( "Unsupported Iceberg type for Vortex write: " + type); } } + + private static void writeNull( + FieldVector vector, org.apache.iceberg.types.Type type, int rowIndex) { + if (type.isVariantType()) { + writeNullVariant((StructVector) vector, rowIndex); + } else { + vector.setNull(rowIndex); + } + } + + // Variant storage is an Arrow struct of { required metadata, optional value }. Spark's VariantVal + // already holds the serialized little-endian metadata and value, so copy them straight across. + private static void writeVariant(StructVector vector, VariantVal variant, int rowIndex) { + vector.getChild("metadata", VarBinaryVector.class).setSafe(rowIndex, variant.getMetadata()); + vector.getChild("value", VarBinaryVector.class).setSafe(rowIndex, variant.getValue()); + vector.setIndexDefined(rowIndex); + } + + // The metadata child is required, so a null variant still needs a valid (empty) metadata entry. + private static void writeNullVariant(StructVector vector, int rowIndex) { + vector.setNull(rowIndex); + + VariantMetadata empty = VariantMetadata.empty(); + byte[] metadataBytes = new byte[empty.sizeInBytes()]; + empty.writeTo(ByteBuffer.wrap(metadataBytes).order(ByteOrder.LITTLE_ENDIAN), 0); + vector.getChild("metadata", VarBinaryVector.class).setSafe(rowIndex, metadataBytes); + + VarBinaryVector valueVector = vector.getChild("value", VarBinaryVector.class); + if (valueVector != null) { + valueVector.setNull(rowIndex); + } + } } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java index 90bf52e4ce11..2a3a2a4670c3 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java @@ -38,6 +38,8 @@ import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.spark.VortexBatchReadConf; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.broadcast.Broadcast; @@ -191,9 +193,12 @@ private boolean useOrcBatchReads() { } // conditions for using Vortex batch reads: + // - no variant is projected (ArrowColumnVector cannot surface a variant as Spark's VariantVal, so + // variant projections fall back to the row-based reader, which does support variant) // - all tasks are of FileScanTask type and read only Vortex files private boolean useVortexBatchReads() { - return taskGroups.stream().allMatch(this::supportsVortexBatchReads); + return TypeUtil.find(expectedSchema, Type::isVariantType) == null + && taskGroups.stream().allMatch(this::supportsVortexBatchReads); } private boolean supportsVortexBatchReads(ScanTask task) { diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVortexVariantRead.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVortexVariantRead.java new file mode 100644 index 000000000000..a70823312a16 --- /dev/null +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVortexVariantRead.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.sql; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import org.apache.iceberg.spark.SparkCatalog; +import org.apache.iceberg.spark.TestBase; +import org.apache.spark.sql.Row; +import org.apache.spark.types.variant.Variant; +import org.apache.spark.unsafe.types.VariantVal; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Round-trips VARIANT columns through Vortex-format tables via Spark SQL. + * + *

Vortex reads default to the columnar ({@code ColumnarBatch}) path, but that path cannot + * surface a variant as Spark's {@link VariantVal}, so {@code SparkBatch} falls back to the + * row-based reader whenever a variant is projected. Projecting only {@code id} keeps the columnar + * path; projecting a variant column exercises the row-based reader. + */ +public class TestSparkVortexVariantRead extends TestBase { + + private static final String CATALOG = "local"; + private static final String TABLE = CATALOG + ".default.var_vortex"; + + @BeforeAll + public static void setupCatalog() { + // Use a Hadoop catalog to avoid Hive schema conversion (Hive doesn't support VARIANT yet) + spark.conf().set("spark.sql.catalog." + CATALOG, SparkCatalog.class.getName()); + spark.conf().set("spark.sql.catalog." + CATALOG + ".type", "hadoop"); + spark.conf().set("spark.sql.catalog." + CATALOG + ".default-namespace", "default"); + spark.conf().set("spark.sql.catalog." + CATALOG + ".cache-enabled", "false"); + String temp = System.getProperty("java.io.tmpdir") + "/iceberg_spark_vortex_variant_warehouse"; + spark.conf().set("spark.sql.catalog." + CATALOG + ".warehouse", temp); + } + + @BeforeEach + public void setupTable() { + sql("DROP TABLE IF EXISTS %s", TABLE); + sql( + "CREATE TABLE %s (id BIGINT, v1 VARIANT, v2 VARIANT) USING iceberg " + + "TBLPROPERTIES ('format-version'='3', 'write.format.default'='vortex')", + TABLE); + + sql("INSERT INTO %s SELECT 1, parse_json('{\"a\":1}'), parse_json('{\"x\":10}')", TABLE); + sql("INSERT INTO %s SELECT 2, parse_json('{\"b\":2}'), parse_json('{\"y\":20}')", TABLE); + } + + @AfterEach + public void cleanup() { + sql("DROP TABLE IF EXISTS %s", TABLE); + } + + @Test + public void readVariantColumns() { + List rows = spark.table(TABLE).select("id", "v1", "v2").orderBy("id").collectAsList(); + assertThat(rows).hasSize(2); + + assertThat(rows.get(0).getLong(0)).isEqualTo(1L); + assertVariantField(rows.get(0).get(1), "a", 1L); + assertVariantField(rows.get(0).get(2), "x", 10L); + + assertThat(rows.get(1).getLong(0)).isEqualTo(2L); + assertVariantField(rows.get(1).get(1), "b", 2L); + assertVariantField(rows.get(1).get(2), "y", 20L); + } + + @Test + public void readNonVariantProjectionUsesColumnarPath() { + // Projecting only non-variant columns keeps the columnar Vortex path enabled and must still + // return correct data after the variant fallback guard was added. + List rows = spark.table(TABLE).select("id").orderBy("id").collectAsList(); + assertThat(rows).extracting(row -> row.getLong(0)).containsExactly(1L, 2L); + } + + @Test + public void readNullVariant() { + sql("INSERT INTO %s SELECT 3, NULL, NULL", TABLE); + + List rows = spark.table(TABLE).select("id", "v1").where("id = 3").collectAsList(); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).get(1)).isNull(); + } + + private static void assertVariantField(Object value, String key, long expected) { + assertThat(value).isInstanceOf(VariantVal.class); + VariantVal variantVal = (VariantVal) value; + Variant variant = new Variant(variantVal.getValue(), variantVal.getMetadata()); + Variant field = variant.getFieldByKey(key); + assertThat(field).isNotNull(); + assertThat(field.getLong()).isEqualTo(expected); + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java index bf5c97964716..0df9f70822ce 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java @@ -147,6 +147,11 @@ public VortexValueReader list( throw new UnsupportedOperationException("Vortex LIST types are not supported yet"); } + @Override + public VortexValueReader variant(Types.VariantType variantType, Field variantField) { + return SparkVortexValueReaders.variants(); + } + @Override public VortexValueReader primitive(Type.PrimitiveType icebergType, Field primField) { return switch (icebergType.typeId()) { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexValueReaders.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexValueReaders.java index 8ce5ce6d20c2..c6362892b7d8 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexValueReaders.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexValueReaders.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.spark.data; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.charset.StandardCharsets; import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.DateDayVector; @@ -31,10 +33,13 @@ import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.types.TimeUnit; import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.iceberg.data.vortex.GenericVortexReaders; import org.apache.iceberg.util.DateTimeUtil; import org.apache.iceberg.util.UUIDUtil; +import org.apache.iceberg.variants.Variant; import org.apache.iceberg.vortex.VortexValueReader; import org.apache.spark.unsafe.types.UTF8String; +import org.apache.spark.unsafe.types.VariantVal; public class SparkVortexValueReaders { private SparkVortexValueReaders() {} @@ -62,6 +67,10 @@ public static VortexValueReader time(TimeUnit timeUnit) { return new TimeReader(timeUnit); } + public static VortexValueReader variants() { + return VariantReader.INSTANCE; + } + static class UTF8Reader implements VortexValueReader { static final UTF8Reader INSTANCE = new UTF8Reader(); @@ -156,4 +165,35 @@ public Long readNonNull(FieldVector vector, int row) { }; } } + + // Converts the Iceberg Variant produced by the shared Vortex reader into Spark's VariantVal by + // re-serializing metadata and value to little-endian buffers (mirrors SparkParquetReaders). + static class VariantReader implements VortexValueReader { + static final VariantReader INSTANCE = new VariantReader(); + + private final VortexValueReader delegate = GenericVortexReaders.variants(); + + private VariantReader() {} + + @Override + public VariantVal read(FieldVector vector, int row) { + Variant variant = delegate.read(vector, row); + return variant == null ? null : toVariantVal(variant); + } + + @Override + public VariantVal readNonNull(FieldVector vector, int row) { + return toVariantVal(delegate.readNonNull(vector, row)); + } + + private static VariantVal toVariantVal(Variant variant) { + byte[] metadataBytes = new byte[variant.metadata().sizeInBytes()]; + variant.metadata().writeTo(ByteBuffer.wrap(metadataBytes).order(ByteOrder.LITTLE_ENDIAN), 0); + + byte[] valueBytes = new byte[variant.value().sizeInBytes()]; + variant.value().writeTo(ByteBuffer.wrap(valueBytes).order(ByteOrder.LITTLE_ENDIAN), 0); + + return new VariantVal(valueBytes, metadataBytes); + } + } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java index fbe837a3eb18..43cb1858d83f 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexWriter.java @@ -19,6 +19,8 @@ package org.apache.iceberg.spark.data; import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.List; import java.util.UUID; import org.apache.arrow.vector.BigIntVector; @@ -43,9 +45,11 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.UUIDUtil; +import org.apache.iceberg.variants.VariantMetadata; import org.apache.iceberg.vortex.VortexValueWriter; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.unsafe.types.UTF8String; +import org.apache.spark.unsafe.types.VariantVal; /** Writes Spark {@link InternalRow} objects to Arrow vectors for Vortex file output. */ public class SparkVortexWriter implements VortexValueWriter { @@ -66,7 +70,7 @@ public void write(InternalRow datum, VectorSchemaRoot root, int rowIndex) { FieldVector vector = root.getVector(fieldIndex); if (field.isOptional() && datum.isNullAt(fieldIndex)) { - vector.setNull(rowIndex); + writeNull(vector, field.type(), rowIndex); continue; } @@ -163,9 +167,44 @@ private static void writeValue( // Mark the struct slot itself as non-null for this row. structVector.setIndexDefined(rowIndex); break; + case VARIANT: + writeVariant((StructVector) vector, row.getVariant(fieldIndex), rowIndex); + break; default: throw new UnsupportedOperationException( "Unsupported Iceberg type for Vortex write: " + type); } } + + private static void writeNull( + FieldVector vector, org.apache.iceberg.types.Type type, int rowIndex) { + if (type.isVariantType()) { + writeNullVariant((StructVector) vector, rowIndex); + } else { + vector.setNull(rowIndex); + } + } + + // Variant storage is an Arrow struct of { required metadata, optional value }. Spark's VariantVal + // already holds the serialized little-endian metadata and value, so copy them straight across. + private static void writeVariant(StructVector vector, VariantVal variant, int rowIndex) { + vector.getChild("metadata", VarBinaryVector.class).setSafe(rowIndex, variant.getMetadata()); + vector.getChild("value", VarBinaryVector.class).setSafe(rowIndex, variant.getValue()); + vector.setIndexDefined(rowIndex); + } + + // The metadata child is required, so a null variant still needs a valid (empty) metadata entry. + private static void writeNullVariant(StructVector vector, int rowIndex) { + vector.setNull(rowIndex); + + VariantMetadata empty = VariantMetadata.empty(); + byte[] metadataBytes = new byte[empty.sizeInBytes()]; + empty.writeTo(ByteBuffer.wrap(metadataBytes).order(ByteOrder.LITTLE_ENDIAN), 0); + vector.getChild("metadata", VarBinaryVector.class).setSafe(rowIndex, metadataBytes); + + VarBinaryVector valueVector = vector.getChild("value", VarBinaryVector.class); + if (valueVector != null) { + valueVector.setNull(rowIndex); + } + } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java index 23bd14d03bb5..d52a3f80b209 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java @@ -38,6 +38,8 @@ import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.spark.VortexBatchReadConf; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.broadcast.Broadcast; @@ -188,9 +190,12 @@ private boolean useOrcBatchReads() { } // conditions for using Vortex batch reads: + // - no variant is projected (ArrowColumnVector cannot surface a variant as Spark's VariantVal, so + // variant projections fall back to the row-based reader, which does support variant) // - all tasks are of FileScanTask type and read only Vortex files private boolean useVortexBatchReads() { - return taskGroups.stream().allMatch(this::supportsVortexBatchReads); + return TypeUtil.find(projection, Type::isVariantType) == null + && taskGroups.stream().allMatch(this::supportsVortexBatchReads); } private boolean supportsVortexBatchReads(ScanTask task) { diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVortexVariantRead.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVortexVariantRead.java new file mode 100644 index 000000000000..a70823312a16 --- /dev/null +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVortexVariantRead.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.sql; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import org.apache.iceberg.spark.SparkCatalog; +import org.apache.iceberg.spark.TestBase; +import org.apache.spark.sql.Row; +import org.apache.spark.types.variant.Variant; +import org.apache.spark.unsafe.types.VariantVal; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Round-trips VARIANT columns through Vortex-format tables via Spark SQL. + * + *

Vortex reads default to the columnar ({@code ColumnarBatch}) path, but that path cannot + * surface a variant as Spark's {@link VariantVal}, so {@code SparkBatch} falls back to the + * row-based reader whenever a variant is projected. Projecting only {@code id} keeps the columnar + * path; projecting a variant column exercises the row-based reader. + */ +public class TestSparkVortexVariantRead extends TestBase { + + private static final String CATALOG = "local"; + private static final String TABLE = CATALOG + ".default.var_vortex"; + + @BeforeAll + public static void setupCatalog() { + // Use a Hadoop catalog to avoid Hive schema conversion (Hive doesn't support VARIANT yet) + spark.conf().set("spark.sql.catalog." + CATALOG, SparkCatalog.class.getName()); + spark.conf().set("spark.sql.catalog." + CATALOG + ".type", "hadoop"); + spark.conf().set("spark.sql.catalog." + CATALOG + ".default-namespace", "default"); + spark.conf().set("spark.sql.catalog." + CATALOG + ".cache-enabled", "false"); + String temp = System.getProperty("java.io.tmpdir") + "/iceberg_spark_vortex_variant_warehouse"; + spark.conf().set("spark.sql.catalog." + CATALOG + ".warehouse", temp); + } + + @BeforeEach + public void setupTable() { + sql("DROP TABLE IF EXISTS %s", TABLE); + sql( + "CREATE TABLE %s (id BIGINT, v1 VARIANT, v2 VARIANT) USING iceberg " + + "TBLPROPERTIES ('format-version'='3', 'write.format.default'='vortex')", + TABLE); + + sql("INSERT INTO %s SELECT 1, parse_json('{\"a\":1}'), parse_json('{\"x\":10}')", TABLE); + sql("INSERT INTO %s SELECT 2, parse_json('{\"b\":2}'), parse_json('{\"y\":20}')", TABLE); + } + + @AfterEach + public void cleanup() { + sql("DROP TABLE IF EXISTS %s", TABLE); + } + + @Test + public void readVariantColumns() { + List rows = spark.table(TABLE).select("id", "v1", "v2").orderBy("id").collectAsList(); + assertThat(rows).hasSize(2); + + assertThat(rows.get(0).getLong(0)).isEqualTo(1L); + assertVariantField(rows.get(0).get(1), "a", 1L); + assertVariantField(rows.get(0).get(2), "x", 10L); + + assertThat(rows.get(1).getLong(0)).isEqualTo(2L); + assertVariantField(rows.get(1).get(1), "b", 2L); + assertVariantField(rows.get(1).get(2), "y", 20L); + } + + @Test + public void readNonVariantProjectionUsesColumnarPath() { + // Projecting only non-variant columns keeps the columnar Vortex path enabled and must still + // return correct data after the variant fallback guard was added. + List rows = spark.table(TABLE).select("id").orderBy("id").collectAsList(); + assertThat(rows).extracting(row -> row.getLong(0)).containsExactly(1L, 2L); + } + + @Test + public void readNullVariant() { + sql("INSERT INTO %s SELECT 3, NULL, NULL", TABLE); + + List rows = spark.table(TABLE).select("id", "v1").where("id = 3").collectAsList(); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).get(1)).isNull(); + } + + private static void assertVariantField(Object value, String key, long expected) { + assertThat(value).isInstanceOf(VariantVal.class); + VariantVal variantVal = (VariantVal) value; + Variant variant = new Variant(variantVal.getValue(), variantVal.getMetadata()); + Variant field = variant.getFieldByKey(key); + assertThat(field).isNotNull(); + assertThat(field.getLong()).isEqualTo(expected); + } +} diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java index 435daab5c657..0b17b623f223 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java @@ -50,7 +50,7 @@ public static T visit( } public static T visit(Type iType, Field field, VortexSchemaWithTypeVisitor visitor) { - if ((iType != null && iType.isVariantType()) || VortexSchemas.isVariantField(field)) { + if (isVariant(iType, field)) { return visitor.variant(iType != null ? iType.asVariantType() : null, field); } @@ -69,6 +69,10 @@ public static T visit(Type iType, Field field, VortexSchemaWithTypeVisitor T visitStruct( Types.StructType struct, List fields, VortexSchemaWithTypeVisitor visitor) { if (struct == null) { diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java index 8db5125e4951..03fcacc07702 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java @@ -486,32 +486,7 @@ yield toVortexArrowField( children.build()); } case VARIANT -> { - Map extMetadata = - ImmutableMap.of( - ArrowType.ExtensionType.EXTENSION_METADATA_KEY_NAME, - VARIANT_EXTENSION_NAME, - ArrowType.ExtensionType.EXTENSION_METADATA_KEY_METADATA, - ""); - - ImmutableList.Builder - children = ImmutableList.builder(); - children.add( - toVortexArrowField( - "metadata", - new dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Binary(), - false)); - children.add( - toVortexArrowField( - "value", - new dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Binary(), - true)); - - yield toVortexArrowField( - name, - new dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Struct(), - nullable, - extMetadata, - children.build()); + yield toVortexVariantArrowField(name, nullable); } default -> throw new UnsupportedOperationException( @@ -539,17 +514,45 @@ private static dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field toV children); } + private static dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field + toVortexVariantArrowField(String name, boolean nullable) { + Map extMetadata = + ImmutableMap.of( + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.ExtensionType + .EXTENSION_METADATA_KEY_NAME, + VARIANT_EXTENSION_NAME, + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.ExtensionType + .EXTENSION_METADATA_KEY_METADATA, + ""); + + ImmutableList.Builder children = + ImmutableList.builder(); + children.add( + toVortexArrowField( + "metadata", + new dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Binary(), + false)); + children.add( + toVortexArrowField( + "value", + new dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Binary(), + true)); + + return toVortexArrowField( + name, + new dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Struct(), + nullable, + extMetadata, + children.build()); + } + private static Type toIcebergType(Field field, AtomicInteger nextId) { // UUID is conveyed as the {@code arrow.uuid} extension over // FixedSizeBinary(16). Check metadata directly so this works whether or not // the extension is registered with ExtensionTypeRegistry. - if (isUuidField(field)) { - return Types.UUIDType.get(); - } - - if (isVariantField(field)) { - validateVariantField(field); - return Types.VariantType.get(); + Type extensionType = toIcebergExtensionType(field); + if (extensionType != null) { + return extensionType; } ArrowType arrowType = field.getType(); @@ -573,16 +576,26 @@ private static Type toIcebergType(Field field, AtomicInteger nextId) { return toIcebergSimpleType(arrowType); } - private static Type toIcebergType( - dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field field, AtomicInteger nextId) { + private static Type toIcebergExtensionType(Field field) { if (isUuidField(field)) { return Types.UUIDType.get(); } if (isVariantField(field)) { + validateVariantField(field); return Types.VariantType.get(); } + return null; + } + + private static Type toIcebergType( + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field field, AtomicInteger nextId) { + Type extensionType = toIcebergExtensionType(field); + if (extensionType != null) { + return extensionType; + } + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType arrowType = field.getType(); if (arrowType instanceof dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Int intType) { @@ -618,6 +631,19 @@ private static Type toIcebergType( return toIcebergSimpleType(arrowType); } + private static Type toIcebergExtensionType( + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field field) { + if (isUuidField(field)) { + return Types.UUIDType.get(); + } + + if (isVariantField(field)) { + return Types.VariantType.get(); + } + + return null; + } + private static Type toIcebergSimpleType(ArrowType arrowType) { if (arrowType instanceof ArrowType.Null) { return Types.UnknownType.get();