Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions core/src/main/java/org/apache/iceberg/formats/ReadBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.mapping.NameMapping;
Expand Down Expand Up @@ -119,6 +120,31 @@ default ReadBuilder<D, S> setAll(Map<String, String> properties) {
/**
 * Sets a mapping from external schema names to Iceberg type IDs.
 *
 * @param nameMapping the mapping from external schema names to Iceberg type IDs
 * @return a read builder to continue configuring (presumably this builder, as the default
 *     methods of this interface do; confirm per implementation)
 */
ReadBuilder<D, S> withNameMapping(NameMapping nameMapping);

/**
 * Reports whether the reader produced by this builder honors position deletes handed to it via
 * {@link #positionDeletes(PositionDeleteIndex)}.
 *
 * <p>The default is {@code false}. Callers are expected to consult this method before relying on
 * pushdown: when it returns {@code false} the pushed index is ignored, and the caller remains
 * responsible for applying the deletes to the scan output.
 *
 * @return {@code true} when pushed position deletes are applied during the scan, {@code false}
 *     when they still have to be applied after reading
 */
default boolean supportsPositionDeletes() {
  return false;
}

/**
 * Supplies the position deletes for the file being read so that a supporting reader can exclude
 * deleted rows while scanning, instead of reading them and filtering them out afterwards.
 *
 * <p>Positions in the index are relative to the start of the file. This default implementation
 * is a no-op: it discards the index and returns the builder unchanged. Calling it is therefore
 * only meaningful when {@link #supportsPositionDeletes()} returns {@code true}; for every other
 * reader the deletes must still be applied by post-scan filtering.
 *
 * @param deletes the deleted row positions for the file being read
 * @return this for method chaining
 */
default ReadBuilder<D, S> positionDeletes(PositionDeleteIndex deletes) {
  return this;
}

/**
 * Builds the reader.
 *
 * @return a closeable iterable of {@code D} records produced by the configured reader
 */
CloseableIterable<D> build();
}
28 changes: 27 additions & 1 deletion data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -65,6 +66,7 @@ public abstract class DeleteFilter<T> {
private PositionDeleteIndex deleteRowPositions = null;
private List<Predicate<T>> isInDeleteSets = null;
private Predicate<T> eqDeleteRows = null;
// Set by pushablePosDeletes() once a non-empty index has been handed out for native pushdown;
// while true, applyPosDeletes returns its input unchanged so the deletes are not applied twice.
private boolean posDeletesPushedDown = false;

protected DeleteFilter(
String filePath,
Expand Down Expand Up @@ -258,8 +260,32 @@ public PositionDeleteIndex deletedRowPositions() {
return deleteRowPositions;
}

/**
 * Hands out the deleted row positions of this file for native pushdown into a format scanner, so
 * the scanner can skip deleted rows instead of producing them for later filtering.
 *
 * <p>This method has a side effect: whenever it returns a non-empty index, the position deletes
 * are recorded as handled and {@link #filter(CloseableIterable)} will not apply them again. Only
 * call it when the returned index really is going to be applied by the scan.
 *
 * <p>The result is empty, and nothing is recorded, in each of these cases:
 *
 * <ul>
 *   <li>there are no position deletes;
 *   <li>the {@code _is_deleted} metadata column is projected, because deleted rows must then be
 *       kept and marked rather than removed, which a scan-level drop cannot do;
 *   <li>the loaded index is {@code null} or contains no positions.
 * </ul>
 *
 * @return the index to push into the scan, or empty when pushdown does not apply
 */
public Optional<PositionDeleteIndex> pushablePosDeletes() {
  if (!posDeletes.isEmpty() && !hasIsDeletedColumn) {
    PositionDeleteIndex index = deletedRowPositions();
    if (index != null && !index.isEmpty()) {
      // From here on applyPosDeletes is a pass-through for this filter.
      this.posDeletesPushedDown = true;
      return Optional.of(index);
    }
  }

  return Optional.empty();
}

private CloseableIterable<T> applyPosDeletes(CloseableIterable<T> records) {
if (posDeletes.isEmpty()) {
if (posDeletes.isEmpty() || posDeletesPushedDown) {
return records;
}

Expand Down
17 changes: 15 additions & 2 deletions data/src/main/java/org/apache/iceberg/data/GenericReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@

import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
Expand Down Expand Up @@ -66,7 +68,7 @@ public CloseableIterable<Record> open(FileScanTask task) {
DeleteFilter<Record> deletes = new GenericDeleteFilter(io, task, tableSchema, projection);
Schema readSchema = deletes.requiredSchema();

CloseableIterable<Record> records = openFile(task, readSchema);
CloseableIterable<Record> records = openFile(task, readSchema, deletes);
records = deletes.filter(records);
records = applyResidual(records, readSchema, task.residual());

Expand All @@ -84,7 +86,8 @@ private CloseableIterable<Record> applyResidual(
return records;
}

private CloseableIterable<Record> openFile(FileScanTask task, Schema fileProjection) {
private CloseableIterable<Record> openFile(
FileScanTask task, Schema fileProjection, DeleteFilter<Record> deletes) {
InputFile input = io.newInputFile(task.file());
Map<Integer, ?> partition =
PartitionUtil.constantsMap(task, IdentityPartitionConverters::convertConstant);
Expand All @@ -95,6 +98,16 @@ private CloseableIterable<Record> openFile(FileScanTask task, Schema fileProject
builder = builder.reuseContainers();
}

// Push position deletes into the scan when the reader applies them natively; the delete filter
// then skips re-applying them. Readers without pushdown support fall back to post-scan
// filtering.
if (builder.supportsPositionDeletes()) {
Optional<PositionDeleteIndex> pushable = deletes.pushablePosDeletes();
if (pushable.isPresent()) {
builder = builder.positionDeletes(pushable.get());
}
}

return builder
.project(fileProjection)
.idToConstant(partition)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.data;

import java.io.File;
import java.io.IOException;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

/**
 * Runs the shared {@link DeleteReadTests} suite against Vortex tables, covering the whole
 * delete-read path: data files and (for v2) position-delete files are produced by the standard
 * {@link GenericFileWriterFactory}, which routes Vortex through the {@link
 * org.apache.iceberg.formats.FormatModelRegistry}, and position deletes are applied on read via
 * native scan pushdown.
 */
@ExtendWith(ParameterizedTestExtension.class)
public class TestVortexReaderDeletes extends DeleteReadTests {
  @TempDir private File tableDir;

  /** Runs every test for Vortex at table format versions 2 and 3. */
  @Parameters(name = "fileFormat = {0}, formatVersion = {1}")
  public static Object[][] parameters() {
    return new Object[][] {
      {FileFormat.VORTEX, 2},
      {FileFormat.VORTEX, 3},
    };
  }

  /** Creates a test table under the temp directory whose default file format is Vortex. */
  @Override
  protected Table createTable(String name, Schema schema, PartitionSpec spec) throws IOException {
    ImmutableMap<String, String> tableProperties =
        ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.VORTEX.name());
    return TestTables.create(tableDir, name, schema, spec, formatVersion, tableProperties);
  }

  /** Clears every registered test table; the {@code name} argument is not consulted. */
  @Override
  protected void dropTable(String name) {
    TestTables.clearTables();
  }

  /**
   * Reads the table through the generic reader, projected to {@code columns}, and collects the
   * rows into a set keyed by the projected struct type.
   */
  @Override
  public StructLikeSet rowSet(String name, Table table, String... columns) throws IOException {
    Types.StructType projected = table.schema().select(columns).asStruct();
    StructLikeSet rows = StructLikeSet.create(projected);
    try (CloseableIterable<Record> records = IcebergGenerics.read(table).select(columns).build()) {
      // A new wrapper is allocated for every record on purpose.
      // NOTE(review): assumed necessary because the wrapper is stateful; confirm before hoisting
      // it out of the lambda.
      Iterables.addAll(
          rows,
          CloseableIterable.transform(
              records, rec -> new InternalRecordWrapper(projected).wrap(rec)));
    }
    return rows;
  }

  @Override
  protected boolean expectPruned() {
    return false;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.extensions;

import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DELETE_MODE;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;

import org.apache.iceberg.Parameters;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestTemplate;

/**
 * End-to-end coverage of merge-on-read DELETE on Vortex tables. The DELETE reads the data with
 * the synthetic {@code _pos} column (wired through Vortex's {@code row_idx} expression) to work
 * out which positions to delete, then writes them through the format-model registry as a
 * position-delete file: a Vortex delete file for v2, a deletion vector for v3. The following read
 * excludes the deleted rows via native scan pushdown.
 */
public class TestVortexMergeOnReadDelete extends ExtensionsTestBase {

  /** Runs the tests against the Hadoop catalog only. */
  @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
  public static Object[][] parameters() {
    SparkCatalogConfig hadoop = SparkCatalogConfig.HADOOP;
    return new Object[][] {
      {hadoop.catalogName(), hadoop.implementation(), hadoop.properties()}
    };
  }

  @AfterEach
  public void removeTable() {
    sql("DROP TABLE IF EXISTS %s", tableName);
  }

  @TestTemplate
  public void testMergeOnReadDeleteFormatV2() {
    runMergeOnReadDelete(2);
  }

  @TestTemplate
  public void testMergeOnReadDeleteFormatV3() {
    runMergeOnReadDelete(3);
  }

  /**
   * Inserts five rows, deletes two of them, checks the survivors, then deletes one more and
   * checks again.
   */
  private void runMergeOnReadDelete(int formatVersion) {
    createMergeOnReadVortexTable(formatVersion);

    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')", tableName);

    // First round: remove ids 2 and 4.
    sql("DELETE FROM %s WHERE id IN (2, 4)", tableName);
    assertEquals(
        "Merge-on-read DELETE should exclude exactly the deleted rows",
        ImmutableList.of(row(1, "a"), row(3, "c"), row(5, "e")),
        sql("SELECT * FROM %s ORDER BY id", tableName));

    // Second round: a further DELETE yields another set of positions for the same data file.
    sql("DELETE FROM %s WHERE id = 5", tableName);
    assertEquals(
        "Subsequent merge-on-read DELETE should remove additional rows",
        ImmutableList.of(row(1, "a"), row(3, "c")),
        sql("SELECT * FROM %s ORDER BY id", tableName));
  }

  /** Creates the test table as a Vortex, merge-on-read table at the given format version. */
  private void createMergeOnReadVortexTable(int formatVersion) {
    sql(
        "CREATE TABLE %s (id INT, dep STRING) USING iceberg "
            + "TBLPROPERTIES ('%s'='vortex', '%s'='%d', '%s'='merge-on-read')",
        tableName, DEFAULT_FILE_FORMAT, FORMAT_VERSION, formatVersion, DELETE_MODE);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public VortexValueReader<?> primitive(Type.PrimitiveType icebergType, Field prim
case FLOAT -> GenericVortexReaders.floats();
case DOUBLE -> GenericVortexReaders.doubles();
case STRING -> SparkVortexValueReaders.utf8String();
case BINARY -> GenericVortexReaders.bytes();
case BINARY -> SparkVortexValueReaders.bytes();
case DECIMAL -> GenericVortexReaders.decimals();
case TIMESTAMP, TIMESTAMP_NANO -> {
ArrowType.Timestamp ts = (ArrowType.Timestamp) primField.getType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeNanoVector;
import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.pojo.ArrowType;
Expand All @@ -43,6 +44,11 @@ public static VortexValueReader<UTF8String> utf8String() {
return UTF8Reader.INSTANCE;
}

/**
 * Returns the shared reader for binary values.
 *
 * <p>Spark represents BinaryType as {@code byte[]}, unlike the generic reader which yields a
 * ByteBuffer, so Spark needs its own reader here.
 */
public static VortexValueReader<byte[]> bytes() {
  return BytesReader.INSTANCE;
}

/** Returns the shared {@link DateReader} instance, which reads date values as {@code Integer}. */
public static VortexValueReader<Integer> date() {
  return DateReader.INSTANCE;
}
Expand Down Expand Up @@ -74,6 +80,17 @@ public UTF8String readNonNull(FieldVector vector, int row) {
}
}

/**
 * Reads non-null binary values as {@code byte[]}. Stateless; use the shared {@link #INSTANCE}.
 */
static class BytesReader implements VortexValueReader<byte[]> {
  static final BytesReader INSTANCE = new BytesReader();

  private BytesReader() {}

  /**
   * Returns the bytes stored at {@code row}.
   *
   * @param vector the column vector; must be a {@link VarBinaryVector}, any other vector type
   *     fails the cast with a {@link ClassCastException}
   * @param row the index of the row to read
   */
  @Override
  public byte[] readNonNull(FieldVector vector, int row) {
    VarBinaryVector binaryVector = (VarBinaryVector) vector;
    return binaryVector.get(row);
  }
}

static class UuidReader implements VortexValueReader<UTF8String> {
static final UuidReader INSTANCE = new UuidReader();

Expand Down
Loading
Loading