diff --git a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java index b5f590d6a9c0..7fa7cf5a9a02 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java +++ b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java @@ -30,7 +30,6 @@ import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.assertj.core.api.Assumptions.assumeThat; import java.io.File; import java.io.IOException; @@ -338,8 +337,6 @@ record -> @TestTemplate public void testProjectWithSchema() { - assumeThat(format != FileFormat.VORTEX).isTrue(); - // Test with table schema Iterable results = IcebergGenerics.read(sharedTable).project(SCHEMA).build(); diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 6d63cd2089bd..de61a8efb338 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -92,7 +92,7 @@ spark41 = "4.1.1" sqlite-jdbc = "3.53.1.0" testcontainers = "2.0.5" tez08 = { strictly = "0.8.4"} # see rich version usage explanation above -vortex = "0.74.0" +vortex = "0.75.0" [libraries] activation = { module = "javax.activation:activation", version.ref = "activation" } diff --git a/spark/v3.5/spark-runtime/runtime-deps.txt b/spark/v3.5/spark-runtime/runtime-deps.txt index f45439ec91db..6d2db5ead85d 100644 --- a/spark/v3.5/spark-runtime/runtime-deps.txt +++ b/spark/v3.5/spark-runtime/runtime-deps.txt @@ -10,8 +10,8 @@ com.google.guava:guava:33.6 com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava com.google.j2objc:j2objc-annotations:3.1 dev.failsafe:failsafe:3.3 -dev.vortex:vortex-jni:0.74 -dev.vortex:vortex-spark_2.12:0.74 +dev.vortex:vortex-jni:0.75 +dev.vortex:vortex-spark_2.12:0.75 io.airlift:aircompressor:2.0 io.netty:netty-buffer:4.2 io.netty:netty-common:4.2 diff --git a/spark/v4.0/spark-runtime/runtime-deps.txt b/spark/v4.0/spark-runtime/runtime-deps.txt index 4f1b8258f2fd..fb73267e0a61 100644 --- a/spark/v4.0/spark-runtime/runtime-deps.txt +++ b/spark/v4.0/spark-runtime/runtime-deps.txt @@ -10,8 +10,8 @@ com.google.guava:guava:33.6 com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava com.google.j2objc:j2objc-annotations:3.1 dev.failsafe:failsafe:3.3 -dev.vortex:vortex-jni:0.74 -dev.vortex:vortex-spark_2.13:0.74 +dev.vortex:vortex-jni:0.75 +dev.vortex:vortex-spark_2.13:0.75 io.airlift:aircompressor:2.0 io.netty:netty-buffer:4.2 io.netty:netty-common:4.2 diff --git a/spark/v4.1/spark-runtime/runtime-deps.txt b/spark/v4.1/spark-runtime/runtime-deps.txt index 4f1b8258f2fd..fb73267e0a61 100644 --- a/spark/v4.1/spark-runtime/runtime-deps.txt +++ b/spark/v4.1/spark-runtime/runtime-deps.txt @@ -10,8 +10,8 @@ com.google.guava:guava:33.6 com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava com.google.j2objc:j2objc-annotations:3.1 dev.failsafe:failsafe:3.3 -dev.vortex:vortex-jni:0.74 -dev.vortex:vortex-spark_2.13:0.74 +dev.vortex:vortex-jni:0.75 +dev.vortex:vortex-spark_2.13:0.75 io.airlift:aircompressor:2.0 io.netty:netty-buffer:4.2 io.netty:netty-common:4.2 diff --git a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java index 16ca29ff4053..88d0622f023e 100644 --- a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java +++ b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java @@ -82,15 +82,23 @@ private GenericVortexReader( this.readers[i] = GenericVortexReaders.constants(constants.get(id)); } else if (id == MetadataColumns.IS_DELETED.fieldId()) { this.readers[i] = GenericVortexReaders.constants(false); + } else if (id == MetadataColumns.ROW_POSITION.fieldId()) { + this.readers[i] = GenericVortexReaders.longs(); + this.columnNames[i] = field.name(); } else { Field arrowField = arrowFieldsByName.get(field.name()); if (arrowField == null) { - // The expected field is neither a constant nor present in the data file (for example an - // unsupplied metadata column). Fill it with null rather than reading a missing column. - this.readers[i] = GenericVortexReaders.constants(null); + if (field.isOptional()) { + // The expected field is neither a constant nor present in the data file (for example an + // unsupplied metadata column). Fill it with null rather than reading a missing column. + this.readers[i] = GenericVortexReaders.constants(null); + } else { + throw new IllegalArgumentException( + String.format("Missing required field: %s", field.name())); + } } else { this.readers[i] = VortexSchemaWithTypeVisitor.visit(field.type(), arrowField, builder); - this.columnNames[i] = arrowField.getName(); + this.columnNames[i] = field.name(); } } } diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexFormatModel.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexFormatModel.java index 2313cd0b7429..4379d069b06f 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexFormatModel.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexFormatModel.java @@ -368,8 +368,9 @@ public CloseableIterable build() { schema.columns().stream() .filter( field -> - !constants.containsKey(field.fieldId()) - && !MetadataColumns.isMetadataColumn(field.name())) + (field.fieldId() == MetadataColumns.ROW_POSITION.fieldId()) + || !constants.containsKey(field.fieldId()) + && !MetadataColumns.isMetadataColumn(field.name())) .map(Types.NestedField::name) .toList(); diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexIterable.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexIterable.java index 77d0d19ebb2d..4e4453a4bf24 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexIterable.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexIterable.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Function; @@ -36,6 +37,7 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.types.pojo.Field; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableGroup; @@ -43,6 +45,7 @@ import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,10 +121,24 @@ public CloseableIterator iterator() { .map(Field::getName) .collect(Collectors.toUnmodifiableSet()); - String[] projectionNames = - projection.stream().filter(fileColumns::contains).toArray(String[]::new); + ImmutableList.Builder fieldNames = ImmutableList.builder(); + ImmutableList.Builder expressions = ImmutableList.builder(); + + for (String name : projection) { + if (fileColumns.contains(name)) { + fieldNames.add(name); + expressions.add(dev.vortex.api.Expression.column(name)); + } else if (Objects.equals(name, MetadataColumns.ROW_POSITION.name())) { + fieldNames.add(name); + expressions.add(dev.vortex.api.Expression.rowIdx()); + } + } + dev.vortex.api.Expression scanProjection = - dev.vortex.api.Expression.select(projectionNames, dev.vortex.api.Expression.root()); + dev.vortex.api.Expression.pack( + fieldNames.build().toArray(String[]::new), + expressions.build().toArray(dev.vortex.api.Expression[]::new), + false); ImmutableScanOptions.Builder optionsBuilder = ScanOptions.builder().projection(scanProjection); scanFilter.ifPresent(optionsBuilder::filter);