refactor(variant): self-align log-block variant rows, drop buffer-level projection hook#18923
refactor(variant): self-align log-block variant rows, drop buffer-level projection hook#18923voonhous wants to merge 3 commits into
Conversation
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR pushes variant projection down into the log readers (parquet native projection and avro deserialization hook) and removes the buffer-level projection step, which cleanly separates Spark-4.1 variant concerns from format-agnostic merge code. One worth-checking edge case in the inline comment around how the new ordering interacts with the schema-evolution transformer when both are active. Please take a look at any inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of small readability nits below, otherwise the refactor is clean.
…er hook Remove the engine-neutral FileGroupRecordBuffer variant-projection composition (apache#18674's getLogBlockRecordProjection hook) so the merge buffer stays format-agnostic; each log reader now emits rows already aligned to the projected read schema (apache#18739). - Parquet log blocks: thread the variant-overlaid StructType into a new HoodieSparkParquetReader.getUnsafeRowIterator(HoodieSchema, StructType, filters) overload so SPARK_ROW_REQUESTED_SCHEMA carries VariantMetadata and parquet-mr decodes variants into the projected struct shape natively (mirrors the base-file path). Wired in SparkFileFormatInternalRowReaderContext.getFileRecordIterator. - Avro log blocks: new no-op HoodieReaderContext.projectLogBlockRecords hook, invoked from HoodieAvroDataBlock.deserializeRecords; Spark overrides it to apply the VariantGet rewrite (relocated from the deleted buffer hook). - Both paths gated by a single shouldProjectVariants predicate (variant projection present AND merger not PAYLOAD_BASED), preserving the buffer's custom-payload skip. - FileGroupRecordBuffer/PositionBased now call getSchemaTransformerWithEvolvedSchema directly; getProjectedTransformer and getLogBlockRecordProjection deleted. - Sub-task 4: documented why the sparkRequiredSchema overlay must stay (HoodieSchema can't carry VariantMetadata); kept Spark-side, no schema-model change. buildVariantProjector / isVariantProjectionStruct unchanged (caller moved). Addresses apache#18739.
- HoodieSparkParquetReader: rename variant-overload parameter to structSchema and drop the no-op alias. - SparkFileFormatInternalRowReaderContext: extract isPayloadBased and drop the double negation in shouldProjectVariants.
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! This PR cleanly pushes the Spark 4.1 variant projection out of the engine-neutral merge buffer and into the engine-specific reader/log-block paths, which is a nice architectural improvement. No new issues flagged from this automated pass — the ordering-flip concern (schema-evolve then project → project then schema-evolve, which only matters when schema-on-read is active) was already raised and acknowledged in round 1. A Hudi committer or PMC member can take it from here for a final review. A couple of minor naming and readability suggestions below.
cc @yihua
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for addressing the feedback! These follow-ups rename the parquet reader's variant overload parameter to projectedStructSchema (matching Javadoc and call sites) and document why the merger != null guard in the Spark reader context is real rather than defensive. Both of my prior nits and the corresponding hudi-agent suggestions are resolved, and no new issues surfaced from this incremental pass — a Hudi committer or PMC member can take it from here for a final review.
cc @yihua
…rger guard - Rename getUnsafeRowIterator param structSchema to projectedStructSchema - Add inline comment explaining the merger != null guard
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the iteration! This round's changes (renaming structSchema → projectedStructSchema, extracting isPayloadBased, and the null-guard rationale) all look good. The variant-projection refactor cleanly separates engine-specific concerns from the buffer code — Avro log blocks project in deserializeRecords via projectLogBlockRecords, parquet log blocks project natively in the reader, and the buffers go back to the plain evolved-schema transformer. No new issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. One minor naming suggestion; the rest of the changes are clean and well-commented.
cc @yihua
| if (!shouldProjectVariants) { | ||
| return recordIterator | ||
| } | ||
| val req = sparkRequiredSchema.get |
There was a problem hiding this comment.
🤖 nit: req is a bit terse here — the adjacent variables (dataStruct, targetStruct) are more descriptive. Could you rename it to requiredStruct to stay consistent?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #18923 +/- ##
============================================
+ Coverage 68.20% 68.24% +0.04%
- Complexity 29458 29476 +18
============================================
Files 2542 2542
Lines 142545 142553 +8
Branches 17778 17778
============================================
+ Hits 97218 97289 +71
+ Misses 37316 37247 -69
- Partials 8011 8017 +6
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
Describe the issue this Pull Request addresses
Addresses #18739.
PR #18674 aligned Spark 4.1 MOR variant reads via
HoodieReaderContext.getLogBlockRecordProjection, a per-row projector run inside the engine-neutralFileGroupRecordBuffer. That leaked Spark-4.1 /PushVariantIntoScanconcerns into format-agnostic merge code. This PR pushes variant projection down into the log readers so the buffer no longer knows about variants.Summary and Changelog
Each log reader now emits rows already aligned to the projected read schema; the buffer-level projection hook is removed.
FileGroupRecordBuffer.getProjectedTransformerandHoodieReaderContext.getLogBlockRecordProjection. The buffers (FileGroupRecordBuffer,PositionBasedFileGroupRecordBuffer) now callgetSchemaTransformerWithEvolvedSchemadirectly and keep returning the unprojected evolved schema (merger reads metadata cols by ordinal).HoodieReaderContext.projectLogBlockRecords(iter, dataBlockSchema).HoodieAvroDataBlock.deserializeRecordsinvokes it; the Spark reader context overrides it to apply theVariantGetrewrite (relocated from the old hook).HoodieSparkParquetReader.getUnsafeRowIterator(HoodieSchema, StructType, filters)overload threads the projected struct soSPARK_ROW_REQUESTED_SCHEMAcarriesVariantMetadataand parquet-mr projects natively (mirrors the base-file path).shouldProjectVariantsgate (variant projection present AND merger notPAYLOAD_BASED) drives both paths, preserving the previous custom-payload skip.No code copied.
buildVariantProjector/isVariantProjectionStructare unchanged (caller moved).Impact
Internal change to the
HoodieReaderContextextension point (getLogBlockRecordProjection->projectLogBlockRecords); the new hook is a no-op by default. No new configs. No user-facing behavior change for non-Spark engines or Spark < 4.1; Spark 4.1 variant MOR reads are unchanged in result, only the alignment moves out of the buffer.Risk Level
Medium. Touches the engine-neutral merge buffer and the reader-context API. Mitigations: the new hook is a no-op default (non-Spark engines and Spark < 4.1 unaffected); projection is gated by
shouldProjectVariants(custom-payload tables skip it); the buffer still returns the unprojected evolved schema, so the merger's ordinal-based metadata access is unchanged. Verified withTestVariantDataTypeon Spark 4.0 and 4.1 (cow + mor: insert / update / delete / merge-into,cast(v as string)selects) across key-based, position-based, and unmerged paths, plus a custom-payload variant MOR case.Documentation Update
none
Contributor's checklist