
Commit 3fdd44c

[DRAFT] HIVE-29287: Variant Shredding
1 parent 2055879 commit 3fdd44c

File tree

6 files changed: +1591 -2 lines changed

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java

Lines changed: 39 additions & 0 deletions
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.mr.hive.writer;
 
+import java.util.Map;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
@@ -31,9 +32,13 @@
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Types;
 
 class HiveFileWriterFactory extends BaseFileWriterFactory<Record> {
 
+  private final Map<String, String> properties;
+  private Record sampleRecord = null;
+
   HiveFileWriterFactory(
       Table table,
       FileFormat dataFileFormat,
@@ -54,6 +59,7 @@ class HiveFileWriterFactory extends BaseFileWriterFactory<Record> {
         equalityDeleteRowSchema,
         equalityDeleteSortOrder,
         positionDeleteRowSchema);
+    properties = table.properties();
   }
 
   static Builder builderFor(Table table) {
@@ -78,6 +84,11 @@ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
   @Override
   protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
     builder.createWriterFunc(GenericParquetWriter::create);
+    // Configure variant shredding function if conditions are met:
+    if (hasVariantColumns(dataSchema()) && isVariantShreddingEnabled(properties)) {
+      builder.variantShreddingFunc(
+          Parquet.constructVariantShreddingFunction(sampleRecord, dataSchema()));
+    }
   }
 
   @Override
@@ -149,4 +160,32 @@ HiveFileWriterFactory build() {
           positionDeleteRowSchema);
     }
   }
+
+  /**
+   * Check if the schema contains any variant columns.
+   */
+  private static boolean hasVariantColumns(Schema schema) {
+    return schema.columns().stream()
+        .anyMatch(field -> field.type() instanceof Types.VariantType);
+  }
+
+  /**
+   * Check if variant shredding is enabled via table properties.
+   */
+  private static boolean isVariantShreddingEnabled(Map<String, String> properties) {
+    String shreddingEnabled = properties.get("variant.shredding.enabled");
+    return "true".equalsIgnoreCase(shreddingEnabled);
+  }
+
+  /**
+   * Set a sample record to use for data-driven variant shredding schema generation.
+   * Should be called before the Parquet writer is created.
+   */
+  public void setSampleRecord(Record record) {
+    this.sampleRecord = record;
+  }
+
+  public Record getSampleRecord() {
+    return sampleRecord;
+  }
 }
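
The gate in configureDataWrite() engages shredding only when the schema actually contains a variant column and the table opts in via the variant.shredding.enabled property. Below is a minimal, self-contained sketch of that decision; the class name and shouldShred helper are hypothetical stand-ins, not part of this commit, and only the property name and the case-insensitive "true" check come from the diff above.

import java.util.Map;

// Hypothetical stand-alone sketch of the shredding gate above: shredding is
// configured only if the schema has a variant column AND the table property
// "variant.shredding.enabled" is (case-insensitively) "true".
public class VariantShreddingGateSketch {

  static boolean shouldShred(boolean hasVariantColumn, Map<String, String> tableProps) {
    return hasVariantColumn
        && "true".equalsIgnoreCase(tableProps.get("variant.shredding.enabled"));
  }

  public static void main(String[] args) {
    System.out.println(shouldShred(true, Map.of("variant.shredding.enabled", "TRUE"))); // true
    System.out.println(shouldShred(true, Map.of()));                                    // false: property unset
    System.out.println(shouldShred(false, Map.of("variant.shredding.enabled", "true"))); // false: no variant column
  }
}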

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java

Lines changed: 6 additions & 1 deletion
@@ -40,6 +40,8 @@ class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
   private final Set<String> missingColumns;
   private final List<Types.NestedField> missingOrStructFields;
 
+  private final HiveFileWriterFactory fileWriterFactory;
+
   HiveIcebergRecordWriter(Table table, HiveFileWriterFactory fileWriterFactory,
       OutputFileFactory dataFileFactory, Context context) {
     super(table, newDataWriter(table, fileWriterFactory, dataFileFactory, context));
@@ -48,17 +50,20 @@ class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
     this.missingColumns = context.missingColumns();
     this.missingOrStructFields = specs.get(currentSpecId).schema().asStruct().fields().stream()
         .filter(field -> missingColumns.contains(field.name()) || field.type().isStructType()).toList();
+    this.fileWriterFactory = fileWriterFactory;
   }
 
   @Override
   public void write(Writable row) throws IOException {
     Record record = ((Container<Record>) row).get();
     HiveSchemaUtil.setDefaultValues(record, missingOrStructFields, missingColumns);
 
+    if (fileWriterFactory.getSampleRecord() == null) {
+      fileWriterFactory.setSampleRecord(record);
+    }
     writer.write(record, specs.get(currentSpecId), partition(record, currentSpecId));
   }
 
-
   @Override
   public FilesForCommit files() {
     List<DataFile> dataFiles = ((DataWriteResult) writer.result()).dataFiles();
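
The write path above stashes the first record it sees on the writer factory, so the Parquet writer can later derive a data-driven shredding schema from a real record. A minimal sketch of this capture-once pattern follows; every name in it is a hypothetical stand-in for the factory/writer pair in this commit, not actual Iceberg or Hive API.

import java.util.List;

// Hypothetical stand-ins for the capture-once pattern in write(): the first
// record written is stashed on the factory; all later records leave it untouched.
public class SampleRecordCaptureSketch {

  static final class Factory {
    private String sampleRecord; // null until the first record arrives

    String getSampleRecord() {
      return sampleRecord;
    }

    void setSampleRecord(String record) {
      this.sampleRecord = record;
    }
  }

  static void writeAll(Factory factory, List<String> records) {
    for (String record : records) {
      if (factory.getSampleRecord() == null) {
        factory.setSampleRecord(record); // only the very first record is captured
      }
      // ... the real writer would append the record to the data file here ...
    }
  }

  public static void main(String[] args) {
    Factory factory = new Factory();
    writeAll(factory, List.of("first", "second", "third"));
    System.out.println(factory.getSampleRecord()); // prints "first"
  }
}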
