visit(
// BaseParquetReaders for the non-vectorized path.
Preconditions.checkArgument(
intLogicalType.isSigned() || bitWidth < 32, "Cannot read UINT32 as an int value");
+ Field intField =
+ new Field(
+ icebergField.name(),
+ new FieldType(
+ icebergField.isOptional(), new ArrowType.Int(Integer.SIZE, true), null, null),
+ null);
+ FieldVector vector = intField.createVector(rootAlloc);
((IntVector) vector).allocateNew(batchSize);
return Optional.of(
new LogicalTypeVisitorResult(vector, ReadType.INT, (int) IntVector.TYPE_WIDTH));
} else if (bitWidth == 64) {
Preconditions.checkArgument(
intLogicalType.isSigned(), "Cannot read UINT64 as a long value");
+ Field longField =
+ new Field(
+ icebergField.name(),
+ new FieldType(
+ icebergField.isOptional(), new ArrowType.Int(Long.SIZE, true), null, null),
+ null);
+ FieldVector vector = longField.createVector(rootAlloc);
((BigIntVector) vector).allocateNew(batchSize);
return Optional.of(
new LogicalTypeVisitorResult(vector, ReadType.LONG, (int) BigIntVector.TYPE_WIDTH));
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
index 1ca3bfe809c0..d093d4c97989 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
@@ -539,7 +539,7 @@ protected void nextDictEncodedVal(
.toByteBuffer()
.order(ByteOrder.LITTLE_ENDIAN);
long timestampInt96 = ParquetUtil.extractTimestampInt96(buffer);
- vector.getDataBuffer().setLong(idx, timestampInt96);
+ vector.getDataBuffer().setLong((long) idx * typeWidth, timestampInt96);
break;
default:
throw new UnsupportedOperationException(
diff --git a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java
index cf3eb2700265..e23006c79d18 100644
--- a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java
+++ b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java
@@ -436,6 +436,245 @@ public void testUnsignedSmallIntegerColumnRoundtrips(int unsignedBitWidth, int v
assertThat(totalRows).isEqualTo(1);
}
+ /**
+ * Tests that the vectorized reader correctly handles int-to-long type promotion when the Parquet
+ * file has an INT(32, true) logical type annotation. This reproduces a bug where reading a file
+ * written with INT(32) logical type after an ALTER TABLE promoting the column from int to long
+ * causes a ClassCastException (BigIntVector cannot be cast to IntVector).
+ *
+ * The vector remains an IntVector (matching the physical storage), and the accessor handles
+ * widening to long on read.
+ */
+ @Test
+ public void testIntToLongPromotionWithLogicalType() throws Exception {
+ tables = new HadoopTables();
+ Schema schema = new Schema(Types.NestedField.required(1, "col", Types.IntegerType.get()));
+ Table table = tables.create(schema, tempDir.toURI() + "/int-promotion-logical");
+
+ // Write a Parquet file with INT(32, signed) logical type annotation.
+ // This is what non-Iceberg writers (PyArrow, Spark native, etc.) typically produce.
+ MessageType parquetSchema =
+ new MessageType(
+ "test",
+ primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED)
+ .as(LogicalTypeAnnotation.intType(32, true))
+ .id(1)
+ .named("col"));
+
+ File testFile = new File(tempDir, "int-logical-type-promotion.parquet");
+ List values = ImmutableList.of(1, 2, 3, Integer.MAX_VALUE);
+ try (ParquetWriter writer =
+ ExampleParquetWriter.builder(new Path(testFile.toURI())).withType(parquetSchema).build()) {
+ SimpleGroupFactory factory = new SimpleGroupFactory(parquetSchema);
+ for (int val : values) {
+ Group group = factory.newGroup();
+ group.add("col", val);
+ writer.write(group);
+ }
+ }
+
+ DataFile dataFile =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath(testFile.getAbsolutePath())
+ .withFileSizeInBytes(testFile.length())
+ .withFormat(FileFormat.PARQUET)
+ .withRecordCount(values.size())
+ .build();
+ table.newAppend().appendFile(dataFile).commit();
+
+ // Promote the column type from int to long (simulates ALTER TABLE)
+ table.updateSchema().updateColumn("col", Types.LongType.get()).commit();
+ table = tables.load(tempDir.toURI() + "/int-promotion-logical");
+
+ // Read with the vectorized reader — the underlying vector is IntVector (physical type),
+ // but the accessor correctly widens to long when getLong() is called.
+ try (VectorizedTableScanIterable vectorizedReader =
+ new VectorizedTableScanIterable(table.newScan(), 1024, false)) {
+ for (ColumnarBatch batch : vectorizedReader) {
+ assertThat(batch.numRows()).isEqualTo(values.size());
+
+ FieldVector vector = batch.column(0).getArrowVector();
+ assertThat(vector)
+ .as("Vector should be IntVector matching the physical Parquet type")
+ .isInstanceOf(IntVector.class);
+
+ for (int i = 0; i < batch.numRows(); i++) {
+ assertThat(batch.column(0).getLong(i))
+ .as("Accessor should widen int to long for row %d", i)
+ .isEqualTo((long) values.get(i));
+ }
+ }
+ }
+ }
+
+ /**
+  * Tests that the vectorized reader correctly handles int-to-long type promotion when the Parquet
+  * file has a bare INT32 without any logical type annotation.
+  *
+  * <p>The vector remains an IntVector (matching the physical storage), and the accessor handles
+  * widening to long on read.
+  */
+ @Test
+ public void testIntToLongPromotionWithoutLogicalType() throws Exception {
+   tables = new HadoopTables();
+   Schema schema = new Schema(Types.NestedField.required(1, "col", Types.IntegerType.get()));
+   Table table = tables.create(schema, tempDir.toURI() + "/int-promotion-no-logical");
+
+   // Write via Iceberg's writer which produces bare INT32 (no logical type annotation)
+   List<GenericRecord> records = Lists.newArrayList();
+   int[] values = new int[] {1, 2, 3, Integer.MAX_VALUE};
+   for (int val : values) {
+     GenericRecord rec = GenericRecord.create(schema);
+     rec.setField("col", val);
+     records.add(rec);
+   }
+
+   File testFile = new File(tempDir, "int-no-logical-type-promotion.parquet");
+   // The appender must be closed before testFile.length() is read for the DataFile below.
+   try (FileAppender<GenericRecord> appender =
+       Parquet.write(Files.localOutput(testFile))
+           .schema(schema)
+           .createWriterFunc(GenericParquetWriter::create)
+           .build()) {
+     appender.addAll(records);
+   }
+
+   DataFile dataFile =
+       DataFiles.builder(PartitionSpec.unpartitioned())
+           .withPath(testFile.getAbsolutePath())
+           .withFileSizeInBytes(testFile.length())
+           .withFormat(FileFormat.PARQUET)
+           .withRecordCount(records.size())
+           .build();
+   table.newAppend().appendFile(dataFile).commit();
+
+   // Promote the column type from int to long
+   table.updateSchema().updateColumn("col", Types.LongType.get()).commit();
+   table = tables.load(tempDir.toURI() + "/int-promotion-no-logical");
+
+   // Read with the vectorized reader — the underlying vector is IntVector (physical type),
+   // but the accessor correctly widens to long when getLong() is called.
+   int totalRows = 0;
+   try (VectorizedTableScanIterable vectorizedReader =
+       new VectorizedTableScanIterable(table.newScan(), 1024, false)) {
+     for (ColumnarBatch batch : vectorizedReader) {
+       assertThat(batch.numRows()).isEqualTo(values.length);
+       totalRows += batch.numRows();
+
+       FieldVector vector = batch.column(0).getArrowVector();
+       assertThat(vector)
+           .as("Vector should be IntVector matching the physical Parquet type")
+           .isInstanceOf(IntVector.class);
+
+       for (int i = 0; i < batch.numRows(); i++) {
+         assertThat(batch.column(0).getLong(i))
+             .as("Accessor should widen int to long for row %d", i)
+             .isEqualTo((long) values[i]);
+       }
+     }
+   }
+
+   // Guard against a vacuous pass: the per-batch assertions never run if the scan yields nothing.
+   assertThat(totalRows).as("All written rows should be read back").isEqualTo(values.length);
+ }
+
+ /**
+ * Tests that int-to-long promotion works correctly when values larger than Integer.MAX_VALUE are
+ * written after the promotion and reuseContainers is true. This verifies that reading a mix of
+ * pre-promotion (int-range) and post-promotion (long-range) files works correctly.
+ */
+ @Test
+ public void testIntToLongPromotionWithLargeValuesAndReuseContainers() throws Exception {
+ tables = new HadoopTables();
+ Schema schema = new Schema(Types.NestedField.required(1, "col", Types.IntegerType.get()));
+ Table table = tables.create(schema, tempDir.toURI() + "/int-promotion-large-values");
+
+ // Write a Parquet file with INT(32, signed) logical type (pre-promotion data)
+ MessageType parquetSchema =
+ new MessageType(
+ "test",
+ primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED)
+ .as(LogicalTypeAnnotation.intType(32, true))
+ .id(1)
+ .named("col"));
+
+ File prePromotionFile = new File(tempDir, "pre-promotion.parquet");
+ List intValues = ImmutableList.of(1, 2, Integer.MAX_VALUE);
+ try (ParquetWriter writer =
+ ExampleParquetWriter.builder(new Path(prePromotionFile.toURI()))
+ .withType(parquetSchema)
+ .build()) {
+ SimpleGroupFactory factory = new SimpleGroupFactory(parquetSchema);
+ for (int val : intValues) {
+ Group group = factory.newGroup();
+ group.add("col", val);
+ writer.write(group);
+ }
+ }
+
+ DataFile prePromotionDataFile =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath(prePromotionFile.getAbsolutePath())
+ .withFileSizeInBytes(prePromotionFile.length())
+ .withFormat(FileFormat.PARQUET)
+ .withRecordCount(intValues.size())
+ .build();
+ table.newAppend().appendFile(prePromotionDataFile).commit();
+
+ // Promote the column type from int to long
+ table.updateSchema().updateColumn("col", Types.LongType.get()).commit();
+ table = tables.load(tempDir.toURI() + "/int-promotion-large-values");
+
+ // Write a second file with long values > Integer.MAX_VALUE (post-promotion data)
+ List longRecords = Lists.newArrayList();
+ long[] longValues = new long[] {(long) Integer.MAX_VALUE + 1L, Long.MAX_VALUE};
+ for (long val : longValues) {
+ GenericRecord rec = GenericRecord.create(table.schema());
+ rec.setField("col", val);
+ longRecords.add(rec);
+ }
+
+ File postPromotionFile = new File(tempDir, "post-promotion.parquet");
+ FileAppender appender =
+ Parquet.write(Files.localOutput(postPromotionFile))
+ .schema(table.schema())
+ .createWriterFunc(GenericParquetWriter::create)
+ .build();
+ try {
+ appender.addAll(longRecords);
+ } finally {
+ appender.close();
+ }
+
+ DataFile postPromotionDataFile =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath(postPromotionFile.getAbsolutePath())
+ .withFileSizeInBytes(postPromotionFile.length())
+ .withFormat(FileFormat.PARQUET)
+ .withRecordCount(longRecords.size())
+ .build();
+ table.newAppend().appendFile(postPromotionDataFile).commit();
+ table = tables.load(tempDir.toURI() + "/int-promotion-large-values");
+
+ // Read with reuseContainers=true and validate all values
+ List allExpectedValues = Lists.newArrayList();
+ for (int v : intValues) {
+ allExpectedValues.add((long) v);
+ }
+ for (long v : longValues) {
+ allExpectedValues.add(v);
+ }
+
+ List actualValues = Lists.newArrayList();
+ try (VectorizedTableScanIterable vectorizedReader =
+ new VectorizedTableScanIterable(table.newScan(), 1024, true)) {
+ for (ColumnarBatch batch : vectorizedReader) {
+ for (int i = 0; i < batch.numRows(); i++) {
+ actualValues.add(batch.column(0).getLong(i));
+ }
+ }
+ }
+
+ assertThat(actualValues).containsExactlyInAnyOrderElementsOf(allExpectedValues);
+ }
+
private static Stream rejectedUnsignedIntegerCases() {
return Stream.of(
Arguments.of(
@@ -998,11 +1237,6 @@ private void writeTable(boolean constantRecords) throws Exception {
overwrite.addFile(writeParquetFile(table, records));
}
overwrite.commit();
-
- // Perform a type promotion
- // TODO: The read Arrow vector should of type BigInt (promoted type) but it is Int (old type).
- Table tableLatest = tables.load(tableLocation);
- tableLatest.updateSchema().updateColumn("int_promotion", Types.LongType.get()).commit();
}
private static org.apache.arrow.vector.types.pojo.Schema createExpectedArrowSchema(
diff --git a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/parquet/TestVectorizedParquetDefinitionLevelReader.java b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/parquet/TestVectorizedParquetDefinitionLevelReader.java
new file mode 100644
index 000000000000..10bf3e64766c
--- /dev/null
+++ b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/parquet/TestVectorizedParquetDefinitionLevelReader.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.arrow.vectorized.parquet;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.io.api.Binary;
+import org.junit.jupiter.api.Test;
+
+public class TestVectorizedParquetDefinitionLevelReader { // unit test for TimestampInt96Reader.nextDictEncodedVal in PACKED mode
+ private static final int UNIX_EPOCH_JULIAN_DAY = 2_440_588; // Julian day number of 1970-01-01
+
+ @Test
+ public void timestampInt96ReaderPackedDictionaryDecodeDecodesRowsCorrectly() {
+ try (RootAllocator allocator = new RootAllocator();
+ BigIntVector vector = new BigIntVector("ts", allocator)) {
+ vector.allocateNew(2);
+ vector.set(0, -1L); // pre-fill both slots with -1; the assertions below expect both to be overwritten
+ vector.set(1, -1L);
+
+ VectorizedParquetDefinitionLevelReader definitionReader =
+ new VectorizedParquetDefinitionLevelReader(1, 1, false);
+ VectorizedDictionaryEncodedParquetValuesReader dictionaryReader =
+ new VectorizedDictionaryEncodedParquetValuesReader(1, false);
+
+ dictionaryReader.mode = BaseVectorizedParquetValuesReader.Mode.PACKED; // reader state is set directly instead of decoding a page
+ dictionaryReader.currentCount = 2;
+ dictionaryReader.packedValuesBuffer[0] = 0; // two pending dictionary ids: 0, then 1
+ dictionaryReader.packedValuesBuffer[1] = 1;
+
+ Dictionary dictionary =
+ new Dictionary(Encoding.PLAIN_DICTIONARY) { // stub dictionary with exactly two INT96 entries
+ @Override
+ public int getMaxId() {
+ return 1;
+ }
+
+ @Override
+ public Binary decodeToBinary(int id) {
+ if (id == 0) {
+ return int96Binary(111_111L);
+ } else if (id == 1) {
+ return int96Binary(222_222L);
+ }
+
+ throw new IllegalArgumentException("Unexpected dictionary id: " + id);
+ }
+ };
+
+ VectorizedParquetDefinitionLevelReader.TimestampInt96Reader timestampReader =
+ definitionReader.timestampInt96Reader();
+
+ timestampReader.nextDictEncodedVal(
+ vector,
+ 0, // presumably the target row index (row 0) — confirm against the method signature
+ dictionaryReader,
+ dictionary,
+ BaseVectorizedParquetValuesReader.Mode.PACKED,
+ 1,
+ null,
+ Long.BYTES); // presumably the type width in bytes (8 for a BigIntVector slot) — confirm
+ timestampReader.nextDictEncodedVal(
+ vector,
+ 1, // second call targets row 1
+ dictionaryReader,
+ dictionary,
+ BaseVectorizedParquetValuesReader.Mode.PACKED,
+ 1,
+ null,
+ Long.BYTES);
+
+ vector.setValueCount(2);
+
+ assertThat(vector.get(0))
+ .as("row 0 should receive the first decoded timestamp")
+ .isEqualTo(111_111L);
+ assertThat(vector.get(1))
+ .as("row 1 should receive the second decoded timestamp")
+ .isEqualTo(222_222L);
+ }
+ }
+
+ private static Binary int96Binary(long micros) { // builds a 12-byte INT96 value from a microsecond count
+ long timeOfDayNanos = micros * 1_000L; // microseconds -> nanoseconds
+ byte[] bytes =
+ ByteBuffer.allocate(12)
+ .order(ByteOrder.LITTLE_ENDIAN)
+ .putLong(timeOfDayNanos) // first 8 bytes: nanoseconds within the day
+ .putInt(UNIX_EPOCH_JULIAN_DAY) // last 4 bytes: Julian day, fixed to the Unix epoch day
+ .array();
+ return Binary.fromConstantByteArray(bytes);
+ }
+}
diff --git a/aws-bundle/runtime-deps.txt b/aws-bundle/runtime-deps.txt
index 73c7e0ef16b9..b7253c31b370 100644
--- a/aws-bundle/runtime-deps.txt
+++ b/aws-bundle/runtime-deps.txt
@@ -1,66 +1,70 @@
-com.github.ben-manes.caffeine:caffeine:2.9.3
-com.google.errorprone:error_prone_annotations:2.10.0
-commons-codec:commons-codec:1.17.1
+com.github.ben-manes.caffeine:caffeine:2.9
+com.google.errorprone:error_prone_annotations:2.10
+commons-codec:commons-codec:1.17
commons-logging:commons-logging:1.2
-io.netty:netty-buffer:4.1.132.Final
-io.netty:netty-codec-http2:4.1.132.Final
-io.netty:netty-codec-http:4.1.132.Final
-io.netty:netty-codec:4.1.132.Final
-io.netty:netty-common:4.1.132.Final
-io.netty:netty-handler:4.1.132.Final
-io.netty:netty-resolver:4.1.132.Final
-io.netty:netty-transport-classes-epoll:4.1.132.Final
-io.netty:netty-transport-native-unix-common:4.1.132.Final
-io.netty:netty-transport:4.1.132.Final
-org.apache.httpcomponents:httpclient:4.5.13
-org.apache.httpcomponents:httpcore:4.4.16
-org.checkerframework:checker-qual:3.19.0
-org.reactivestreams:reactive-streams:1.0.4
-software.amazon.awssdk.crt:aws-crt:0.45.1
-software.amazon.awssdk:annotations:2.42.41
-software.amazon.awssdk:apache-client:2.42.41
-software.amazon.awssdk:arns:2.42.41
-software.amazon.awssdk:auth:2.42.41
-software.amazon.awssdk:aws-core:2.42.41
-software.amazon.awssdk:aws-json-protocol:2.42.41
-software.amazon.awssdk:aws-query-protocol:2.42.41
-software.amazon.awssdk:aws-xml-protocol:2.42.41
-software.amazon.awssdk:checksums-spi:2.42.41
-software.amazon.awssdk:checksums:2.42.41
-software.amazon.awssdk:cloudwatch-metric-publisher:2.42.41
-software.amazon.awssdk:cloudwatch:2.42.41
-software.amazon.awssdk:crt-core:2.42.41
-software.amazon.awssdk:dynamodb:2.42.41
-software.amazon.awssdk:endpoints-spi:2.42.41
-software.amazon.awssdk:glue:2.42.41
-software.amazon.awssdk:http-auth-aws-crt:2.42.41
-software.amazon.awssdk:http-auth-aws-eventstream:2.42.41
-software.amazon.awssdk:http-auth-aws:2.42.41
-software.amazon.awssdk:http-auth-spi:2.42.41
-software.amazon.awssdk:http-auth:2.42.41
-software.amazon.awssdk:http-client-spi:2.42.41
-software.amazon.awssdk:iam:2.42.41
-software.amazon.awssdk:identity-spi:2.42.41
-software.amazon.awssdk:json-utils:2.42.41
-software.amazon.awssdk:kms:2.42.41
-software.amazon.awssdk:lakeformation:2.42.41
-software.amazon.awssdk:metrics-spi:2.42.41
-software.amazon.awssdk:netty-nio-client:2.42.41
-software.amazon.awssdk:profiles:2.42.41
-software.amazon.awssdk:protocol-core:2.42.41
-software.amazon.awssdk:regions:2.42.41
-software.amazon.awssdk:retries-spi:2.42.41
-software.amazon.awssdk:retries:2.42.41
-software.amazon.awssdk:s3:2.42.41
-software.amazon.awssdk:s3control:2.42.41
-software.amazon.awssdk:sdk-core:2.42.41
-software.amazon.awssdk:smithy-rpcv2-protocol:2.42.41
-software.amazon.awssdk:sso:2.42.41
-software.amazon.awssdk:sts:2.42.41
-software.amazon.awssdk:third-party-jackson-core:2.42.41
-software.amazon.awssdk:third-party-jackson-dataformat-cbor:2.42.41
-software.amazon.awssdk:utils-lite:2.42.41
-software.amazon.awssdk:utils:2.42.41
-software.amazon.eventstream:eventstream:1.0.1
-software.amazon.s3.accessgrants:aws-s3-accessgrants-java-plugin:2.4.1
-software.amazon.s3.analyticsaccelerator:analyticsaccelerator-s3:1.3.1
+io.netty:netty-buffer:4.2
+io.netty:netty-codec-base:4.2
+io.netty:netty-codec-compression:4.2
+io.netty:netty-codec-http2:4.2
+io.netty:netty-codec-http:4.2
+io.netty:netty-codec-marshalling:4.2
+io.netty:netty-codec-protobuf:4.2
+io.netty:netty-codec:4.2
+io.netty:netty-common:4.2
+io.netty:netty-handler:4.2
+io.netty:netty-resolver:4.2
+io.netty:netty-transport-classes-epoll:4.2
+io.netty:netty-transport-native-unix-common:4.2
+io.netty:netty-transport:4.2
+org.apache.httpcomponents:httpclient:4.5
+org.apache.httpcomponents:httpcore:4.4
+org.checkerframework:checker-qual:3.19
+org.reactivestreams:reactive-streams:1.0
+software.amazon.awssdk.crt:aws-crt:0.45
+software.amazon.awssdk:annotations:2.44
+software.amazon.awssdk:apache-client:2.44
+software.amazon.awssdk:arns:2.44
+software.amazon.awssdk:auth:2.44
+software.amazon.awssdk:aws-core:2.44
+software.amazon.awssdk:aws-json-protocol:2.44
+software.amazon.awssdk:aws-query-protocol:2.44
+software.amazon.awssdk:aws-xml-protocol:2.44
+software.amazon.awssdk:checksums-spi:2.44
+software.amazon.awssdk:checksums:2.44
+software.amazon.awssdk:cloudwatch-metric-publisher:2.44
+software.amazon.awssdk:cloudwatch:2.44
+software.amazon.awssdk:crt-core:2.44
+software.amazon.awssdk:dynamodb:2.44
+software.amazon.awssdk:endpoints-spi:2.44
+software.amazon.awssdk:glue:2.44
+software.amazon.awssdk:http-auth-aws-crt:2.44
+software.amazon.awssdk:http-auth-aws-eventstream:2.44
+software.amazon.awssdk:http-auth-aws:2.44
+software.amazon.awssdk:http-auth-spi:2.44
+software.amazon.awssdk:http-auth:2.44
+software.amazon.awssdk:http-client-spi:2.44
+software.amazon.awssdk:iam:2.44
+software.amazon.awssdk:identity-spi:2.44
+software.amazon.awssdk:json-utils:2.44
+software.amazon.awssdk:kms:2.44
+software.amazon.awssdk:lakeformation:2.44
+software.amazon.awssdk:metrics-spi:2.44
+software.amazon.awssdk:netty-nio-client:2.44
+software.amazon.awssdk:profiles:2.44
+software.amazon.awssdk:protocol-core:2.44
+software.amazon.awssdk:regions:2.44
+software.amazon.awssdk:retries-spi:2.44
+software.amazon.awssdk:retries:2.44
+software.amazon.awssdk:s3:2.44
+software.amazon.awssdk:s3control:2.44
+software.amazon.awssdk:sdk-core:2.44
+software.amazon.awssdk:smithy-rpcv2-protocol:2.44
+software.amazon.awssdk:sso:2.44
+software.amazon.awssdk:sts:2.44
+software.amazon.awssdk:third-party-jackson-core:2.44
+software.amazon.awssdk:third-party-jackson-dataformat-cbor:2.44
+software.amazon.awssdk:utils-lite:2.44
+software.amazon.awssdk:utils:2.44
+software.amazon.eventstream:eventstream:1.0
+software.amazon.s3.accessgrants:aws-s3-accessgrants-java-plugin:2.4
+software.amazon.s3.analyticsaccelerator:analyticsaccelerator-s3:1.3
diff --git a/azure-bundle/runtime-deps.txt b/azure-bundle/runtime-deps.txt
index 2e5198f49842..32bc06d03d62 100644
--- a/azure-bundle/runtime-deps.txt
+++ b/azure-bundle/runtime-deps.txt
@@ -1,43 +1,47 @@
-com.azure:azure-core-http-netty:1.16.3
-com.azure:azure-core:1.57.1
-com.azure:azure-identity:1.18.2
-com.azure:azure-json:1.5.1
-com.azure:azure-security-keyvault-keys:4.10.6
-com.azure:azure-storage-blob:12.33.3
-com.azure:azure-storage-common:12.32.2
-com.azure:azure-storage-file-datalake:12.26.3
-com.azure:azure-storage-internal-avro:12.18.2
-com.azure:azure-xml:1.2.1
-com.fasterxml.jackson.core:jackson-annotations:2.18.4
-com.fasterxml.jackson.core:jackson-core:2.18.4.1
-com.fasterxml.jackson.core:jackson-databind:2.18.4
-com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.18.4
-com.microsoft.azure:msal4j-persistence-extension:1.3.0
-com.microsoft.azure:msal4j:1.23.1
-io.netty:netty-buffer:4.1.130.Final
-io.netty:netty-codec-dns:4.1.128.Final
-io.netty:netty-codec-http2:4.1.130.Final
-io.netty:netty-codec-http:4.1.130.Final
-io.netty:netty-codec-socks:4.1.130.Final
-io.netty:netty-codec:4.1.130.Final
-io.netty:netty-common:4.1.130.Final
-io.netty:netty-handler-proxy:4.1.130.Final
-io.netty:netty-handler:4.1.130.Final
-io.netty:netty-resolver-dns-classes-macos:4.1.128.Final
-io.netty:netty-resolver-dns-native-macos:4.1.128.Final
-io.netty:netty-resolver-dns:4.1.128.Final
-io.netty:netty-resolver:4.1.130.Final
-io.netty:netty-tcnative-boringssl-static:2.0.74.Final
-io.netty:netty-tcnative-classes:2.0.74.Final
-io.netty:netty-transport-classes-epoll:4.1.130.Final
-io.netty:netty-transport-classes-kqueue:4.1.130.Final
-io.netty:netty-transport-native-epoll:4.1.130.Final
-io.netty:netty-transport-native-kqueue:4.1.130.Final
-io.netty:netty-transport-native-unix-common:4.1.130.Final
-io.netty:netty-transport:4.1.130.Final
-io.projectreactor.netty:reactor-netty-core:1.2.13
-io.projectreactor.netty:reactor-netty-http:1.2.13
-io.projectreactor:reactor-core:3.7.14
-net.java.dev.jna:jna-platform:5.17.0
-net.java.dev.jna:jna:5.17.0
-org.reactivestreams:reactive-streams:1.0.4
+com.azure:azure-core-http-netty:1.16
+com.azure:azure-core:1.57
+com.azure:azure-identity:1.18
+com.azure:azure-json:1.5
+com.azure:azure-security-keyvault-keys:4.10
+com.azure:azure-storage-blob:12.33
+com.azure:azure-storage-common:12.32
+com.azure:azure-storage-file-datalake:12.26
+com.azure:azure-storage-internal-avro:12.18
+com.azure:azure-xml:1.2
+com.fasterxml.jackson.core:jackson-annotations:2.18
+com.fasterxml.jackson.core:jackson-core:2.18
+com.fasterxml.jackson.core:jackson-databind:2.18
+com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.18
+com.microsoft.azure:msal4j-persistence-extension:1.3
+com.microsoft.azure:msal4j:1.23
+io.netty:netty-buffer:4.2
+io.netty:netty-codec-base:4.2
+io.netty:netty-codec-compression:4.2
+io.netty:netty-codec-dns:4.2
+io.netty:netty-codec-http2:4.2
+io.netty:netty-codec-http:4.2
+io.netty:netty-codec-marshalling:4.2
+io.netty:netty-codec-protobuf:4.2
+io.netty:netty-codec-socks:4.2
+io.netty:netty-codec:4.2
+io.netty:netty-common:4.2
+io.netty:netty-handler-proxy:4.2
+io.netty:netty-handler:4.2
+io.netty:netty-resolver-dns-classes-macos:4.2
+io.netty:netty-resolver-dns-native-macos:4.2
+io.netty:netty-resolver-dns:4.2
+io.netty:netty-resolver:4.2
+io.netty:netty-tcnative-boringssl-static:2.0
+io.netty:netty-tcnative-classes:2.0
+io.netty:netty-transport-classes-epoll:4.2
+io.netty:netty-transport-classes-kqueue:4.2
+io.netty:netty-transport-native-epoll:4.2
+io.netty:netty-transport-native-kqueue:4.2
+io.netty:netty-transport-native-unix-common:4.2
+io.netty:netty-transport:4.2
+io.projectreactor.netty:reactor-netty-core:1.2
+io.projectreactor.netty:reactor-netty-http:1.2
+io.projectreactor:reactor-core:3.7
+net.java.dev.jna:jna-platform:5.17
+net.java.dev.jna:jna:5.17
+org.reactivestreams:reactive-streams:1.0
diff --git a/build.gradle b/build.gradle
index ccfc5abee0cf..2292ecb6e056 100644
--- a/build.gradle
+++ b/build.gradle
@@ -30,7 +30,7 @@ buildscript {
dependencies {
classpath 'com.gradleup.shadow:shadow-gradle-plugin:8.3.10'
classpath 'com.palantir.baseline:gradle-baseline-java:6.90.0'
- classpath 'com.diffplug.spotless:spotless-plugin-gradle:8.4.0'
+ classpath 'com.diffplug.spotless:spotless-plugin-gradle:8.5.1'
classpath 'gradle.plugin.org.inferred:gradle-processors:3.7.0'
classpath 'me.champeau.jmh:jmh-gradle-plugin:0.7.3'
classpath 'gradle.plugin.io.morethan.jmhreport:gradle-jmh-report:0.9.6'
@@ -144,7 +144,7 @@ subprojects {
revapi {
oldGroup = project.group
oldName = project.name
- oldVersion = "1.10.0"
+ oldVersion = "1.11.0"
}
tasks.register('showDeprecationRulesOnRevApiFailure') {
@@ -208,6 +208,13 @@ subprojects {
dependencySubstitution {
substitute module("org.lz4:lz4-java") using module(libs.lz4Java.get().toString()) because("Enforce lz4-java that contains CVE-2025-12183 and CVE-2025-66566 fixes")
substitute module("io.airlift:aircompressor") using module(libs.aircompressor.get().toString()) because("Enforce aircompressor that contains CVE-2025-67721 fix")
+ substitute module("org.bouncycastle:bcprov-jdk18on") using module(libs.bouncycastle.bcprov.get().toString()) because("Enforce BouncyCastle that contains CVE-2026-5598 fix")
+ }
+ eachDependency { details ->
+ if (details.requested.group == 'io.netty' && details.requested.version?.startsWith('4.1.')) {
+ details.useVersion(libs.versions.netty.buffer.get())
+ details.because("Fix Netty 4.1.x CVEs (CVE-2026-42577, CVE-2026-42579, CVE-2026-42583, CVE-2026-42584, CVE-2026-42587)")
+ }
}
}
}
@@ -1276,7 +1283,6 @@ project(':iceberg-bom') {
// the BOM references the artifacts for all Scala versions.
def sparkScalaPattern = ~"(.*)-([0-9][.][0-9]+)_([0-9][.][0-9]+)"
def sparkScalaVersions = [
- "3.4": ["2.12", "2.13"],
"3.5": ["2.12", "2.13"],
]
rootProject.allprojects.forEach {
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java b/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java
index cbd372b7a4ba..b1b3847c5a7a 100644
--- a/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java
+++ b/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java
@@ -18,23 +18,15 @@
*/
package org.apache.iceberg;
-import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.types.Types;
-import org.openjdk.jmh.annotations.AuxCounters;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -52,7 +44,12 @@
import org.openjdk.jmh.infra.Blackhole;
/**
- * A benchmark that measures manifest read/write performance across compression codecs.
+ * A benchmark that measures manifest read/write performance across format versions and file
+ * formats.
+ *
+ * <p>V1-V3 only support Avro manifests. V4 supports both Avro and Parquet. The {@code
+ * versionFormat} parameter encodes valid combinations as {@code "<version>_<format>"} (e.g. {@code
+ * "4_PARQUET"}) so that only meaningful pairings are benchmarked.
*
* Entry counts are calibrated per column count via {@link #ENTRY_BASE}. Set to 300_000 for ~8 MB
* manifests (matching the default {@code commit.manifest.target-size-bytes}) or 15_000 for ~400 KB.
@@ -63,13 +60,25 @@
* # all combinations
* ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark
*
- * # single codec
+ * # V4-only (Avro vs Parquet)
+ * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark \
+ * -PjmhParams="versionFormat=4_AVRO|4_PARQUET"
+ *
+ * # all versions, single column count
* ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark \
- * -PjmhParams="codec=gzip"
+ * -PjmhParams="numCols=50"
+ *
+ * # single version
+ * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark \
+ * -PjmhParams="versionFormat=3_AVRO"
* }
*/
@Fork(1)
@State(Scope.Benchmark)
+// Parquet's columnar write path has a deep call graph (per-column encoders, page assembly,
+// dictionary management) that requires more warmup iterations than Avro for the JIT compiler to
+// fully optimize. Profiling shows ~650ms of JIT compilation spread across the first 3-4
+// iterations, so 6 warmups ensure measurement begins after JIT has stabilized.
@Warmup(iterations = 6)
@Measurement(iterations = 10)
@BenchmarkMode(Mode.SingleShotTime)
@@ -78,19 +87,8 @@ public class ManifestBenchmark {
static final int ENTRY_BASE = 300_000;
- private static final int FORMAT_VERSION = 4;
-
- private static final Schema SCHEMA =
- new Schema(
- Types.NestedField.required(1, "id", Types.IntegerType.get()),
- Types.NestedField.required(2, "data", Types.StringType.get()),
- Types.NestedField.required(3, "customer", Types.StringType.get()));
-
- private static final PartitionSpec SPEC =
- PartitionSpec.builderFor(SCHEMA).identity("id").identity("data").identity("customer").build();
-
- @Param({"gzip", "snappy", "zstd", "uncompressed"})
- private String codec;
+ @Param({"1_AVRO", "2_AVRO", "3_AVRO", "4_AVRO", "4_PARQUET"})
+ private String versionFormat;
@Param({"true", "false"})
private String partitioned;
@@ -98,11 +96,11 @@ public class ManifestBenchmark {
@Param({"10", "50", "100"})
private int numCols;
+ private int formatVersion;
+ private FileFormat fileFormat;
private PartitionSpec spec;
private Map specsById;
- private Map writerProperties;
private List dataFiles;
- private int numEntries;
private String writeBaseDir;
private OutputFile writeOutputFile;
@@ -112,21 +110,26 @@ public class ManifestBenchmark {
@Setup(Level.Trial)
public void setupTrial() {
- this.spec = Boolean.parseBoolean(partitioned) ? SPEC : PartitionSpec.unpartitioned();
- this.specsById = Map.of(spec.specId(), spec);
- this.writerProperties = Map.of(TableProperties.AVRO_COMPRESSION, codec);
- // ENTRY_BASE / cols: empirically calibrated — 300_000 → ~8 MB, 15_000 → ~400 KB manifests
- this.numEntries = ENTRY_BASE / numCols;
- this.dataFiles = generateDataFiles();
+ String[] parts = versionFormat.split("_", 2);
+ this.formatVersion = Integer.parseInt(parts[0]);
+ this.fileFormat = FileFormat.fromString(parts[1]);
+ this.spec =
+ Boolean.parseBoolean(partitioned)
+ ? ManifestBenchmarkUtil.SPEC
+ : PartitionSpec.unpartitioned();
+ this.specsById = ImmutableMap.of(spec.specId(), spec);
+ int numEntries = ManifestBenchmarkUtil.entriesForColumnCount(ENTRY_BASE, numCols);
+ this.dataFiles = ManifestBenchmarkUtil.generateDataFiles(spec, numEntries, numCols);
setupReadManifest();
}
@Setup(Level.Invocation)
public void setupWriteInvocation() throws IOException {
- this.writeBaseDir = Files.createTempDirectory("bench-write-").toAbsolutePath().toString();
+ this.writeBaseDir =
+ java.nio.file.Files.createTempDirectory("bench-write-").toAbsolutePath().toString();
this.writeOutputFile =
- org.apache.iceberg.Files.localOutput(
- String.format(Locale.ROOT, "%s/manifest.avro", writeBaseDir));
+ Files.localOutput(
+ String.format(Locale.ROOT, "%s/%s", writeBaseDir, fileFormat.addExtension("manifest")));
for (DataFile file : dataFiles) {
file.path();
@@ -137,7 +140,7 @@ public void setupWriteInvocation() throws IOException {
@TearDown(Level.Trial)
public void tearDownTrial() {
- cleanDir(readBaseDir);
+ ManifestBenchmarkUtil.cleanDir(readBaseDir);
readBaseDir = null;
readManifest = null;
dataFiles = null;
@@ -145,28 +148,15 @@ public void tearDownTrial() {
@TearDown(Level.Invocation)
public void tearDownInvocation() {
- cleanDir(writeBaseDir);
+ ManifestBenchmarkUtil.cleanDir(writeBaseDir);
writeBaseDir = null;
writeOutputFile = null;
}
- @AuxCounters(AuxCounters.Type.EVENTS)
- @State(Scope.Thread)
- @SuppressWarnings("checkstyle:VisibilityModifier")
- public static class FileSizeCounters {
- public double manifestSizeMB;
-
- @Setup(Level.Invocation)
- public void reset() {
- manifestSizeMB = 0;
- }
- }
-
@Benchmark
@Threads(1)
- public ManifestFile writeManifest(FileSizeCounters counters) throws IOException {
- ManifestWriter writer =
- ManifestFiles.write(FORMAT_VERSION, spec, writeOutputFile, 1L, writerProperties);
+ public ManifestFile writeManifest() throws IOException {
+ ManifestWriter writer = ManifestFiles.write(formatVersion, spec, writeOutputFile, 1L);
try (ManifestWriter w = writer) {
for (DataFile file : dataFiles) {
@@ -174,9 +164,7 @@ public ManifestFile writeManifest(FileSizeCounters counters) throws IOException
}
}
- ManifestFile manifest = writer.toManifestFile();
- counters.manifestSizeMB = manifest.length() / (1024.0 * 1024.0);
- return manifest;
+ return writer.toManifestFile();
}
@Benchmark
@@ -193,17 +181,17 @@ public void readManifest(Blackhole blackhole) throws IOException {
private void setupReadManifest() {
try {
- this.readBaseDir = Files.createTempDirectory("bench-read-").toAbsolutePath().toString();
+ this.readBaseDir =
+ java.nio.file.Files.createTempDirectory("bench-read-").toAbsolutePath().toString();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
OutputFile manifestFile =
- org.apache.iceberg.Files.localOutput(
- String.format(Locale.ROOT, "%s/manifest.avro", readBaseDir));
+ Files.localOutput(
+ String.format(Locale.ROOT, "%s/%s", readBaseDir, fileFormat.addExtension("manifest")));
- ManifestWriter writer =
- ManifestFiles.write(FORMAT_VERSION, spec, manifestFile, 1L, writerProperties);
+ ManifestWriter writer = ManifestFiles.write(formatVersion, spec, manifestFile, 1L);
try (ManifestWriter w = writer) {
for (DataFile file : dataFiles) {
@@ -215,65 +203,4 @@ private void setupReadManifest() {
this.readManifest = writer.toManifestFile();
}
-
- private List generateDataFiles() {
- Random random = new Random(42);
- List files = Lists.newArrayListWithCapacity(numEntries);
- for (int i = 0; i < numEntries; i++) {
- DataFiles.Builder builder =
- DataFiles.builder(spec)
- .withFormat(FileFormat.PARQUET)
- .withPath(String.format(Locale.ROOT, "/path/to/data-%d.parquet", i))
- .withFileSizeInBytes(1024 + i)
- .withRecordCount(1000 + i)
- .withMetrics(randomMetrics(random, numCols));
-
- if (!spec.isUnpartitioned()) {
- builder.withPartitionPath(
- String.format(
- Locale.ROOT, "id=%d/data=val-%d/customer=cust-%d", i % 100, i % 50, i % 200));
- }
-
- files.add(builder.build());
- }
-
- return files;
- }
-
- static Metrics randomMetrics(Random random, int cols) {
- long rowCount = 100_000L + random.nextInt(1000);
- Map columnSizes = Maps.newHashMap();
- Map valueCounts = Maps.newHashMap();
- Map nullValueCounts = Maps.newHashMap();
- Map nanValueCounts = Maps.newHashMap();
- Map lowerBounds = Maps.newHashMap();
- Map upperBounds = Maps.newHashMap();
- for (int i = 0; i < cols; i++) {
- columnSizes.put(i, 1_000_000L + random.nextInt(100_000));
- valueCounts.put(i, 100_000L + random.nextInt(100));
- nullValueCounts.put(i, (long) random.nextInt(5));
- nanValueCounts.put(i, (long) random.nextInt(5));
- byte[] lower = new byte[8];
- random.nextBytes(lower);
- lowerBounds.put(i, ByteBuffer.wrap(lower));
- byte[] upper = new byte[8];
- random.nextBytes(upper);
- upperBounds.put(i, ByteBuffer.wrap(upper));
- }
-
- return new Metrics(
- rowCount,
- columnSizes,
- valueCounts,
- nullValueCounts,
- nanValueCounts,
- lowerBounds,
- upperBounds);
- }
-
- private static void cleanDir(String dir) {
- if (dir != null) {
- FileUtils.deleteQuietly(new File(dir));
- }
- }
}
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestBenchmarkUtil.java b/core/src/jmh/java/org/apache/iceberg/ManifestBenchmarkUtil.java
new file mode 100644
index 000000000000..d37c48daaba3
--- /dev/null
+++ b/core/src/jmh/java/org/apache/iceberg/ManifestBenchmarkUtil.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Shared constants and stateless helpers for {@link ManifestBenchmark} and {@link
+ * ManifestCompressionBenchmark}.
+ */
+final class ManifestBenchmarkUtil {
+
+  static final Schema SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.IntegerType.get()),
+          Types.NestedField.required(2, "data", Types.StringType.get()),
+          Types.NestedField.required(3, "customer", Types.StringType.get()));
+
+  static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).identity("id").identity("data").identity("customer").build();
+
+  private ManifestBenchmarkUtil() {}
+
+  /**
+   * Returns the number of manifest entries for the given column count. The result is {@code
+   * entryBase / cols}, using integer division.
+   *
+   * <p>The linear ratio was determined empirically by writing manifests at various column counts
+   * and measuring the resulting file sizes. An {@code entryBase} of 300,000 produces ~8 MB
+   * manifests (matching the default {@code commit.manifest.target-size-bytes}); 15,000 produces
+   * ~400 KB.
+   */
+  static int entriesForColumnCount(int entryBase, int cols) {
+    return entryBase / cols;
+  }
+
+  static List<DataFile> generateDataFiles(PartitionSpec spec, int numEntries, int numCols) {
+    Random random = new Random(42); // fixed seed: every call draws the same metrics sequence
+    List<DataFile> files = Lists.newArrayListWithCapacity(numEntries);
+    for (int i = 0; i < numEntries; i++) {
+      DataFiles.Builder builder =
+          DataFiles.builder(spec)
+              .withFormat(FileFormat.PARQUET)
+              .withPath(String.format(Locale.ROOT, "/path/to/data-%d.parquet", i))
+              .withFileSizeInBytes(1024 + i)
+              .withRecordCount(1000 + i)
+              .withMetrics(randomMetrics(random, numCols));
+
+      if (spec.isPartitioned()) {
+        builder.withPartitionPath(
+            String.format(
+                Locale.ROOT, "id=%d/data=val-%d/customer=cust-%d", i % 100, i % 50, i % 200));
+      }
+
+      files.add(builder.build());
+    }
+    return files;
+  }
+
+  static Metrics randomMetrics(Random random, int cols) {
+    long rowCount = 100_000L + random.nextInt(1000);
+    Map<Integer, Long> columnSizes = Maps.newHashMap();
+    Map<Integer, Long> valueCounts = Maps.newHashMap();
+    Map<Integer, Long> nullValueCounts = Maps.newHashMap();
+    Map<Integer, Long> nanValueCounts = Maps.newHashMap();
+    Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
+    Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
+    for (int i = 0; i < cols; i++) { // map keys are the column indexes 0..cols-1
+      columnSizes.put(i, 1_000_000L + random.nextInt(100_000));
+      valueCounts.put(i, 100_000L + random.nextInt(100));
+      nullValueCounts.put(i, (long) random.nextInt(5));
+      nanValueCounts.put(i, (long) random.nextInt(5));
+      byte[] lower = new byte[8];
+      random.nextBytes(lower);
+      lowerBounds.put(i, ByteBuffer.wrap(lower));
+      byte[] upper = new byte[8];
+      random.nextBytes(upper);
+      upperBounds.put(i, ByteBuffer.wrap(upper));
+    }
+
+    return new Metrics(
+        rowCount,
+        columnSizes,
+        valueCounts,
+        nullValueCounts,
+        nanValueCounts,
+        lowerBounds,
+        upperBounds);
+  }
+
+  static void cleanDir(String dir) {
+    if (dir != null) { // null means there is nothing to delete
+      FileUtils.deleteQuietly(new java.io.File(dir));
+    }
+  }
+}
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestCompressionBenchmark.java b/core/src/jmh/java/org/apache/iceberg/ManifestCompressionBenchmark.java
new file mode 100644
index 000000000000..bf09ae18f91c
--- /dev/null
+++ b/core/src/jmh/java/org/apache/iceberg/ManifestCompressionBenchmark.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.OutputFile;
+import org.openjdk.jmh.annotations.AuxCounters;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * A benchmark that measures manifest read/write performance across compression codecs.
+ *
+ * <p>Entry counts are calibrated per column count via {@link #ENTRY_BASE}. Set to 300_000 for ~8 MB
+ * manifests (matching the default {@code commit.manifest.target-size-bytes}) or 15_000 for ~400 KB.
+ *
+ * <p>To run this benchmark:
+ *
+ * <pre>{@code
+ * # all combinations
+ * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestCompressionBenchmark
+ *
+ * # single codec
+ * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestCompressionBenchmark \
+ *     -PjmhParams="codec=gzip"
+ * }</pre>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 6)
+@Measurement(iterations = 10)
+@BenchmarkMode(Mode.SingleShotTime)
+@Timeout(time = 10, timeUnit = TimeUnit.MINUTES)
+public class ManifestCompressionBenchmark {
+
+  static final int ENTRY_BASE = 300_000;
+
+  private static final int FORMAT_VERSION = 4;
+
+  @Param({"gzip", "snappy", "zstd", "uncompressed"})
+  private String codec;
+
+  @Param({"true", "false"})
+  private String partitioned;
+
+  @Param({"10", "50", "100"})
+  private int numCols;
+
+  private PartitionSpec spec;
+  private Map<Integer, PartitionSpec> specsById;
+  private Map<String, String> writerProperties;
+  private List<DataFile> dataFiles;
+
+  private String writeBaseDir;
+  private OutputFile writeOutputFile;
+
+  private String readBaseDir;
+  private ManifestFile readManifest;
+
+  @Setup(Level.Trial)
+  public void setupTrial() {
+    this.spec =
+        Boolean.parseBoolean(partitioned)
+            ? ManifestBenchmarkUtil.SPEC
+            : PartitionSpec.unpartitioned();
+    this.specsById = Map.of(spec.specId(), spec);
+    this.writerProperties = Map.of(TableProperties.AVRO_COMPRESSION, codec);
+    int numEntries = ManifestBenchmarkUtil.entriesForColumnCount(ENTRY_BASE, numCols);
+    this.dataFiles = ManifestBenchmarkUtil.generateDataFiles(spec, numEntries, numCols);
+    setupReadManifest(); // needs spec, writerProperties and dataFiles assigned above
+  }
+
+  @Setup(Level.Invocation)
+  public void setupWriteInvocation() throws IOException {
+    this.writeBaseDir =
+        java.nio.file.Files.createTempDirectory("bench-write-").toAbsolutePath().toString();
+    this.writeOutputFile =
+        Files.localOutput(String.format(Locale.ROOT, "%s/manifest.avro", writeBaseDir));
+
+    for (DataFile file : dataFiles) {
+      file.path();
+      file.fileSizeInBytes();
+      file.recordCount();
+    }
+  }
+
+  @TearDown(Level.Trial)
+  public void tearDownTrial() {
+    ManifestBenchmarkUtil.cleanDir(readBaseDir);
+    readBaseDir = null;
+    readManifest = null;
+    dataFiles = null;
+  }
+
+  @TearDown(Level.Invocation)
+  public void tearDownInvocation() {
+    ManifestBenchmarkUtil.cleanDir(writeBaseDir);
+    writeBaseDir = null;
+    writeOutputFile = null;
+  }
+
+  @AuxCounters(AuxCounters.Type.EVENTS)
+  @State(Scope.Thread)
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  public static class FileSizeCounters {
+    public double manifestSizeMB;
+
+    @Setup(Level.Invocation)
+    public void reset() {
+      manifestSizeMB = 0;
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public ManifestFile writeManifest(FileSizeCounters counters) throws IOException {
+    ManifestWriter<DataFile> writer =
+        ManifestFiles.write(FORMAT_VERSION, spec, writeOutputFile, 1L, writerProperties);
+
+    try (ManifestWriter<DataFile> w = writer) {
+      for (DataFile file : dataFiles) {
+        w.add(file);
+      }
+    }
+
+    ManifestFile manifest = writer.toManifestFile(); // writer was closed by the try block above
+    counters.manifestSizeMB = manifest.length() / (1024.0 * 1024.0);
+    return manifest;
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readManifest(Blackhole blackhole) throws IOException {
+    TestTables.LocalFileIO fileIO = new TestTables.LocalFileIO();
+    try (CloseableIterator<DataFile> it =
+        ManifestFiles.read(readManifest, fileIO, specsById).iterator()) {
+      while (it.hasNext()) {
+        blackhole.consume(it.next());
+      }
+    }
+  }
+
+  private void setupReadManifest() {
+    try {
+      this.readBaseDir =
+          java.nio.file.Files.createTempDirectory("bench-read-").toAbsolutePath().toString();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    OutputFile manifestFile =
+        Files.localOutput(String.format(Locale.ROOT, "%s/manifest.avro", readBaseDir));
+
+    ManifestWriter<DataFile> writer =
+        ManifestFiles.write(FORMAT_VERSION, spec, manifestFile, 1L, writerProperties);
+
+    try (ManifestWriter<DataFile> w = writer) {
+      for (DataFile file : dataFiles) {
+        w.add(file);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    this.readManifest = writer.toManifestFile(); // writer was closed by the try block above
+  }
+}
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java b/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java
deleted file mode 100644
index 588b5df1ba97..000000000000
--- a/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-import org.apache.iceberg.encryption.PlaintextEncryptionManager;
-import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Fork;
-import org.openjdk.jmh.annotations.Measurement;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.TearDown;
-import org.openjdk.jmh.annotations.Threads;
-import org.openjdk.jmh.annotations.Timeout;
-
-@Fork(1)
-@State(Scope.Benchmark)
-@Measurement(iterations = 5)
-@BenchmarkMode(Mode.SingleShotTime)
-@Timeout(time = 1000, timeUnit = TimeUnit.HOURS)
-public class ManifestReadBenchmark {
-
- private static final int NUM_FILES = 10;
- private static final int NUM_ROWS = 100000;
- private static final int NUM_COLS = 10;
-
- private String baseDir;
- private String manifestListFile;
-
- @Setup
- public void before() {
- baseDir =
- Paths.get(new File(System.getProperty("java.io.tmpdir")).getAbsolutePath()).toString();
- manifestListFile = String.format("%s/%s.avro", baseDir, UUID.randomUUID());
-
- Random random = new Random(System.currentTimeMillis());
-
- try (ManifestListWriter listWriter =
- ManifestLists.write(
- 1,
- org.apache.iceberg.Files.localOutput(manifestListFile),
- PlaintextEncryptionManager.instance(),
- 0,
- 1L,
- 0,
- 0L)) {
- for (int i = 0; i < NUM_FILES; i++) {
- OutputFile manifestFile =
- org.apache.iceberg.Files.localOutput(
- String.format("%s/%s.avro", baseDir, UUID.randomUUID()));
-
- ManifestWriter writer =
- ManifestFiles.write(1, PartitionSpec.unpartitioned(), manifestFile, 1L);
- try (ManifestWriter finalWriter = writer) {
- for (int j = 0; j < NUM_ROWS; j++) {
- DataFile dataFile =
- DataFiles.builder(PartitionSpec.unpartitioned())
- .withFormat(FileFormat.PARQUET)
- .withPath(String.format("/path/to/data-%s-%s.parquet", i, j))
- .withFileSizeInBytes(j)
- .withRecordCount(j)
- .withMetrics(randomMetrics(random))
- .build();
- finalWriter.add(dataFile);
- }
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
-
- listWriter.add(writer.toManifestFile());
- }
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
-
- @TearDown
- public void after() throws IOException {
- if (baseDir != null) {
- try (Stream walk = Files.walk(Paths.get(baseDir))) {
- walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
- }
- baseDir = null;
- }
-
- manifestListFile = null;
- }
-
- @Benchmark
- @Threads(1)
- public void readManifestFile() throws IOException {
- List manifests =
- ManifestLists.read(org.apache.iceberg.Files.localInput(manifestListFile));
- TestTables.LocalFileIO fileIO = new TestTables.LocalFileIO();
- Map specs =
- ImmutableMap.of(PartitionSpec.unpartitioned().specId(), PartitionSpec.unpartitioned());
- for (ManifestFile manifestFile : manifests) {
- ManifestReader reader = ManifestFiles.read(manifestFile, fileIO, specs);
- try (CloseableIterator it = reader.iterator()) {
- while (it.hasNext()) {
- it.next().recordCount();
- }
- }
- }
- }
-
- private Metrics randomMetrics(Random random) {
- long rowCount = 100000L + random.nextInt(1000);
- Map columnSizes = Maps.newHashMap();
- Map valueCounts = Maps.newHashMap();
- Map nullValueCounts = Maps.newHashMap();
- Map nanValueCounts = Maps.newHashMap();
- Map lowerBounds = Maps.newHashMap();
- Map upperBounds = Maps.newHashMap();
- for (int i = 0; i < NUM_COLS; i++) {
- columnSizes.put(i, 1000000L + random.nextInt(100000));
- valueCounts.put(i, 100000L + random.nextInt(100));
- nullValueCounts.put(i, (long) random.nextInt(5));
- nanValueCounts.put(i, (long) random.nextInt(5));
- byte[] lower = new byte[8];
- random.nextBytes(lower);
- lowerBounds.put(i, ByteBuffer.wrap(lower));
- byte[] upper = new byte[8];
- random.nextBytes(upper);
- upperBounds.put(i, ByteBuffer.wrap(upper));
- }
-
- return new Metrics(
- rowCount,
- columnSizes,
- valueCounts,
- nullValueCounts,
- nanValueCounts,
- lowerBounds,
- upperBounds);
- }
-}
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestWriteBenchmark.java b/core/src/jmh/java/org/apache/iceberg/ManifestWriteBenchmark.java
deleted file mode 100644
index b0dab63dea06..000000000000
--- a/core/src/jmh/java/org/apache/iceberg/ManifestWriteBenchmark.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
-import org.apache.iceberg.encryption.PlaintextEncryptionManager;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Fork;
-import org.openjdk.jmh.annotations.Measurement;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.Param;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.TearDown;
-import org.openjdk.jmh.annotations.Threads;
-import org.openjdk.jmh.annotations.Timeout;
-
-/**
- * A benchmark that evaluates the performance of writing manifest files
- *
- * To run this benchmark:
- * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestWriteBenchmark
- *
- */
-@Fork(1)
-@State(Scope.Benchmark)
-@Measurement(iterations = 5)
-@BenchmarkMode(Mode.SingleShotTime)
-@Timeout(time = 5, timeUnit = TimeUnit.MINUTES)
-public class ManifestWriteBenchmark {
-
- private static final int NUM_FILES = 10;
- private static final int NUM_ROWS = 100000;
- private static final int NUM_COLS = 100;
-
- private String baseDir;
- private String manifestListFile;
-
- private Metrics metrics;
-
- @Setup
- public void before() {
- Random random = new Random(System.currentTimeMillis());
- // Pre-create the metrics to avoid doing this in the benchmark itself
- metrics = randomMetrics(random);
- }
-
- @TearDown
- public void after() {
- if (baseDir != null) {
- FileUtils.deleteQuietly(new File(baseDir));
- baseDir = null;
- }
-
- manifestListFile = null;
- }
-
- @State(Scope.Benchmark)
- public static class BenchmarkState {
- @Param({"1", "2"})
- private int formatVersion;
-
- public int getFormatVersion() {
- return formatVersion;
- }
- }
-
- @Benchmark
- @Threads(1)
- public void writeManifestFile(BenchmarkState state) throws IOException {
- this.baseDir =
- java.nio.file.Files.createTempDirectory("benchmark-").toAbsolutePath().toString();
- this.manifestListFile = String.format("%s/%s.avro", baseDir, UUID.randomUUID());
-
- try (ManifestListWriter listWriter =
- ManifestLists.write(
- state.getFormatVersion(),
- org.apache.iceberg.Files.localOutput(manifestListFile),
- PlaintextEncryptionManager.instance(),
- 0,
- 1L,
- 0,
- 0L)) {
- for (int i = 0; i < NUM_FILES; i++) {
- OutputFile manifestFile =
- org.apache.iceberg.Files.localOutput(
- String.format("%s/%s.avro", baseDir, UUID.randomUUID()));
-
- ManifestWriter writer =
- ManifestFiles.write(
- state.formatVersion, PartitionSpec.unpartitioned(), manifestFile, 1L);
- try (ManifestWriter finalWriter = writer) {
- for (int j = 0; j < NUM_ROWS; j++) {
- DataFile dataFile =
- DataFiles.builder(PartitionSpec.unpartitioned())
- .withFormat(FileFormat.PARQUET)
- .withPath(String.format("/path/to/data-%s-%s.parquet", i, j))
- .withFileSizeInBytes(j)
- .withRecordCount(j)
- .withMetrics(metrics)
- .build();
- finalWriter.add(dataFile);
- }
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
-
- listWriter.add(writer.toManifestFile());
- }
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
-
- private Metrics randomMetrics(Random random) {
- long rowCount = 100000L + random.nextInt(1000);
- Map columnSizes = Maps.newHashMap();
- Map valueCounts = Maps.newHashMap();
- Map nullValueCounts = Maps.newHashMap();
- Map nanValueCounts = Maps.newHashMap();
- Map lowerBounds = Maps.newHashMap();
- Map upperBounds = Maps.newHashMap();
- for (int i = 0; i < NUM_COLS; i++) {
- columnSizes.put(i, 1000000L + random.nextInt(100000));
- valueCounts.put(i, 100000L + random.nextInt(100));
- nullValueCounts.put(i, (long) random.nextInt(5));
- nanValueCounts.put(i, (long) random.nextInt(5));
- byte[] lower = new byte[8];
- random.nextBytes(lower);
- lowerBounds.put(i, ByteBuffer.wrap(lower));
- byte[] upper = new byte[8];
- random.nextBytes(upper);
- upperBounds.put(i, ByteBuffer.wrap(upper));
- }
-
- return new Metrics(
- rowCount,
- columnSizes,
- valueCounts,
- nullValueCounts,
- nanValueCounts,
- lowerBounds,
- upperBounds);
- }
-}
diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java
index 3c31c50f099f..7147ba58787b 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFile.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -32,6 +32,7 @@
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.avro.SupportsIndexProjection;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ArrayUtil;
@@ -329,7 +330,11 @@ protected void internalSet(int pos, T value) {
this.partitionSpecId = (value != null) ? (Integer) value : -1;
return;
case 4:
- this.partitionData = (PartitionData) value;
+ // Preserve the constructor-initialized partitionData when the reader returns null
+ // (e.g., v4 Parquet manifests for unpartitioned tables omit the partition field).
+ if (value != null) {
+ this.partitionData = (PartitionData) value;
+ }
return;
case 5:
this.recordCount = (Long) value;
@@ -581,9 +586,37 @@ private static Map copyMap(Map map, Set keys) {
private static Map copyByteBufferMap(
Map map, Set keys) {
- return SerializableByteBufferMap.wrap(copyMap(map, keys));
+ if (map == null) {
+ return null;
+ }
+
+ return SerializableByteBufferMap.wrap(deepCopyByteBufferMap(map, keys));
+ }
+
+  // Required as long as we have Map<Integer, ByteBuffer> in the API since Parquet reuses buffers.
+  private static Map<Integer, ByteBuffer> deepCopyByteBufferMap(
+      Map<Integer, ByteBuffer> map, Set<Integer> keys) {
+    Map<Integer, ByteBuffer> deepCopy = Maps.newHashMapWithExpectedSize(map.size());
+    for (Map.Entry<Integer, ByteBuffer> entry : map.entrySet()) {
+      if (keys == null || keys.contains(entry.getKey())) { // null keys means "copy every entry"
+        ByteBuffer buf = entry.getValue();
+        if (buf != null) {
+          ByteBuffer copy = ByteBuffer.allocate(buf.remaining());
+          copy.put(buf.duplicate()); // duplicate so the source buffer's position is not advanced
+          copy.flip();
+          deepCopy.put(entry.getKey(), copy);
+        } else {
+          deepCopy.put(entry.getKey(), null);
+        }
+      }
+    }
+
+    return deepCopy;
+  }
+ // Returns an unmodifiable view of the map. The SerializableMap check is needed because
+ // internal maps may be wrapped for serialization after being populated by a format reader
+ // with container reuse enabled, and immutableMap() provides a stable snapshot.
private static Map toReadableMap(Map map) {
if (map == null) {
return null;
@@ -594,6 +627,10 @@ private static Map toReadableMap(Map map) {
}
}
+ // Separate from toReadableMap because SerializableByteBufferMap is its own wrapper type
+ // (not a SerializableMap subclass) to handle ByteBuffer-specific serialization. ByteBuffer
+ // values are mutable and can be overwritten by Parquet container reuse, so callers that
+ // retain references must use copyByteBufferMap to get independent copies.
private static Map toReadableByteBufferMap(Map map) {
if (map == null) {
return null;
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index ffeff9c99145..5ac55f0cf41f 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -151,6 +151,14 @@ public static ManifestReader read(ManifestFile manifest, FileIO io) {
*/
public static ManifestReader read(
ManifestFile manifest, FileIO io, Map specsById) {
+ return read(manifest, io, specsById, true);
+ }
+
+ static ManifestReader read(
+ ManifestFile manifest,
+ FileIO io,
+ Map specsById,
+ boolean isCommitted) {
Preconditions.checkArgument(
manifest.content() == ManifestContent.DATA,
"Cannot read a delete manifest with a ManifestReader: %s",
@@ -163,6 +171,7 @@ public static ManifestReader read(
specsById,
inheritableMetadata,
manifest.firstRowId(),
+ isCommitted,
FileType.DATA_FILES);
}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestLists.java b/core/src/main/java/org/apache/iceberg/ManifestLists.java
index 5d7713ad06c6..fc7774ffcb44 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestLists.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestLists.java
@@ -40,7 +40,7 @@ static List read(InputFile manifestList) {
.project(ManifestFile.schema())
.build()) {
- return Lists.newLinkedList(files);
+ return Lists.newArrayList(files);
} catch (IOException e) {
throw new RuntimeIOException(
diff --git a/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java b/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java
index 410edcc06859..0aec0ac69a9f 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java
@@ -72,6 +72,10 @@ abstract class ManifestMergeManager> {
protected abstract ManifestReader newManifestReader(ManifestFile manifest);
+ protected ManifestReader newManifestReader(ManifestFile manifest, boolean isCommitted) {
+ return newManifestReader(manifest);
+ }
+
Iterable mergeManifests(Iterable manifests) {
Iterator manifestIter = manifests.iterator();
if (!mergeEnabled || !manifestIter.hasNext()) {
@@ -192,7 +196,9 @@ private ManifestFile createManifest(int specId, List bin) {
boolean threw = true;
try {
for (ManifestFile manifest : bin) {
- try (ManifestReader reader = newManifestReader(manifest)) {
+ boolean isCommitted =
+ manifest.snapshotId() != null && snapshotId() != manifest.snapshotId();
+ try (ManifestReader reader = newManifestReader(manifest, isCommitted)) {
for (ManifestEntry entry : reader.entries()) {
if (entry.status() == Status.DELETED) {
// suppress deletes from previous snapshots. only files deleted by this snapshot
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 09bbe8b0cc6b..dc34836b6c56 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -60,6 +60,13 @@ public class ManifestReader> extends CloseableGroup
static final ImmutableList ALL_COLUMNS = ImmutableList.of("*");
+ private static final Types.NestedField UNPARTITIONED_PARTITION_FIELD =
+ Types.NestedField.optional(
+ DataFile.PARTITION_ID,
+ DataFile.PARTITION_NAME,
+ Types.StructType.of(),
+ DataFile.PARTITION_DOC);
+
private static final Set STATS_COLUMNS =
ImmutableSet.of(
"value_counts",
@@ -92,6 +99,7 @@ private Class extends StructLike> fileClass() {
private final InputFile file;
private final InheritableMetadata inheritableMetadata;
private final Long firstRowId;
+ private final boolean isCommitted;
private final FileType content;
private final PartitionSpec spec;
private final Schema fileSchema;
@@ -125,12 +133,24 @@ protected ManifestReader(
InheritableMetadata inheritableMetadata,
Long firstRowId,
FileType content) {
+ this(file, specId, specsById, inheritableMetadata, firstRowId, true, content);
+ }
+
+ protected ManifestReader(
+ InputFile file,
+ int specId,
+ Map specsById,
+ InheritableMetadata inheritableMetadata,
+ Long firstRowId,
+ boolean isCommitted,
+ FileType content) {
Preconditions.checkArgument(
firstRowId == null || content == FileType.DATA_FILES,
"First row ID is not valid for delete manifests");
this.file = file;
this.inheritableMetadata = inheritableMetadata;
this.firstRowId = firstRowId;
+ this.isCommitted = isCommitted;
this.content = content;
if (specsById != null) {
@@ -160,6 +180,12 @@ private > PartitionSpec readPartitionSpec(InputFile inp
}
private static > Map readMetadata(InputFile inputFile) {
+ FileFormat manifestFormat = FileFormat.fromFileName(inputFile.location());
+ Preconditions.checkArgument(
+ manifestFormat == FileFormat.AVRO,
+ "Reading manifest metadata is only supported for Avro manifests: %s",
+ inputFile.location());
+
Map metadata;
try {
try (CloseableIterable> headerReader =
@@ -285,8 +311,22 @@ private CloseableIterable> open(Schema projection) {
Preconditions.checkArgument(
format != null, "Unable to determine format of manifest: %s", file.location());
+ boolean unpartitioned = spec.rawPartitionType().fields().isEmpty();
+
+ // V4+ manifests omit the partition field when unpartitioned (Parquet cannot represent
+ // empty structs, and the field is meaningless regardless of format). Mark it optional so
+ // the reader returns null for the missing field instead of throwing. The field must stay
+ // in the projection to preserve positional access for callers like StructProjection.
+ // For older versions where the empty struct is present, making it optional is harmless.
List fields = Lists.newArrayList();
- fields.addAll(projection.asStruct().fields());
+ for (Types.NestedField field : projection.asStruct().fields()) {
+ if (unpartitioned && field.fieldId() == DataFile.PARTITION_ID) {
+ fields.add(UNPARTITIONED_PARTITION_FIELD);
+ } else {
+ fields.add(field);
+ }
+ }
+
if (projection.findField(DataFile.RECORD_COUNT.fieldId()) == null) {
fields.add(DataFile.RECORD_COUNT);
}
@@ -308,7 +348,7 @@ private CloseableIterable> open(Schema projection) {
CloseableIterable> withMetadata =
CloseableIterable.transform(reader, inheritableMetadata::apply);
- return CloseableIterable.transform(withMetadata, idAssigner(firstRowId));
+ return CloseableIterable.transform(withMetadata, idAssigner(firstRowId, isCommitted));
}
CloseableIterable> liveEntries() {
@@ -398,7 +438,7 @@ static List withStatsColumns(Collection columns) {
}
private static > Function, ManifestEntry> idAssigner(
- Long firstRowId) {
+ Long firstRowId, boolean isCommitted) {
if (firstRowId != null) {
return new Function<>() {
private long nextRowId = firstRowId;
@@ -416,8 +456,13 @@ public ManifestEntry apply(ManifestEntry entry) {
return entry;
}
};
+ } else if (!isCommitted) {
+ // Preserve firstRowId for entries in uncommitted manifests, including EXISTING entries that
+ // may be merged later
+ return Function.identity();
} else {
- // data file's first_row_id is null when the manifest's first_row_id is null
+ // Committed manifest with a null manifest-level firstRowId (pre-v3 upgrade path):
+ // defensively set the first row ID of every entry to null.
return entry -> {
if (entry.file() instanceof BaseFile) {
((BaseFile>) entry.file()).setFirstRowId(null);
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index 7d85f991b080..321bcd89d8b1 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -24,6 +24,7 @@
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.encryption.NativeEncryptionKeyMetadata;
+import org.apache.iceberg.encryption.NativeEncryptionOutputFile;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
@@ -40,6 +41,7 @@ public abstract class ManifestWriter> implements FileAp
// this is replaced when writing a manifest list by the ManifestFile wrapper
static final long UNASSIGNED_SEQ = -1L;
+ private final FileFormat format;
private final OutputFile file;
private final EncryptionKeyMetadata keyMetadata;
private final int specId;
@@ -65,7 +67,8 @@ private ManifestWriter(
Long snapshotId,
Long firstRowId,
Map writerProperties) {
- this.file = file.encryptingOutputFile();
+ this.format = FileFormat.fromFileName(file.encryptingOutputFile().location());
+ this.file = outputFile(file);
this.specId = spec.specId();
this.writerProperties = writerProperties;
this.writer = newAppender(spec, this.file);
@@ -82,6 +85,21 @@ private ManifestWriter(
protected abstract FileAppender> newAppender(
PartitionSpec spec, OutputFile outputFile);
+ private OutputFile outputFile(EncryptedOutputFile encryptedFile) {
+ // Returning the NativeEncryptionOutputFile itself, rather than its encryptingOutputFile(),
+ // makes the file rely on native encryption rather than whole-file encryption.
+ if (format == FileFormat.PARQUET
+ && encryptedFile instanceof NativeEncryptionOutputFile nativeFile) {
+ return nativeFile;
+ }
+
+ return encryptedFile.encryptingOutputFile();
+ }
+
+ protected FileFormat format() {
+ return format;
+ }
+
protected Map writerProperties() {
return writerProperties;
}
@@ -206,16 +224,7 @@ public long length() {
public ManifestFile toManifestFile() {
Preconditions.checkState(closed, "Cannot build ManifestFile, writer is not closed");
- ByteBuffer keyMetadataBuffer;
- if (keyMetadata instanceof NativeEncryptionKeyMetadata) {
- // File length is required by AES GCM Stream encryption, to prevent file truncation attacks
- keyMetadataBuffer =
- ((NativeEncryptionKeyMetadata) keyMetadata).copyWithLength(length()).buffer();
- } else if (keyMetadata != null) {
- keyMetadataBuffer = keyMetadata.buffer();
- } else {
- keyMetadataBuffer = null;
- }
+ ByteBuffer keyMetadataBuffer = keyMetadataBuffer();
// if the minSequenceNumber is null, then no manifests with a sequence number have been written,
// so the min data sequence number is the one that will be assigned when this is committed.
@@ -240,6 +249,19 @@ public ManifestFile toManifestFile() {
firstRowId);
}
+ private ByteBuffer keyMetadataBuffer() {
+ if (keyMetadata instanceof NativeEncryptionKeyMetadata nativeKeyMetadata
+ && format == FileFormat.AVRO) {
+ // Whole-file encryption needs the file length embedded for GCM truncation protection.
+ // Formats with native encryption (like Parquet) handle this directly and don't need it.
+ return nativeKeyMetadata.copyWithLength(length()).buffer();
+ } else if (keyMetadata != null) {
+ return keyMetadata.buffer();
+ }
+
+ return null;
+ }
+
@Override
public void close() throws IOException {
this.closed = true;
@@ -256,7 +278,7 @@ static class V4Writer extends ManifestWriter {
Long firstRowId,
Map writerProperties) {
super(spec, file, snapshotId, firstRowId, writerProperties);
- this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId);
+ this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId, spec.partitionType());
}
@Override
@@ -269,7 +291,7 @@ protected FileAppender> newAppender(
PartitionSpec spec, OutputFile file) {
Schema manifestSchema = V4Metadata.entrySchema(spec.partitionType());
try {
- return InternalData.write(FileFormat.AVRO, file)
+ return InternalData.write(format(), file)
.schema(manifestSchema)
.named("manifest_entry")
.meta("schema", SchemaParser.toJson(spec.schema()))
@@ -296,7 +318,7 @@ static class V4DeleteWriter extends ManifestWriter {
Long snapshotId,
Map writerProperties) {
super(spec, file, snapshotId, null, writerProperties);
- this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId);
+ this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId, spec.partitionType());
}
@Override
@@ -309,7 +331,7 @@ protected FileAppender> newAppender(
PartitionSpec spec, OutputFile file) {
Schema manifestSchema = V4Metadata.entrySchema(spec.partitionType());
try {
- return InternalData.write(FileFormat.AVRO, file)
+ return InternalData.write(format(), file)
.schema(manifestSchema)
.named("manifest_entry")
.meta("schema", SchemaParser.toJson(spec.schema()))
@@ -342,6 +364,8 @@ static class V3Writer extends ManifestWriter {
Long firstRowId,
Map writerProperties) {
super(spec, file, snapshotId, firstRowId, writerProperties);
+ Preconditions.checkArgument(
+ format() == FileFormat.AVRO, "V3 manifests must use Avro, but got: %s", format());
this.entryWrapper = new V3Metadata.ManifestEntryWrapper<>(snapshotId);
}
@@ -382,6 +406,8 @@ static class V3DeleteWriter extends ManifestWriter {
Long snapshotId,
Map writerProperties) {
super(spec, file, snapshotId, null, writerProperties);
+ Preconditions.checkArgument(
+ format() == FileFormat.AVRO, "V3 manifests must use Avro, but got: %s", format());
this.entryWrapper = new V3Metadata.ManifestEntryWrapper<>(snapshotId);
}
@@ -427,6 +453,8 @@ static class V2Writer extends ManifestWriter {
Long snapshotId,
Map writerProperties) {
super(spec, file, snapshotId, null, writerProperties);
+ Preconditions.checkArgument(
+ format() == FileFormat.AVRO, "V2 manifests must use Avro, but got: %s", format());
this.entryWrapper = new V2Metadata.ManifestEntryWrapper<>(snapshotId);
}
@@ -467,6 +495,8 @@ static class V2DeleteWriter extends ManifestWriter {
Long snapshotId,
Map writerProperties) {
super(spec, file, snapshotId, null, writerProperties);
+ Preconditions.checkArgument(
+ format() == FileFormat.AVRO, "V2 manifests must use Avro, but got: %s", format());
this.entryWrapper = new V2Metadata.ManifestEntryWrapper<>(snapshotId);
}
@@ -512,6 +542,8 @@ static class V1Writer extends ManifestWriter {
Long snapshotId,
Map writerProperties) {
super(spec, file, snapshotId, null, writerProperties);
+ Preconditions.checkArgument(
+ format() == FileFormat.AVRO, "V1 manifests must use Avro, but got: %s", format());
this.entryWrapper = new V1Metadata.ManifestEntryWrapper();
}
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index e072382543b7..1a70b4f90b8f 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -1250,7 +1250,13 @@ protected ManifestWriter newManifestWriter(PartitionSpec manifestSpec)
@Override
protected ManifestReader newManifestReader(ManifestFile manifest) {
- return MergingSnapshotProducer.this.newManifestReader(manifest);
+ return newManifestReader(manifest, true);
+ }
+
+ @Override
+ protected ManifestReader newManifestReader(
+ ManifestFile manifest, boolean isCommitted) {
+ return ManifestFiles.read(manifest, ops().io(), ops().current().specsById(), isCommitted);
}
}
diff --git a/core/src/main/java/org/apache/iceberg/PartitionStats.java b/core/src/main/java/org/apache/iceberg/PartitionStats.java
deleted file mode 100644
index e8a4e18916bc..000000000000
--- a/core/src/main/java/org/apache/iceberg/PartitionStats.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg;
-
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-
-/**
- * Class to hold partition statistics values.
- *
- * @deprecated will be removed in 1.12.0. Use {@link BasePartitionStatistics instead}
- */
-@Deprecated
-public class PartitionStats implements StructLike {
-
- private static final int STATS_COUNT = 13;
-
- private StructLike partition;
- private int specId;
- private long dataRecordCount;
- private int dataFileCount;
- private long totalDataFileSizeInBytes;
- private long positionDeleteRecordCount; // also includes dv record count as per spec
- private int positionDeleteFileCount;
- private long equalityDeleteRecordCount;
- private int equalityDeleteFileCount;
- private Long totalRecordCount; // null by default
- private Long lastUpdatedAt; // null by default
- private Long lastUpdatedSnapshotId; // null by default
- private int dvCount;
-
- public PartitionStats(StructLike partition, int specId) {
- this.partition = partition;
- this.specId = specId;
- }
-
- public StructLike partition() {
- return partition;
- }
-
- public int specId() {
- return specId;
- }
-
- public long dataRecordCount() {
- return dataRecordCount;
- }
-
- public int dataFileCount() {
- return dataFileCount;
- }
-
- public long totalDataFileSizeInBytes() {
- return totalDataFileSizeInBytes;
- }
-
- public long positionDeleteRecordCount() {
- return positionDeleteRecordCount;
- }
-
- public int positionDeleteFileCount() {
- return positionDeleteFileCount;
- }
-
- public long equalityDeleteRecordCount() {
- return equalityDeleteRecordCount;
- }
-
- public int equalityDeleteFileCount() {
- return equalityDeleteFileCount;
- }
-
- public Long totalRecords() {
- return totalRecordCount;
- }
-
- public Long lastUpdatedAt() {
- return lastUpdatedAt;
- }
-
- public Long lastUpdatedSnapshotId() {
- return lastUpdatedSnapshotId;
- }
-
- public int dvCount() {
- return dvCount;
- }
-
- /**
- * Updates the partition stats from the data/delete file.
- *
- * @param file the {@link ContentFile} from the manifest entry.
- * @param snapshot the snapshot corresponding to the live entry.
- */
- void liveEntry(ContentFile> file, Snapshot snapshot) {
- Preconditions.checkArgument(specId == file.specId(), "Spec IDs must match");
-
- switch (file.content()) {
- case DATA:
- this.dataRecordCount += file.recordCount();
- this.dataFileCount += 1;
- this.totalDataFileSizeInBytes += file.fileSizeInBytes();
- break;
- case POSITION_DELETES:
- this.positionDeleteRecordCount += file.recordCount();
- if (file.format() == FileFormat.PUFFIN) {
- this.dvCount += 1;
- } else {
- this.positionDeleteFileCount += 1;
- }
-
- break;
- case EQUALITY_DELETES:
- this.equalityDeleteRecordCount += file.recordCount();
- this.equalityDeleteFileCount += 1;
- break;
- default:
- throw new UnsupportedOperationException("Unsupported file content type: " + file.content());
- }
-
- if (snapshot != null) {
- updateSnapshotInfo(snapshot.snapshotId(), snapshot.timestampMillis());
- }
-
- // Note: Not computing the `TOTAL_RECORD_COUNT` for now as it needs scanning the data.
- }
-
- /**
- * Updates the modified time and snapshot ID for the deleted manifest entry.
- *
- * @param snapshot the snapshot corresponding to the deleted manifest entry.
- */
- void deletedEntry(Snapshot snapshot) {
- if (snapshot != null) {
- updateSnapshotInfo(snapshot.snapshotId(), snapshot.timestampMillis());
- }
- }
-
- /**
- * Decrement the counters as it was included in the previous stats and updates the modified time
- * and snapshot ID for the deleted manifest entry.
- *
- * @param snapshot the snapshot corresponding to the deleted manifest entry.
- */
- void deletedEntryForIncrementalCompute(ContentFile> file, Snapshot snapshot) {
- Preconditions.checkArgument(specId == file.specId(), "Spec IDs must match");
-
- switch (file.content()) {
- case DATA:
- this.dataRecordCount -= file.recordCount();
- this.dataFileCount -= 1;
- this.totalDataFileSizeInBytes -= file.fileSizeInBytes();
- break;
- case POSITION_DELETES:
- this.positionDeleteRecordCount -= file.recordCount();
- if (file.format() == FileFormat.PUFFIN) {
- this.dvCount -= 1;
- } else {
- this.positionDeleteFileCount -= 1;
- }
-
- break;
- case EQUALITY_DELETES:
- this.equalityDeleteRecordCount -= file.recordCount();
- this.equalityDeleteFileCount -= 1;
- break;
- default:
- throw new UnsupportedOperationException("Unsupported file content type: " + file.content());
- }
-
- if (snapshot != null) {
- updateSnapshotInfo(snapshot.snapshotId(), snapshot.timestampMillis());
- }
- }
-
- /**
- * Appends statistics from given entry to current entry.
- *
- * @param entry the entry from which statistics will be sourced.
- */
- void appendStats(PartitionStats entry) {
- Preconditions.checkArgument(specId == entry.specId(), "Spec IDs must match");
-
- this.dataRecordCount += entry.dataRecordCount;
- this.dataFileCount += entry.dataFileCount;
- this.totalDataFileSizeInBytes += entry.totalDataFileSizeInBytes;
- this.positionDeleteRecordCount += entry.positionDeleteRecordCount;
- this.positionDeleteFileCount += entry.positionDeleteFileCount;
- this.equalityDeleteRecordCount += entry.equalityDeleteRecordCount;
- this.equalityDeleteFileCount += entry.equalityDeleteFileCount;
- this.dvCount += entry.dvCount;
-
- if (entry.totalRecordCount != null) {
- if (totalRecordCount == null) {
- this.totalRecordCount = entry.totalRecordCount;
- } else {
- this.totalRecordCount += entry.totalRecordCount;
- }
- }
-
- if (entry.lastUpdatedAt != null) {
- updateSnapshotInfo(entry.lastUpdatedSnapshotId, entry.lastUpdatedAt);
- }
- }
-
- private void updateSnapshotInfo(long snapshotId, long updatedAt) {
- if (lastUpdatedAt == null || lastUpdatedAt < updatedAt) {
- this.lastUpdatedAt = updatedAt;
- this.lastUpdatedSnapshotId = snapshotId;
- }
- }
-
- @Override
- public int size() {
- return STATS_COUNT;
- }
-
- @Override
- public T get(int pos, Class javaClass) {
- switch (pos) {
- case 0:
- return javaClass.cast(partition);
- case 1:
- return javaClass.cast(specId);
- case 2:
- return javaClass.cast(dataRecordCount);
- case 3:
- return javaClass.cast(dataFileCount);
- case 4:
- return javaClass.cast(totalDataFileSizeInBytes);
- case 5:
- return javaClass.cast(positionDeleteRecordCount);
- case 6:
- return javaClass.cast(positionDeleteFileCount);
- case 7:
- return javaClass.cast(equalityDeleteRecordCount);
- case 8:
- return javaClass.cast(equalityDeleteFileCount);
- case 9:
- return javaClass.cast(totalRecordCount);
- case 10:
- return javaClass.cast(lastUpdatedAt);
- case 11:
- return javaClass.cast(lastUpdatedSnapshotId);
- case 12:
- return javaClass.cast(dvCount);
- default:
- throw new UnsupportedOperationException("Unknown position: " + pos);
- }
- }
-
- @Override
- public void set(int pos, T value) {
- switch (pos) {
- case 0:
- this.partition = (StructLike) value;
- break;
- case 1:
- this.specId = (int) value;
- break;
- case 2:
- this.dataRecordCount = (long) value;
- break;
- case 3:
- this.dataFileCount = (int) value;
- break;
- case 4:
- this.totalDataFileSizeInBytes = (long) value;
- break;
- case 5:
- // optional field as per spec, implementation initialize to 0 for counters
- this.positionDeleteRecordCount = value == null ? 0L : (long) value;
- break;
- case 6:
- // optional field as per spec, implementation initialize to 0 for counters
- this.positionDeleteFileCount = value == null ? 0 : (int) value;
- break;
- case 7:
- // optional field as per spec, implementation initialize to 0 for counters
- this.equalityDeleteRecordCount = value == null ? 0L : (long) value;
- break;
- case 8:
- // optional field as per spec, implementation initialize to 0 for counters
- this.equalityDeleteFileCount = value == null ? 0 : (int) value;
- break;
- case 9:
- this.totalRecordCount = (Long) value;
- break;
- case 10:
- this.lastUpdatedAt = (Long) value;
- break;
- case 11:
- this.lastUpdatedSnapshotId = (Long) value;
- break;
- case 12:
- this.dvCount = value == null ? 0 : (int) value;
- break;
- default:
- throw new UnsupportedOperationException("Unknown position: " + pos);
- }
- }
-}
diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
index 9420095f94a3..29f7bcb53ce6 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
@@ -33,20 +33,14 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
import org.apache.iceberg.types.Comparators;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.types.Types.IntegerType;
-import org.apache.iceberg.types.Types.LongType;
-import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionMap;
@@ -67,180 +61,6 @@ private PartitionStatsHandler() {}
private static final Logger LOG = LoggerFactory.getLogger(PartitionStatsHandler.class);
- // schema of the partition stats file as per spec
- /**
- * @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#EMPTY_PARTITION_FIELD}
- */
- @Deprecated public static final int PARTITION_FIELD_ID = 1;
-
- /**
- * @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#EMPTY_PARTITION_FIELD}
- */
- @Deprecated public static final String PARTITION_FIELD_NAME = "partition";
-
- /**
- * @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#SPEC_ID}
- */
- @Deprecated
- public static final NestedField SPEC_ID = NestedField.required(2, "spec_id", IntegerType.get());
-
- /**
- * @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#DATA_RECORD_COUNT}
- */
- @Deprecated
- public static final NestedField DATA_RECORD_COUNT =
- NestedField.required(3, "data_record_count", LongType.get());
-
- /**
- * @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#DATA_FILE_COUNT}
- */
- @Deprecated
- public static final NestedField DATA_FILE_COUNT =
- NestedField.required(4, "data_file_count", IntegerType.get());
-
- /**
- * @deprecated will be removed in 1.12.0. Use {@link
- * PartitionStatistics#TOTAL_DATA_FILE_SIZE_IN_BYTES}
- */
- @Deprecated
- public static final NestedField TOTAL_DATA_FILE_SIZE_IN_BYTES =
- NestedField.required(5, "total_data_file_size_in_bytes", LongType.get());
-
- /**
- * @deprecated will be removed in 1.12.0. Use {@link
- * PartitionStatistics#POSITION_DELETE_RECORD_COUNT}
- */
- @Deprecated
- public static final NestedField POSITION_DELETE_RECORD_COUNT =
- NestedField.optional(6, "position_delete_record_count", LongType.get());
-
- /**
- * @deprecated will be removed in 1.12.0. Use {@link
- * PartitionStatistics#POSITION_DELETE_FILE_COUNT}
- */
- @Deprecated
- public static final NestedField POSITION_DELETE_FILE_COUNT =
- NestedField.optional(7, "position_delete_file_count", IntegerType.get());
-
- /**
- * @deprecated will be removed in 1.12.0. Use {@link
- * PartitionStatistics#EQUALITY_DELETE_RECORD_COUNT}
- */
- @Deprecated
- public static final NestedField EQUALITY_DELETE_RECORD_COUNT =
- NestedField.optional(8, "equality_delete_record_count", LongType.get());
-
- /**
- * @deprecated will be removed in 1.12.0. Use {@link
- * PartitionStatistics#EQUALITY_DELETE_FILE_COUNT}
- */
- @Deprecated
- public static final NestedField EQUALITY_DELETE_FILE_COUNT =
- NestedField.optional(9, "equality_delete_file_count", IntegerType.get());
-
- /**
- * @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#TOTAL_RECORD_COUNT}
- */
- @Deprecated
- public static final NestedField TOTAL_RECORD_COUNT =
- NestedField.optional(10, "total_record_count", LongType.get());
-
- /**
- * @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#LAST_UPDATED_AT}
- */
- @Deprecated
- public static final NestedField LAST_UPDATED_AT =
- NestedField.optional(11, "last_updated_at", LongType.get());
-
- /**
- * @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#LAST_UPDATED_SNAPSHOT_ID}
- */
- @Deprecated
- public static final NestedField LAST_UPDATED_SNAPSHOT_ID =
- NestedField.optional(12, "last_updated_snapshot_id", LongType.get());
-
- /**
- * @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#DV_COUNT}
- */
- @Deprecated
- public static final NestedField DV_COUNT =
- NestedField.required("dv_count")
- .withId(13)
- .ofType(Types.IntegerType.get())
- .withInitialDefault(Literal.of(0))
- .withWriteDefault(Literal.of(0))
- .build();
-
- /**
- * Generates the partition stats file schema for a given format version based on a combined
- * partition type which considers all specs in a table.
- *
- * @param unifiedPartitionType unified partition schema type. Could be calculated by {@link
- * Partitioning#partitionType(Table)}.
- * @return a schema that corresponds to the provided unified partition type.
- * @deprecated will be removed in 1.12.0. Use {@link PartitionStatistics#schema(StructType, int)}
- * instead.
- */
- @Deprecated
- public static Schema schema(StructType unifiedPartitionType, int formatVersion) {
- Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned");
- Preconditions.checkState(
- formatVersion > 0 && formatVersion <= TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION,
- "Invalid format version: %d",
- formatVersion);
-
- if (formatVersion <= 2) {
- return v2Schema(unifiedPartitionType);
- }
-
- return v3Schema(unifiedPartitionType);
- }
-
- private static Schema v2Schema(StructType unifiedPartitionType) {
- return new Schema(
- NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType),
- SPEC_ID,
- DATA_RECORD_COUNT,
- DATA_FILE_COUNT,
- TOTAL_DATA_FILE_SIZE_IN_BYTES,
- POSITION_DELETE_RECORD_COUNT,
- POSITION_DELETE_FILE_COUNT,
- EQUALITY_DELETE_RECORD_COUNT,
- EQUALITY_DELETE_FILE_COUNT,
- TOTAL_RECORD_COUNT,
- LAST_UPDATED_AT,
- LAST_UPDATED_SNAPSHOT_ID);
- }
-
- private static Schema v3Schema(StructType unifiedPartitionType) {
- return new Schema(
- NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType),
- SPEC_ID,
- DATA_RECORD_COUNT,
- DATA_FILE_COUNT,
- TOTAL_DATA_FILE_SIZE_IN_BYTES,
- NestedField.required(
- POSITION_DELETE_RECORD_COUNT.fieldId(),
- POSITION_DELETE_RECORD_COUNT.name(),
- LongType.get()),
- NestedField.required(
- POSITION_DELETE_FILE_COUNT.fieldId(),
- POSITION_DELETE_FILE_COUNT.name(),
- IntegerType.get()),
- NestedField.required(
- EQUALITY_DELETE_RECORD_COUNT.fieldId(),
- EQUALITY_DELETE_RECORD_COUNT.name(),
- LongType.get()),
- NestedField.required(
- EQUALITY_DELETE_FILE_COUNT.fieldId(),
- EQUALITY_DELETE_FILE_COUNT.name(),
- IntegerType.get()),
- TOTAL_RECORD_COUNT,
- LAST_UPDATED_AT,
- LAST_UPDATED_SNAPSHOT_ID,
- DV_COUNT);
- }
-
/**
* Computes the stats incrementally after the snapshot that has partition stats file till the
* current snapshot and writes the combined result into a {@link PartitionStatisticsFile} after
@@ -343,28 +163,6 @@ static PartitionStatisticsFile writePartitionStatsFile(
.build();
}
- /**
- * Reads partition statistics from the specified {@link InputFile} using given schema.
- *
- * @param schema The {@link Schema} of the partition statistics file.
- * @param inputFile An {@link InputFile} pointing to the partition stats file.
- * @deprecated will be removed in 1.12.0, use {@link PartitionStatisticsScan} instead
- */
- @Deprecated
- public static CloseableIterable readPartitionStatsFile(
- Schema schema, InputFile inputFile) {
- Preconditions.checkArgument(schema != null, "Invalid schema: null");
- Preconditions.checkArgument(inputFile != null, "Invalid input file: null");
-
- FileFormat fileFormat = FileFormat.fromFileName(inputFile.location());
- Preconditions.checkArgument(
- fileFormat != null, "Unable to determine format of file: %s", inputFile.location());
-
- CloseableIterable records =
- InternalData.read(fileFormat, inputFile).project(schema).build();
- return CloseableIterable.transform(records, PartitionStatsHandler::recordToPartitionStats);
- }
-
private static OutputFile newPartitionStatsFile(
Table table, FileFormat fileFormat, long snapshotId) {
Preconditions.checkArgument(
@@ -382,19 +180,6 @@ private static OutputFile newPartitionStatsFile(
Locale.ROOT, "partition-stats-%d-%s", snapshotId, UUID.randomUUID()))));
}
- private static PartitionStats recordToPartitionStats(StructLike record) {
- int pos = 0;
- PartitionStats stats =
- new PartitionStats(
- record.get(pos++, StructLike.class), // partition
- record.get(pos++, Integer.class)); // spec id
- for (; pos < record.size(); pos++) {
- stats.set(pos, record.get(pos, Object.class));
- }
-
- return stats;
- }
-
private static Collection computeAndMergeStatsIncremental(
Table table, Snapshot snapshot, long lastSnapshotWithStats) {
PartitionMap statsMap = PartitionMap.create(table.specs());
@@ -680,7 +465,8 @@ private static void deletedEntryForIncrementalCompute(
* @param targetStats partition statistics to be updated.
* @param inputStats the partition statistics used as input.
*/
- private static void appendStats(PartitionStatistics targetStats, PartitionStatistics inputStats) {
+ @VisibleForTesting
+ static void appendStats(PartitionStatistics targetStats, PartitionStatistics inputStats) {
Preconditions.checkArgument(targetStats.specId() != null, "Invalid spec ID: null");
Preconditions.checkArgument(
targetStats.specId().equals(inputStats.specId()), "Spec IDs must match");
diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
index 435f79129204..69f82931833e 100644
--- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
+++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
@@ -318,7 +318,7 @@ public static RewriteResult rewriteManifestList(
private static List manifestFilesInSnapshot(FileIO io, Snapshot snapshot) {
String path = snapshot.manifestListLocation();
- List manifestFiles = Lists.newLinkedList();
+ List manifestFiles = Lists.newArrayList();
try {
manifestFiles = ManifestLists.read(io.newInputFile(path));
} catch (RuntimeIOException e) {
diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java
index a26fff1fd565..5b4cd0e55396 100644
--- a/core/src/main/java/org/apache/iceberg/SerializableTable.java
+++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -58,6 +58,7 @@ public class SerializableTable implements Table, HasTableOperations, Serializabl
private final int defaultSpecId;
private final Map specAsJsonMap;
private final String sortOrderAsJson;
+ private final int defaultSortOrderId;
private final Map sortOrderAsJsonMap;
private final FileIO io;
private final EncryptionManager encryption;
@@ -83,6 +84,7 @@ protected SerializableTable(Table table) {
Map specs = table.specs();
specs.forEach((specId, spec) -> specAsJsonMap.put(specId, PartitionSpecParser.toJson(spec)));
this.sortOrderAsJson = SortOrderParser.toJson(table.sortOrder());
+ this.defaultSortOrderId = table.sortOrder().orderId();
this.sortOrderAsJsonMap = Maps.newHashMap();
table
.sortOrders()
@@ -253,7 +255,8 @@ public Map sortOrders() {
ImmutableMap.Builder sortOrders =
ImmutableMap.builderWithExpectedSize(sortOrderAsJsonMap.size());
sortOrderAsJsonMap.forEach(
- (id, json) -> sortOrders.put(id, SortOrderParser.fromJson(schema(), json)));
+ (id, json) ->
+ sortOrders.put(id, SortOrderParser.fromJson(schema(), json, defaultSortOrderId)));
this.lazySortOrders = sortOrders.build();
} else if (lazySortOrders == null) {
this.lazySortOrders = lazyTable.sortOrders();
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 6ba10e8049f6..e351009a9ea6 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -113,6 +113,7 @@ public void accept(String file) {
private final AtomicInteger attempt = new AtomicInteger(0);
private final List manifestLists = Lists.newArrayList();
private final long targetManifestSizeBytes;
+ private final FileFormat manifestFormat;
private final Map manifestWriterProps;
private MetricsReporter reporter = LoggingMetricsReporter.instance();
private volatile Long snapshotId = null;
@@ -142,6 +143,10 @@ protected SnapshotProducer(TableOperations ops) {
this.targetManifestSizeBytes =
ops.current()
.propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
+ this.manifestFormat =
+ ops.current().formatVersion() >= TableMetadata.MIN_FORMAT_VERSION_PARQUET_MANIFESTS
+ ? FileFormat.PARQUET
+ : FileFormat.AVRO;
this.manifestWriterProps = manifestWriterProperties(ops.current());
boolean snapshotIdInheritanceEnabled =
ops.current()
@@ -603,7 +608,7 @@ protected OutputFile manifestListPath() {
protected EncryptedOutputFile newManifestOutputFile() {
String manifestFileLocation =
ops.metadataFileLocation(
- FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement()));
+ manifestFormat.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement()));
return EncryptingFileIO.combine(ops.io(), ops.encryption())
.newEncryptingOutputFile(manifestFileLocation);
}
diff --git a/core/src/main/java/org/apache/iceberg/SortOrderParser.java b/core/src/main/java/org/apache/iceberg/SortOrderParser.java
index 31307cf9dc7f..53d7e5090c76 100644
--- a/core/src/main/java/org/apache/iceberg/SortOrderParser.java
+++ b/core/src/main/java/org/apache/iceberg/SortOrderParser.java
@@ -112,6 +112,10 @@ public static SortOrder fromJson(Schema schema, String json) {
return fromJson(json).bind(schema);
}
+ public static SortOrder fromJson(Schema schema, String json, int defaultSortOrderId) {
+ return JsonUtil.parse(json, node -> fromJson(schema, node, defaultSortOrderId));
+ }
+
public static SortOrder fromJson(Schema schema, JsonNode json, int defaultSortOrderId) {
UnboundSortOrder unboundSortOrder = fromJson(json);
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 43a67dd2bef2..c4a7bfc5c83c 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -57,6 +57,7 @@ public class TableMetadata implements Serializable {
static final int DEFAULT_TABLE_FORMAT_VERSION = 2;
static final int SUPPORTED_TABLE_FORMAT_VERSION = 4;
static final int MIN_FORMAT_VERSION_ROW_LINEAGE = 3;
+ static final int MIN_FORMAT_VERSION_PARQUET_MANIFESTS = 4;
static final int INITIAL_SPEC_ID = 0;
static final int INITIAL_SORT_ORDER_ID = 1;
static final int INITIAL_SCHEMA_ID = 0;
diff --git a/core/src/main/java/org/apache/iceberg/TrackedFile.java b/core/src/main/java/org/apache/iceberg/TrackedFile.java
index d9ae100ac651..8a6335972888 100644
--- a/core/src/main/java/org/apache/iceberg/TrackedFile.java
+++ b/core/src/main/java/org/apache/iceberg/TrackedFile.java
@@ -53,6 +53,10 @@ interface TrackedFile {
Types.NestedField.optional(
141, "spec_id", Types.IntegerType.get(), "Spec ID used to partition the file");
+ int PARTITION_ID = 102;
+ String PARTITION_NAME = "partition";
+ String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";
+
int CONTENT_STATS_ID = 146;
String CONTENT_STATS_NAME = "content_stats";
String CONTENT_STATS_DOC = "Content statistics for this entry";
@@ -88,7 +92,8 @@ interface TrackedFile {
Types.ListType.ofRequired(136, Types.IntegerType.get()),
"Field ids used to determine row equality in equality delete files");
- static Types.StructType schemaWithContentStats(Types.StructType contentStatsType) {
+ static Types.StructType schemaWithContentStats(
+ Types.StructType partitionType, Types.StructType contentStatsType) {
return Types.StructType.of(
TRACKING,
CONTENT_TYPE,
@@ -97,6 +102,7 @@ static Types.StructType schemaWithContentStats(Types.StructType contentStatsType
RECORD_COUNT,
FILE_SIZE_IN_BYTES,
SPEC_ID,
+ Types.NestedField.required(PARTITION_ID, PARTITION_NAME, partitionType, PARTITION_DOC),
Types.NestedField.optional(
CONTENT_STATS_ID, CONTENT_STATS_NAME, contentStatsType, CONTENT_STATS_DOC),
SORT_ORDER_ID,
@@ -128,6 +134,9 @@ static Types.StructType schemaWithContentStats(Types.StructType contentStatsType
/** Returns the ID of the partition spec used to partition this file, or null. */
Integer specId();
+ /** Returns the partition data tuple for this file as a {@link StructLike}. */
+ StructLike partition();
+
/** Returns the content stats for this entry. */
ContentStats contentStats();
diff --git a/core/src/main/java/org/apache/iceberg/TrackedFileStruct.java b/core/src/main/java/org/apache/iceberg/TrackedFileStruct.java
index ba9fd362038a..4830f69d6bf1 100644
--- a/core/src/main/java/org/apache/iceberg/TrackedFileStruct.java
+++ b/core/src/main/java/org/apache/iceberg/TrackedFileStruct.java
@@ -25,6 +25,7 @@
import java.util.Set;
import org.apache.iceberg.avro.SupportsIndexProjection;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.ByteBuffers;
@@ -32,6 +33,13 @@
/** Mutable {@link StructLike} implementation of {@link TrackedFile}. */
class TrackedFileStruct extends SupportsIndexProjection implements TrackedFile, Serializable {
private static final Types.StructType EMPTY_STRUCT_TYPE = Types.StructType.of();
+ private static final PartitionData EMPTY_PARTITION_DATA =
+ new PartitionData(EMPTY_STRUCT_TYPE) {
+ @Override
+ public PartitionData copy() {
+ return this; // this does not change
+ }
+ };
private static final Types.StructType BASE_TYPE =
Types.StructType.of(
@@ -42,6 +50,11 @@ class TrackedFileStruct extends SupportsIndexProjection implements TrackedFile,
TrackedFile.RECORD_COUNT,
TrackedFile.FILE_SIZE_IN_BYTES,
TrackedFile.SPEC_ID,
+ Types.NestedField.required(
+ TrackedFile.PARTITION_ID,
+ TrackedFile.PARTITION_NAME,
+ EMPTY_STRUCT_TYPE,
+ TrackedFile.PARTITION_DOC),
Types.NestedField.optional(
TrackedFile.CONTENT_STATS_ID,
TrackedFile.CONTENT_STATS_NAME,
@@ -60,6 +73,7 @@ class TrackedFileStruct extends SupportsIndexProjection implements TrackedFile,
private long recordCount = -1L;
private long fileSizeInBytes = -1L;
private Integer specId = null;
+ private PartitionData partitionData = EMPTY_PARTITION_DATA;
// optional fields
private Tracking tracking = null;
@@ -74,6 +88,11 @@ class TrackedFileStruct extends SupportsIndexProjection implements TrackedFile,
/** Used by internal readers to instantiate this class with a projection schema. */
TrackedFileStruct(Types.StructType projection) {
super(BASE_TYPE, projection);
+ // partition type may be null if the field was not projected
+ Type partType = projection.fieldType(TrackedFile.PARTITION_NAME);
+ if (partType != null) {
+ this.partitionData = new PartitionData(partType.asNestedType().asStructType());
+ }
}
/** No-projection constructor for direct construction. */
@@ -87,6 +106,7 @@ class TrackedFileStruct extends SupportsIndexProjection implements TrackedFile,
FileContent contentType,
String location,
FileFormat fileFormat,
+ PartitionData partition,
long recordCount,
long fileSizeInBytes) {
super(BASE_TYPE.fields().size());
@@ -96,6 +116,9 @@ class TrackedFileStruct extends SupportsIndexProjection implements TrackedFile,
this.fileFormat = fileFormat;
this.recordCount = recordCount;
this.fileSizeInBytes = fileSizeInBytes;
+ if (partition != null) {
+ this.partitionData = partition;
+ }
}
/** Copy constructor. */
@@ -107,9 +130,8 @@ private TrackedFileStruct(TrackedFileStruct toCopy, boolean withStats, Set void internalSet(int pos, T value) {
this.specId = (Integer) value;
break;
case 7:
- this.contentStats = (ContentStats) value;
+ this.partitionData = (PartitionData) value;
break;
case 8:
- this.sortOrderId = (Integer) value;
+ this.contentStats = (ContentStats) value;
break;
case 9:
- this.deletionVector = (DeletionVector) value;
+ this.sortOrderId = (Integer) value;
break;
case 10:
- this.manifestInfo = (ManifestInfo) value;
+ this.deletionVector = (DeletionVector) value;
break;
case 11:
- this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value);
+ this.manifestInfo = (ManifestInfo) value;
break;
case 12:
- this.splitOffsets = ArrayUtil.toLongArray((List) value);
+ this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value);
break;
case 13:
+ this.splitOffsets = ArrayUtil.toLongArray((List) value);
+ break;
+ case 14:
this.equalityIds = ArrayUtil.toIntArray((List) value);
break;
default:
@@ -315,6 +347,7 @@ public String toString() {
.add("record_count", recordCount)
.add("file_size_in_bytes", fileSizeInBytes)
.add("spec_id", specId())
+ .add("partition", partitionData)
.add("tracking", tracking)
.add("content_stats", contentStats)
.add("sort_order_id", sortOrderId)
diff --git a/core/src/main/java/org/apache/iceberg/V4Metadata.java b/core/src/main/java/org/apache/iceberg/V4Metadata.java
index 67478290aa10..06fc75213df0 100644
--- a/core/src/main/java/org/apache/iceberg/V4Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V4Metadata.java
@@ -23,6 +23,7 @@
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.Types;
class V4Metadata {
@@ -278,28 +279,38 @@ static Schema wrapFileSchema(Types.StructType fileSchema) {
}
static Types.StructType fileType(Types.StructType partitionType) {
- return Types.StructType.of(
- DataFile.CONTENT.asRequired(),
- DataFile.FILE_PATH,
- DataFile.FILE_FORMAT,
- required(
- DataFile.PARTITION_ID, DataFile.PARTITION_NAME, partitionType, DataFile.PARTITION_DOC),
- DataFile.RECORD_COUNT,
- DataFile.FILE_SIZE,
- DataFile.COLUMN_SIZES,
- DataFile.VALUE_COUNTS,
- DataFile.NULL_VALUE_COUNTS,
- DataFile.NAN_VALUE_COUNTS,
- DataFile.LOWER_BOUNDS,
- DataFile.UPPER_BOUNDS,
- DataFile.KEY_METADATA,
- DataFile.SPLIT_OFFSETS,
- DataFile.EQUALITY_IDS,
- DataFile.SORT_ORDER_ID,
- DataFile.FIRST_ROW_ID,
- DataFile.REFERENCED_DATA_FILE,
- DataFile.CONTENT_OFFSET,
- DataFile.CONTENT_SIZE);
+ // Parquet cannot represent empty groups, so the partition field is omitted entirely from
+ // the file schema for unpartitioned tables. DataFileWrapper adjusts positions to match.
+ ImmutableList.Builder<Types.NestedField> fields =
+ ImmutableList.builderWithExpectedSize(partitionType.fields().isEmpty() ? 19 : 20);
+ fields.add(DataFile.CONTENT.asRequired());
+ fields.add(DataFile.FILE_PATH);
+ fields.add(DataFile.FILE_FORMAT);
+ if (!partitionType.fields().isEmpty()) {
+ fields.add(
+ required(
+ DataFile.PARTITION_ID,
+ DataFile.PARTITION_NAME,
+ partitionType,
+ DataFile.PARTITION_DOC));
+ }
+ fields.add(DataFile.RECORD_COUNT);
+ fields.add(DataFile.FILE_SIZE);
+ fields.add(DataFile.COLUMN_SIZES);
+ fields.add(DataFile.VALUE_COUNTS);
+ fields.add(DataFile.NULL_VALUE_COUNTS);
+ fields.add(DataFile.NAN_VALUE_COUNTS);
+ fields.add(DataFile.LOWER_BOUNDS);
+ fields.add(DataFile.UPPER_BOUNDS);
+ fields.add(DataFile.KEY_METADATA);
+ fields.add(DataFile.SPLIT_OFFSETS);
+ fields.add(DataFile.EQUALITY_IDS);
+ fields.add(DataFile.SORT_ORDER_ID);
+ fields.add(DataFile.FIRST_ROW_ID);
+ fields.add(DataFile.REFERENCED_DATA_FILE);
+ fields.add(DataFile.CONTENT_OFFSET);
+ fields.add(DataFile.CONTENT_SIZE);
+ return Types.StructType.of(fields.build());
}
static class ManifestEntryWrapper>
@@ -309,10 +320,10 @@ static class ManifestEntryWrapper>
private final DataFileWrapper> fileWrapper;
private ManifestEntry wrapped = null;
- ManifestEntryWrapper(Long commitSnapshotId) {
- this.size = entrySchema(Types.StructType.of()).columns().size();
+ ManifestEntryWrapper(Long commitSnapshotId, Types.StructType partitionType) {
+ this.size = entrySchema(partitionType).columns().size();
this.commitSnapshotId = commitSnapshotId;
- this.fileWrapper = new DataFileWrapper<>();
+ this.fileWrapper = new DataFileWrapper<>(partitionType);
}
public ManifestEntryWrapper wrap(ManifestEntry entry) {
@@ -423,11 +434,15 @@ public ManifestEntry copyWithoutStats() {
/** Wrapper used to write DataFile or DeleteFile to v4 metadata. */
static class DataFileWrapper> extends Delegates.DelegatingContentFile
implements ContentFile, StructLike {
+ private static final int PARTITION_POSITION = 3;
+
private final int size;
+ private final boolean hasPartition;
- DataFileWrapper() {
+ DataFileWrapper(Types.StructType partitionType) {
super(null);
- this.size = fileType(Types.StructType.of()).fields().size();
+ this.hasPartition = !partitionType.fields().isEmpty();
+ this.size = fileType(partitionType).fields().size();
}
@SuppressWarnings("unchecked")
@@ -452,7 +467,10 @@ public T get(int pos, Class javaClass) {
}
private Object get(int pos) {
- switch (pos) {
+ // when the partition field is omitted, positions at or after where it would appear
+ // shift down by 1, so adjust back to the canonical field ordering
+ int adjusted = hasPartition ? pos : (pos >= PARTITION_POSITION ? pos + 1 : pos);
+ switch (adjusted) {
case 0:
return wrapped.content().id();
case 1:
diff --git a/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java b/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java
index 8678a6e33681..f2d8a02a3051 100644
--- a/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java
+++ b/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java
@@ -69,7 +69,7 @@ public SeekableInputStream newStream() {
long ciphertextLength = encryptedLength();
Preconditions.checkState(
ciphertextLength >= Ciphers.MIN_STREAM_LENGTH,
- "Invalid encrypted stream: %d is shorter than the minimum possible stream length",
+ "Invalid encrypted stream: %s is shorter than the minimum possible stream length",
ciphertextLength);
return new AesGcmInputStream(sourceFile.newStream(), ciphertextLength, dataKey, fileAADPrefix);
}
diff --git a/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java b/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java
index 1f52ab3682f8..e5917a2fc503 100644
--- a/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java
+++ b/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java
@@ -79,7 +79,7 @@ private void validateHeader() throws IOException {
int plainBlockSize = ByteBuffer.wrap(headerBytes, 4, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
Preconditions.checkState(
plainBlockSize == Ciphers.PLAIN_BLOCK_SIZE,
- "Invalid GCM stream: block size %d != %d",
+ "Invalid GCM stream: block size %s != %s",
plainBlockSize,
Ciphers.PLAIN_BLOCK_SIZE);
}
diff --git a/core/src/main/java/org/apache/iceberg/io/MultiBufferInputStream.java b/core/src/main/java/org/apache/iceberg/io/MultiBufferInputStream.java
index 6d38497d224f..fae2e615ef8e 100644
--- a/core/src/main/java/org/apache/iceberg/io/MultiBufferInputStream.java
+++ b/core/src/main/java/org/apache/iceberg/io/MultiBufferInputStream.java
@@ -263,7 +263,7 @@ public int read(byte[] bytes) {
@Override
public int read() throws IOException {
if (current == null) {
- throw new EOFException();
+ return -1;
}
while (true) {
@@ -272,7 +272,7 @@ public int read() throws IOException {
return current.get() & 0xFF; // as unsigned
} else if (!nextBuffer()) {
// there are no more buffers
- throw new EOFException();
+ return -1;
}
}
}
diff --git a/core/src/main/java/org/apache/iceberg/io/SingleBufferInputStream.java b/core/src/main/java/org/apache/iceberg/io/SingleBufferInputStream.java
index fef2f9164f5e..50431faf7e95 100644
--- a/core/src/main/java/org/apache/iceberg/io/SingleBufferInputStream.java
+++ b/core/src/main/java/org/apache/iceberg/io/SingleBufferInputStream.java
@@ -58,7 +58,7 @@ public long getPos() {
@Override
public int read() throws IOException {
if (!buffer.hasRemaining()) {
- throw new EOFException();
+ return -1;
}
return buffer.get() & 0xFF; // as unsigned
}
diff --git a/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java b/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
index 46d9177b9571..d2ddc22aee97 100644
--- a/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
+++ b/core/src/main/java/org/apache/iceberg/rest/HTTPClient.java
@@ -36,7 +36,6 @@
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
-import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
@@ -44,6 +43,7 @@
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
import org.apache.hc.client5.http.ssl.HostnameVerificationPolicy;
+import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpHeaders;
@@ -164,7 +164,7 @@ public HTTPClient withAuthSession(AuthSession session) {
return new HTTPClient(this, session);
}
- private static String extractResponseBodyAsString(CloseableHttpResponse response) {
+ private static String extractResponseBodyAsString(ClassicHttpResponse response) {
try {
if (response.getEntity() == null) {
return null;
@@ -177,7 +177,7 @@ private static String extractResponseBodyAsString(CloseableHttpResponse response
}
}
- private static boolean isSuccessful(CloseableHttpResponse response) {
+ private static boolean isSuccessful(ClassicHttpResponse response) {
int code = response.getCode();
return code == HttpStatus.SC_OK
|| code == HttpStatus.SC_ACCEPTED
@@ -185,7 +185,7 @@ private static boolean isSuccessful(CloseableHttpResponse response) {
|| code == HttpStatus.SC_NOT_MODIFIED;
}
- private static ErrorResponse buildDefaultErrorResponse(CloseableHttpResponse response) {
+ private static ErrorResponse buildDefaultErrorResponse(ClassicHttpResponse response) {
String responseReason = response.getReasonPhrase();
String message =
responseReason != null && !responseReason.isEmpty()
@@ -202,7 +202,7 @@ private static ErrorResponse buildDefaultErrorResponse(CloseableHttpResponse res
// Process a failed response through the provided errorHandler, and throw a RESTException if the
// provided error handler doesn't already throw.
private static void throwFailure(
- CloseableHttpResponse response, String responseBody, Consumer errorHandler) {
+ ClassicHttpResponse response, String responseBody, Consumer errorHandler) {
ErrorResponse errorResponse = null;
if (responseBody != null) {
@@ -300,7 +300,6 @@ protected T execute(
req, responseType, errorHandler, responseHeaders, ParserContext.builder().build());
}
- @SuppressWarnings("deprecation")
@Override
protected T execute(
HTTPRequest req,
@@ -318,53 +317,69 @@ protected T execute(
}
HttpContext context = HttpClientContext.create();
- try (CloseableHttpResponse response = httpClient.execute(request, context)) {
- Map respHeaders = Maps.newHashMap();
- for (Header header : response.getHeaders()) {
- respHeaders.put(header.getName(), header.getValue());
- }
-
- responseHeaders.accept(respHeaders);
-
- // Skip parsing the response stream for any successful request not expecting a response body
- if (emptyBody(response, responseType)) {
- if (response.getCode() == HttpStatus.SC_NOT_MODIFIED
- && !req.headers().contains(HttpHeaders.IF_NONE_MATCH)) {
- // 304-NOT_MODIFIED is used for freshness-aware loading and requires an ETag sent to the
- // server via IF_NONE_MATCH header in the request. If no ETag was sent, we shouldn't
- // receive a 304.
- throw new RESTException(
- "Invalid (NOT_MODIFIED) response for request: method=%s, path=%s",
- req.method(), req.path());
- }
+ try {
+ return httpClient.execute(
+ request,
+ context,
+ response ->
+ handleResponse(
+ req, response, responseType, errorHandler, responseHeaders, parserContext));
+ } catch (IOException e) {
+ throw new RESTException(e, "Error occurred while processing %s request", req.method());
+ }
+ }
- return null;
- }
+ private T handleResponse(
+ HTTPRequest request,
+ ClassicHttpResponse response,
+ Class responseType,
+ Consumer errorHandler,
+ Consumer