Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ public VortexValueReader<?> list(
throw new UnsupportedOperationException("Vortex LIST types are not supported yet");
}

/**
 * Rejects variant columns: this builder cannot produce a variant reader for Spark 3.5.
 *
 * @param variantType the Iceberg variant type being visited (not consulted)
 * @param variantField the field paired with the variant type (not consulted)
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public VortexValueReader<?> variant(Types.VariantType variantType, Field variantField) {
  // Spark 3.5 has no VariantType/VariantVal, so variant columns cannot be read here.
  String reason = "Variant is not supported for Spark 3.5";
  throw new UnsupportedOperationException(reason);
}

@Override
public VortexValueReader<?> primitive(Type.PrimitiveType icebergType, Field primField) {
return switch (icebergType.typeId()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.spark.VortexBatchReadConf;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
Expand Down Expand Up @@ -191,9 +193,12 @@ private boolean useOrcBatchReads() {
}

// conditions for using Vortex batch reads:
// - no variant is projected (ArrowColumnVector cannot surface a variant as Spark's VariantVal, so
// variant projections fall back to the row-based reader, which does support variant)
// - all tasks are of FileScanTask type and read only Vortex files
private boolean useVortexBatchReads() {
  // A leftover `return taskGroups.stream().allMatch(...)` used to precede this statement; it made
  // the variant guard unreachable (a compile error) and ignored projected variants entirely.
  // The && short-circuits, so task groups are only inspected when no variant is projected.
  return TypeUtil.find(expectedSchema, Type::isVariantType) == null
      && taskGroups.stream().allMatch(this::supportsVortexBatchReads);
}

private boolean supportsVortexBatchReads(ScanTask task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ public VortexValueReader<?> list(
throw new UnsupportedOperationException("Vortex LIST types are not supported yet");
}

/**
 * Supplies the reader used for variant columns.
 *
 * @param variantType the Iceberg variant type being visited (not consulted)
 * @param variantField the field paired with the variant type (not consulted)
 * @return the reader obtained from {@link SparkVortexValueReaders#variants()}
 */
@Override
public VortexValueReader<?> variant(Types.VariantType variantType, Field variantField) {
  VortexValueReader<?> variantReader = SparkVortexValueReaders.variants();
  return variantReader;
}

@Override
public VortexValueReader<?> primitive(Type.PrimitiveType icebergType, Field primField) {
return switch (icebergType.typeId()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iceberg.spark.data;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.DateDayVector;
Expand All @@ -31,10 +33,13 @@
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.iceberg.data.vortex.GenericVortexReaders;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.UUIDUtil;
import org.apache.iceberg.variants.Variant;
import org.apache.iceberg.vortex.VortexValueReader;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;

public class SparkVortexValueReaders {
private SparkVortexValueReaders() {}
Expand Down Expand Up @@ -62,6 +67,10 @@ public static VortexValueReader<Long> time(TimeUnit timeUnit) {
return new TimeReader(timeUnit);
}

/**
 * Returns the reader that yields Spark {@link VariantVal} values.
 *
 * @return the shared {@code VariantReader.INSTANCE}
 */
public static VortexValueReader<VariantVal> variants() {
  VortexValueReader<VariantVal> reader = VariantReader.INSTANCE;
  return reader;
}

static class UTF8Reader implements VortexValueReader<UTF8String> {
static final UTF8Reader INSTANCE = new UTF8Reader();

Expand Down Expand Up @@ -156,4 +165,35 @@ public Long readNonNull(FieldVector vector, int row) {
};
}
}

/**
 * Converts the Iceberg {@link Variant} produced by the shared Vortex reader into Spark's {@link
 * VariantVal} by re-serializing metadata and value to little-endian buffers (mirrors
 * SparkParquetReaders).
 */
static class VariantReader implements VortexValueReader<VariantVal> {
  static final VariantReader INSTANCE = new VariantReader();

  // Reader that produces the Iceberg-side Variant; this class only adapts its output.
  private final VortexValueReader<Variant> delegate = GenericVortexReaders.variants();

  private VariantReader() {}

  /** Reads the slot at {@code row}, passing a {@code null} from the delegate straight through. */
  @Override
  public VariantVal read(FieldVector vector, int row) {
    Variant icebergVariant = delegate.read(vector, row);
    if (icebergVariant == null) {
      return null;
    }

    return toVariantVal(icebergVariant);
  }

  /** Reads the slot at {@code row} through the delegate's non-null path and converts it. */
  @Override
  public VariantVal readNonNull(FieldVector vector, int row) {
    Variant icebergVariant = delegate.readNonNull(vector, row);
    return toVariantVal(icebergVariant);
  }

  /** Serializes both halves of {@code variant} into fresh byte arrays and wraps them. */
  private static VariantVal toVariantVal(Variant variant) {
    byte[] metadataBytes = new byte[variant.metadata().sizeInBytes()];
    ByteBuffer metadataBuffer = ByteBuffer.wrap(metadataBytes).order(ByteOrder.LITTLE_ENDIAN);
    variant.metadata().writeTo(metadataBuffer, 0);

    byte[] valueBytes = new byte[variant.value().sizeInBytes()];
    ByteBuffer valueBuffer = ByteBuffer.wrap(valueBytes).order(ByteOrder.LITTLE_ENDIAN);
    variant.value().writeTo(valueBuffer, 0);

    // VariantVal takes the value bytes first, then the metadata bytes.
    return new VariantVal(valueBytes, metadataBytes);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.iceberg.spark.data;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
import java.util.UUID;
import org.apache.arrow.vector.BigIntVector;
Expand All @@ -43,9 +45,11 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.UUIDUtil;
import org.apache.iceberg.variants.VariantMetadata;
import org.apache.iceberg.vortex.VortexValueWriter;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;

/** Writes Spark {@link InternalRow} objects to Arrow vectors for Vortex file output. */
public class SparkVortexWriter implements VortexValueWriter<InternalRow> {
Expand All @@ -66,7 +70,7 @@ public void write(InternalRow datum, VectorSchemaRoot root, int rowIndex) {
FieldVector vector = root.getVector(fieldIndex);

if (field.isOptional() && datum.isNullAt(fieldIndex)) {
vector.setNull(rowIndex);
writeNull(vector, field.type(), rowIndex);
continue;
}

Expand Down Expand Up @@ -163,9 +167,44 @@ private static void writeValue(
// Mark the struct slot itself as non-null for this row.
structVector.setIndexDefined(rowIndex);
break;
case VARIANT:
writeVariant((StructVector) vector, row.getVariant(fieldIndex), rowIndex);
break;
default:
throw new UnsupportedOperationException(
"Unsupported Iceberg type for Vortex write: " + type);
}
}

// Marks the slot at rowIndex as null. Variant-typed fields are routed through writeNullVariant;
// every other type just has its slot nulled on the vector itself.
private static void writeNull(
    FieldVector vector, org.apache.iceberg.types.Type type, int rowIndex) {
  if (!type.isVariantType()) {
    vector.setNull(rowIndex);
    return;
  }

  writeNullVariant((StructVector) vector, rowIndex);
}

// Variant storage is an Arrow struct of { required metadata, optional value }. Spark's VariantVal
// already holds the serialized little-endian metadata and value, so both byte arrays are copied
// into the matching child vectors unchanged before the struct slot is marked as defined.
private static void writeVariant(StructVector vector, VariantVal variant, int rowIndex) {
  VarBinaryVector metadataChild = vector.getChild("metadata", VarBinaryVector.class);
  metadataChild.setSafe(rowIndex, variant.getMetadata());

  VarBinaryVector valueChild = vector.getChild("value", VarBinaryVector.class);
  valueChild.setSafe(rowIndex, variant.getValue());

  vector.setIndexDefined(rowIndex);
}

// Writes a null variant at rowIndex. The metadata child is required, so even a null variant gets
// a valid (empty) serialized metadata entry; the value child, when present, is nulled.
private static void writeNullVariant(StructVector vector, int rowIndex) {
  vector.setNull(rowIndex);

  VariantMetadata emptyMetadata = VariantMetadata.empty();
  byte[] emptyMetadataBytes = new byte[emptyMetadata.sizeInBytes()];
  ByteBuffer target = ByteBuffer.wrap(emptyMetadataBytes).order(ByteOrder.LITTLE_ENDIAN);
  emptyMetadata.writeTo(target, 0);
  vector.getChild("metadata", VarBinaryVector.class).setSafe(rowIndex, emptyMetadataBytes);

  VarBinaryVector valueChild = vector.getChild("value", VarBinaryVector.class);
  if (valueChild == null) {
    return;
  }

  valueChild.setNull(rowIndex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.spark.VortexBatchReadConf;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
Expand Down Expand Up @@ -191,9 +193,12 @@ private boolean useOrcBatchReads() {
}

// conditions for using Vortex batch reads:
// - no variant is projected (ArrowColumnVector cannot surface a variant as Spark's VariantVal, so
// variant projections fall back to the row-based reader, which does support variant)
Comment thread
robert3005 marked this conversation as resolved.
// - all tasks are of FileScanTask type and read only Vortex files
private boolean useVortexBatchReads() {
return taskGroups.stream().allMatch(this::supportsVortexBatchReads);
return TypeUtil.find(expectedSchema, Type::isVariantType) == null
&& taskGroups.stream().allMatch(this::supportsVortexBatchReads);
}

private boolean supportsVortexBatchReads(ScanTask task) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.sql;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.TestBase;
import org.apache.spark.sql.Row;
import org.apache.spark.types.variant.Variant;
import org.apache.spark.unsafe.types.VariantVal;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Round-trips VARIANT columns through Vortex-format tables via Spark SQL.
 *
 * <p>Vortex reads default to the columnar ({@code ColumnarBatch}) path, but that path cannot
 * surface a variant as Spark's {@link VariantVal}, so {@code SparkBatch} falls back to the
 * row-based reader whenever a variant is projected. Projecting only {@code id} keeps the columnar
 * path; projecting a variant column exercises the row-based reader.
 */
public class TestSparkVortexVariantRead extends TestBase {

  private static final String CATALOG = "local";
  private static final String TABLE = CATALOG + ".default.var_vortex";

  /** Registers the {@code local} catalog on the shared Spark session before any test runs. */
  @BeforeAll
  public static void setupCatalog() {
    // Use a Hadoop catalog to avoid Hive schema conversion (Hive doesn't support VARIANT yet)
    String catalogKey = "spark.sql.catalog." + CATALOG;
    spark.conf().set(catalogKey, SparkCatalog.class.getName());
    spark.conf().set(catalogKey + ".type", "hadoop");
    spark.conf().set(catalogKey + ".default-namespace", "default");
    spark.conf().set(catalogKey + ".cache-enabled", "false");

    String warehouse =
        System.getProperty("java.io.tmpdir") + "/iceberg_spark_vortex_variant_warehouse";
    spark.conf().set(catalogKey + ".warehouse", warehouse);
  }

  /** Recreates the Vortex-format table and seeds it with two rows of variant data. */
  @BeforeEach
  public void setupTable() {
    sql("DROP TABLE IF EXISTS %s", TABLE);

    String createStatement =
        "CREATE TABLE %s (id BIGINT, v1 VARIANT, v2 VARIANT) USING iceberg "
            + "TBLPROPERTIES ('format-version'='3', 'write.format.default'='vortex')";
    sql(createStatement, TABLE);

    sql("INSERT INTO %s SELECT 1, parse_json('{\"a\":1}'), parse_json('{\"x\":10}')", TABLE);
    sql("INSERT INTO %s SELECT 2, parse_json('{\"b\":2}'), parse_json('{\"y\":20}')", TABLE);
  }

  /** Drops the table so each test starts from a clean state. */
  @AfterEach
  public void cleanup() {
    sql("DROP TABLE IF EXISTS %s", TABLE);
  }

  /** Projects both variant columns and checks every field of both seeded rows. */
  @Test
  public void readVariantColumns() {
    List<Row> rows = spark.table(TABLE).select("id", "v1", "v2").orderBy("id").collectAsList();
    assertThat(rows).hasSize(2);

    Row first = rows.get(0);
    assertThat(first.getLong(0)).isEqualTo(1L);
    assertVariantField(first.get(1), "a", 1L);
    assertVariantField(first.get(2), "x", 10L);

    Row second = rows.get(1);
    assertThat(second.getLong(0)).isEqualTo(2L);
    assertVariantField(second.get(1), "b", 2L);
    assertVariantField(second.get(2), "y", 20L);
  }

  @Test
  public void readNonVariantProjectionUsesColumnarPath() {
    // Projecting only non-variant columns keeps the columnar Vortex path enabled and must still
    // return correct data after the variant fallback guard was added.
    List<Row> idRows = spark.table(TABLE).select("id").orderBy("id").collectAsList();
    assertThat(idRows).extracting(row -> row.getLong(0)).containsExactly(1L, 2L);
  }

  /** Inserts a row whose variants are both NULL and checks that {@code v1} reads back as null. */
  @Test
  public void readNullVariant() {
    sql("INSERT INTO %s SELECT 3, NULL, NULL", TABLE);

    List<Row> matches = spark.table(TABLE).select("id", "v1").where("id = 3").collectAsList();
    assertThat(matches).hasSize(1);

    Row onlyRow = matches.get(0);
    assertThat(onlyRow.get(1)).isNull();
  }

  /**
   * Asserts that {@code value} is a {@link VariantVal} with a field {@code key} whose long value
   * equals {@code expected}.
   */
  private static void assertVariantField(Object value, String key, long expected) {
    assertThat(value).isInstanceOf(VariantVal.class);

    VariantVal sparkVariant = (VariantVal) value;
    Variant decoded = new Variant(sparkVariant.getValue(), sparkVariant.getMetadata());

    Variant fieldValue = decoded.getFieldByKey(key);
    assertThat(fieldValue).isNotNull();
    assertThat(fieldValue.getLong()).isEqualTo(expected);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ public VortexValueReader<?> list(
throw new UnsupportedOperationException("Vortex LIST types are not supported yet");
}

/**
 * Supplies the reader used for variant columns.
 *
 * @param variantType the Iceberg variant type being visited (not consulted)
 * @param variantField the field paired with the variant type (not consulted)
 * @return the reader obtained from {@link SparkVortexValueReaders#variants()}
 */
@Override
public VortexValueReader<?> variant(Types.VariantType variantType, Field variantField) {
  VortexValueReader<?> variantReader = SparkVortexValueReaders.variants();
  return variantReader;
}

@Override
public VortexValueReader<?> primitive(Type.PrimitiveType icebergType, Field primField) {
return switch (icebergType.typeId()) {
Expand Down
Loading
Loading