Skip to content

Commit 6428cb2

Browse files
authored
perf(spark): Resolve drop-partition-columns projection once per writer instead of per row (#18972)
BulkInsertDataInternalWriterHelper#write redid constant work for every row when hoodie.datasource.write.drop.partition.columns is enabled: resolving the config flag, instantiating a key generator via constructor reflection through getPartitionPathCols, recomputing the partition-column ordinals into a fresh HashSet, and round-tripping the whole row through toSeq/fromSeq (boxing every column). The flag is now resolved once in the constructor, and the retained (non-partition) field ordinals and types are computed once on the first write(). The lazy initialization keeps the partition-column resolution unreachable for the bucket-index subclasses, which override write() and never drop columns, and for tasks that write no rows, matching the previous reachability exactly. write() copies the retained fields into a fresh GenericInternalRow, which is value-identical to the previous toSeq/filter/fromSeq output.
1 parent 0915f77 commit 6428cb2

1 file changed

Lines changed: 46 additions & 18 deletions

File tree

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
import lombok.extern.slf4j.Slf4j;
3434
import org.apache.spark.sql.catalyst.InternalRow;
35+
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
3536
import org.apache.spark.sql.types.DataType;
3637
import org.apache.spark.sql.types.StructType;
3738
import org.apache.spark.unsafe.types.UTF8String;
@@ -69,6 +70,13 @@ public class BulkInsertDataInternalWriterHelper {
6970
protected final boolean simpleKeyGen;
7071
protected final int simplePartitionFieldIndex;
7172
protected final DataType simplePartitionFieldDataType;
73+
protected final boolean shouldDropPartitionColumns;
74+
// Ordinals and types of the non-partition fields, computed once on the first write() instead of
75+
// in the constructor: bucket-index subclasses override write() and never drop columns, and the
76+
// partition-column resolution must stay unreachable for them (and for tasks that write no rows)
77+
// exactly as before. The helper is confined to a single task thread, so plain lazy init is safe.
78+
private int[] retainedOrdinals;
79+
private DataType[] retainedTypes;
7280
/**
7381
* NOTE: This is stored as Catalyst's internal {@link UTF8String} to avoid
7482
* conversion (deserialization) b/w {@link UTF8String} and {@link String}
@@ -114,6 +122,36 @@ public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteCo
114122
this.simplePartitionFieldIndex = -1;
115123
this.simplePartitionFieldDataType = null;
116124
}
125+
126+
this.shouldDropPartitionColumns = writeConfig.shouldDropPartitionColumns();
127+
}
128+
129+
/**
130+
* Resolves the ordinals and types of the non-partition fields. The partition columns are a pure
131+
* function of the write config and schema, both immutable for the helper's lifetime, so this
132+
* runs once per helper instead of once per row (getPartitionPathCols instantiates a key
133+
* generator reflectively).
134+
*/
135+
private void initRetainedFields() {
136+
List<String> partitionCols = JavaScalaConverters.convertScalaListToJavaList(
137+
HoodieDatasetBulkInsertHelper.getPartitionPathCols(this.writeConfig));
138+
Set<Integer> partitionIdx = new HashSet<>();
139+
for (String col : partitionCols) {
140+
partitionIdx.add(this.structType.fieldIndex(col));
141+
}
142+
int numRetained = structType.fields().length - partitionIdx.size();
143+
int[] ordinals = new int[numRetained];
144+
DataType[] types = new DataType[numRetained];
145+
int retained = 0;
146+
for (int i = 0; i < structType.fields().length; i++) {
147+
if (!partitionIdx.contains(i)) {
148+
ordinals[retained] = i;
149+
types[retained] = structType.fields()[i].dataType();
150+
retained++;
151+
}
152+
}
153+
this.retainedOrdinals = ordinals;
154+
this.retainedTypes = types;
117155
}
118156

119157
public void write(InternalRow row) throws IOException {
@@ -126,27 +164,17 @@ public void write(InternalRow row) throws IOException {
126164
lastKnownPartitionPath = partitionPath.clone();
127165
}
128166

129-
boolean shouldDropPartitionColumns = writeConfig.shouldDropPartitionColumns();
130167
if (shouldDropPartitionColumns) {
131-
// Drop the partition columns from the row
132-
List<String> partitionCols = JavaScalaConverters.convertScalaListToJavaList(HoodieDatasetBulkInsertHelper.getPartitionPathCols(this.writeConfig));
133-
Set<Integer> partitionIdx = new HashSet<>();
134-
for (String col : partitionCols) {
135-
partitionIdx.add(this.structType.fieldIndex(col));
168+
if (retainedOrdinals == null) {
169+
initRetainedFields();
136170
}
137-
138-
// Relies on InternalRow::toSeq(...) preserving the column ordering based on the supplied schema
139-
List<Object> cols = JavaScalaConverters.convertScalaListToJavaList(row.toSeq(structType));
140-
int idx = 0;
141-
List<Object> newCols = new ArrayList<>();
142-
for (Object o : cols) {
143-
if (!partitionIdx.contains(idx)) {
144-
newCols.add(o);
145-
}
146-
idx += 1;
171+
// Drop the partition columns from the row by copying the retained fields; a fresh row is
172+
// allocated per record so values keep the same aliasing behavior as InternalRow.fromSeq
173+
Object[] values = new Object[retainedOrdinals.length];
174+
for (int i = 0; i < retainedOrdinals.length; i++) {
175+
values[i] = row.get(retainedOrdinals[i], retainedTypes[i]);
147176
}
148-
InternalRow newRow = InternalRow.fromSeq(JavaScalaConverters.convertJavaListToScalaSeq(newCols));
149-
handle.write(newRow);
177+
handle.write(new GenericInternalRow(values));
150178
} else {
151179
handle.write(row);
152180
}

0 commit comments

Comments
 (0)