feat(client): enrich write commit callback message and fire it for table-service commits #18988
codope wants to merge 2 commits into
Two backward-compatible improvements to the post-commit write callback mechanism:
1. Enrich HoodieWriteCommitCallbackMessage with two optional fields so callback
implementations no longer have to rebuild a FileSystemView or reach into engine
config:
- prevFilePaths: Map<fileId, PrevFilePaths> -- the previous base file (and
bootstrap source, if any) each updated file group replaces, pre-resolved by the
write client from its cached file-system view.
- extraContext: Map<String,String> -- free-form context producers can attach.
Both default to empty maps; the existing 4-arg and 6-arg constructors are preserved.
2. Fire the callback for table-service commits too (compaction and clustering
completion), not just data commits. The shared firing logic (fireCommitCallback)
and prev-file resolution (resolvePrevFilePaths) are lifted into BaseHoodieClient so
both BaseHoodieWriteClient (data commits, via postCommit) and
BaseHoodieTableServiceClient (compaction/clustering completion) reuse them. The
commitCallback field is lifted up from BaseHoodieWriteClient.
postCommit now receives the resolved commit action type so the callback reports the
actual action (e.g. replacecommit for insert_overwrite) rather than the table's base
action type.
Best-effort by design: callback and prev-file resolution failures are logged and never
fail the write.
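The best-effort contract can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: CommitCallbackSketch and BestEffortCallbackFirer are hypothetical stand-in types, and a list of strings stands in for the logger.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a user-supplied commit callback; not the Hudi interface.
interface CommitCallbackSketch {
  void call(String commitTime, String actionType);
}

// Illustrative helper: fires the callback but never lets its failure escape.
class BestEffortCallbackFirer {
  // Collected failure messages, standing in for log.warn(...) output.
  final List<String> loggedFailures = new ArrayList<>();

  // Returns true if the callback ran cleanly, false if it was absent or threw.
  boolean fireIfNecessary(CommitCallbackSketch callback, String commitTime, String actionType) {
    if (callback == null) {
      return false; // callback not configured: nothing to do
    }
    try {
      callback.call(commitTime, actionType);
      return true;
    } catch (Exception e) {
      // Swallow and record: a misbehaving observer must not fail the commit.
      loggedFailures.add("Commit callback failed for " + commitTime + ": " + e.getMessage());
      return false;
    }
  }
}
```

The boolean return value exists only to make the sketch easy to check; the write path itself would ignore it and continue.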
Adds TestBaseHoodieClient covering resolvePrevFilePaths (inserts, updates, bootstrap
capture, missing-file skip, best-effort on view failure, null inputs) and the message
default/retention contract.
f2f674d to 127d370
| * {@code FileSystemView}. | ||
| */ | ||
| protected static Map<String, PrevFilePaths> resolvePrevFilePaths(List<HoodieWriteStat> stats, | ||
| BaseFileOnlyView fsView) { |
Querying the fsview is costly; can we make it a Supplier so that it is only called when necessary?
Switched to a Supplier so the view is only resolved after the callback-enabled check, which keeps the default (callback-off) path free of that cost. Even before this change, the enabled path reused the same cached view the writer had already populated, so there was no extra I/O.
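A small sketch of the deferral pattern discussed in this thread, assuming hypothetical stand-in types (ExpensiveView, LazyViewExample) and a made-up file path; it is not the actual BaseHoodieClient code. The point is that the Supplier is only invoked once the callback-enabled check has passed.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a file-system view whose construction is costly.
class ExpensiveView {
  static int resolutions = 0; // counts how many times a view was built

  ExpensiveView() {
    resolutions++;
  }

  // Illustrative data only: fileId -> previous base file path.
  Map<String, String> previousBaseFiles() {
    return Collections.singletonMap("file-1", "/tbl/p1/file-1_001.parquet");
  }
}

class LazyViewExample {
  // The supplier is only invoked after the callback-enabled check passes.
  static Map<String, String> resolveIfEnabled(boolean callbackEnabled,
                                              Supplier<ExpensiveView> viewSupplier) {
    if (!callbackEnabled) {
      return Collections.emptyMap(); // default path: the supplier is never touched
    }
    return viewSupplier.get().previousBaseFiles();
  }
}
```

Calling resolveIfEnabled(false, ExpensiveView::new) leaves the resolution counter untouched, which is the property the reviewer asked for.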
| } | ||
| commitCallback.call(new HoodieWriteCommitCallbackMessage( | ||
| commitTime, config.getTableName(), config.getBasePath(), | ||
| stats, Option.of(commitActionType), extraMetadata, resolvePrevFilePaths(stats, fsView), |
Not sure what the prev file paths are used for; is it for debugging?
This was needed for an internal feature. It lets callback consumers (e.g. testing frameworks, lineage/audit sinks) see which base file each update replaced without rebuilding a FileSystemView themselves.
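To illustrate the kind of consumer described here, the following sketch pairs each newly written base file with the file it replaced. PrevPathsSketch and LineageSinkSketch are hypothetical, simplified stand-ins; the real message carries HoodieWriteStat objects and a PrevFilePaths map keyed by fileId, and the pairing logic below is an assumption about how a consumer might use them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the PrevFilePaths value type.
class PrevPathsSketch {
  final String prevBaseFilePath;
  final String bootstrapBaseFilePath; // null when there is no bootstrap source

  PrevPathsSketch(String prevBaseFilePath, String bootstrapBaseFilePath) {
    this.prevBaseFilePath = prevBaseFilePath;
    this.bootstrapBaseFilePath = bootstrapBaseFilePath;
  }
}

class LineageSinkSketch {
  // newFilesByFileId: fileId -> newly written base file path (taken from the write stats).
  // prevByFileId: fileId -> previous paths; file groups created by inserts have no entry.
  static List<String> lineageEdges(Map<String, String> newFilesByFileId,
                                   Map<String, PrevPathsSketch> prevByFileId) {
    List<String> edges = new ArrayList<>();
    for (Map.Entry<String, String> written : newFilesByFileId.entrySet()) {
      PrevPathsSketch prev = prevByFileId.get(written.getKey());
      if (prev == null) {
        continue; // insert: nothing was replaced
      }
      edges.add(prev.prevBaseFilePath + " -> " + written.getValue());
    }
    return edges;
  }
}
```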
cc @prashantwason
| * <p>Best-effort: catches and logs any exception from the user-supplied callback so a | ||
| * misbehaving observer cannot fail the commit. | ||
| */ | ||
| protected void fireCommitCallback(String commitTime, |
Suggest renaming this to mayFireCommitCallback or fireCommitCallbackIfNecessary.
Signed-off-by: codope <sagarsumit09@gmail.com>
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #18988 +/- ##
=========================================
Coverage 68.25% 68.25%
- Complexity 29509 29520 +11
=========================================
Files 2544 2544
Lines 142744 142800 +56
Branches 17816 17820 +4
=========================================
+ Hits 97433 97473 +40
- Misses 37304 37320 +16
Partials 8007 8007
| ); | ||
| } | ||
| log.info("Compacted successfully on commit {}", compactionCommitTime); | ||
| fireCommitCallbackIfNecessary(compactionCommitTime, HoodieTimeline.COMMIT_ACTION, |
completeLogCompaction (LOG_COMPACTION_ACTION) is also a table-service completion with write stats and a timeline commit, but it does not call fireCommitCallbackIfNecessary while compaction and clustering do. Intentional scope, or should log compaction fire the callback too? If intentional, a short note here on why it is excluded would keep the set explicit.
| * group from a cached {@link BaseFileOnlyView}, so callback implementations receive the | ||
| * read/write file pairing without rebuilding a file-system view. | ||
| */ | ||
| public class TestBaseHoodieClient { |
No test asserts the PR's headline behavior - that the callback fires on compaction and clustering completion. resolvePrevFilePaths and the message contract are covered, but completeCompaction/completeClustering firing is not (codecov flags fireCommitCallbackIfNecessary as uncovered). Consider a test that registers a recording callback and asserts it fires once per table-service commit with the expected action type (commit / replacecommit).
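One possible shape for such a test, sketched with hypothetical stand-ins (ActionObserver, RecordingObserver, FakeTableServiceClient) instead of the real Hudi client and callback classes. An actual test would register the recording callback through the callback configuration and drive real compaction and clustering; the action strings below follow the ones named in this comment.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical observer interface; the real callback receives a HoodieWriteCommitCallbackMessage.
interface ActionObserver {
  void onCommit(String instantTime, String actionType);
}

// Records every (instant, action) pair it is called with, for later assertions.
class RecordingObserver implements ActionObserver {
  final List<String> seen = new ArrayList<>();

  @Override
  public void onCommit(String instantTime, String actionType) {
    seen.add(instantTime + ":" + actionType);
  }
}

// Toy stand-in for a table-service client; it only models the callback firing.
class FakeTableServiceClient {
  private final ActionObserver observer;

  FakeTableServiceClient(ActionObserver observer) {
    this.observer = observer;
  }

  void completeCompaction(String instantTime) {
    observer.onCommit(instantTime, "commit");
  }

  void completeClustering(String instantTime) {
    observer.onCommit(instantTime, "replacecommit");
  }
}
```

A test built on this shape would complete one compaction and one clustering, then assert that the recorder saw exactly two entries with the expected action types.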
hudi-agent left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR enriches the write commit callback message with previous-file paths and extends callback firing to compaction and clustering completions. A few correctness concerns worth checking in the inline comments — particularly around the commitActionType passed from postWrite (which uses the table's base action type rather than the operation's actual action), the action type for clustering on newer table versions, and a subtle timing change where the callback now fires before mayBeCleanAndArchive/runTableServicesInline instead of after. Please take a look at the inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. A few naming/comment and minor consistency nits below; the main one is a misleading Javadoc phrase on extraContext that implies it's optional when it isn't.
| try { | ||
| postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty()); | ||
| postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, | ||
| hoodieTable.getMetaClient().getCommitActionType(), Option.empty()); |
🤖 Could this report the wrong action type for auto-committed replacecommit operations? hoodieTable.getMetaClient().getCommitActionType() returns commit/deltacommit (operation-agnostic), but SparkPartitionTTLActionExecutor goes through SparkAutoCommitExecutor and commits with REPLACE_COMMIT_ACTION — the callback would advertise commit/deltacommit for that path. Using CommitUtils.getCommitActionType(getOperationType(), table.getMetaClient().getTableType()) would match the actual committed action.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
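The difference between the two lookups can be sketched as below. OpType, TableKind and ActionTypeSketch are hypothetical, trimmed-down stand-ins, and the mapping is a simplified assumption (only insert_overwrite is modeled as a replace-style operation); it is not the actual CommitUtils implementation.

```java
// Hypothetical, trimmed-down enums; the real Hudi enums have more values.
enum OpType { UPSERT, INSERT_OVERWRITE }

enum TableKind { COPY_ON_WRITE, MERGE_ON_READ }

class ActionTypeSketch {
  // Base action derived from the table type alone (operation-agnostic).
  static String baseAction(TableKind kind) {
    return kind == TableKind.MERGE_ON_READ ? "deltacommit" : "commit";
  }

  // Action derived from the operation first, falling back to the table type.
  static String resolvedAction(OpType op, TableKind kind) {
    if (op == OpType.INSERT_OVERWRITE) {
      return "replacecommit";
    }
    return baseAction(kind);
  }
}
```

Under these assumptions, a callback fed from baseAction would report commit for an insert_overwrite on a copy-on-write table, while resolvedAction would report replacecommit.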
| heartbeatClient.stop(clusteringCommitTime); | ||
| } | ||
| log.info("Clustering successfully on commit {} for table {}", clusteringCommitTime, table.getConfig().getBasePath()); | ||
| fireCommitCallbackIfNecessary(clusteringCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION, |
🤖 For table-version 8+ clustering uses HoodieTimeline.CLUSTERING_ACTION as the completed instant action (not REPLACE_COMMIT_ACTION) — the metrics line just above already uses CLUSTERING_ACTION. Could you read the action from clusteringInstant.getAction() so consumers see the same action that's actually on the timeline?
| HoodieTimer postCommitTimer = HoodieTimer.start(); | ||
| try { | ||
| postCommit(table, metadata, instantTime, extraMetadata); | ||
| postCommit(table, metadata, instantTime, commitActionType, extraMetadata); |
🤖 Is the new ordering intentional? Pre-PR the callback fired after mayBeCleanAndArchive and runTableServicesInline (and was skipped if they threw with canIgnorePostCommitFailures=false). Now it fires inside postCommit, before them — so a consumer can receive a successful-commit callback even when post-commit cleanup throws and commitStats ultimately propagates the exception. The PR description mentions extra callbacks but doesn't call out this ordering shift.
| ); | ||
| } | ||
| log.info("Compacted successfully on commit {}", compactionCommitTime); | ||
| fireCommitCallbackIfNecessary(compactionCommitTime, HoodieTimeline.COMMIT_ACTION, |
🤖 Worth noting: HoodieFlinkTableServiceClient overrides both completeCompaction and completeClustering and doesn't delegate to super or call fireCommitCallbackIfNecessary, so Flink users won't see the new table-service callback. Should the Flink overrides also fire the callback, or is the PR intentionally scoped to Spark/Java?
| /** | ||
| * Free-form context that producers can attach for downstream callback consumers. | ||
| * The OSS write client populates this as empty; specialized callsites or wrappers | ||
| * may populate it with whatever context their callbacks need. Mirrors the |
🤖 nit: the phrase "Mirrors the optional shape of extraMetadata" is misleading — extraContext is Map<String, String>, not Option<Map<String, String>>, so the shapes are different. Could you drop or rephrase that sentence?
| */ | ||
| public static class PrevFilePaths implements Serializable { | ||
| private static final long serialVersionUID = 1L; | ||
| public final String prevBaseFilePath; |
🤖 nit: the outer class uses Lombok @Getter for field access, but PrevFilePaths exposes bare public final fields — callers end up using two different access patterns on the same message object. Would it be worth adding @Getter here (and making the fields private) for consistency?
| continue; | ||
| } | ||
| String prevPath = prev.get().getPath(); | ||
| String bootstrapPath = prev.get().getBootstrapBaseFile().isPresent() |
🤖 nit: prev.get() and getBootstrapBaseFile() are each called twice — could you cache them in locals, e.g. HoodieBaseFile prevFile = prev.get() and Option<BaseFile> bootstrapBase = prevFile.getBootstrapBaseFile()?
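The suggested refactor could look roughly like this. BaseFileSketch and PrevPathResolverSketch are hypothetical stand-ins, and java.util.Optional is used in place of Hudi's Option type so the sketch stays self-contained.

```java
import java.util.Optional;

// Hypothetical stand-in for a base file that may carry a bootstrap source.
class BaseFileSketch {
  private final String path;
  private final BaseFileSketch bootstrap; // null when there is no bootstrap source

  BaseFileSketch(String path, BaseFileSketch bootstrap) {
    this.path = path;
    this.bootstrap = bootstrap;
  }

  String getPath() {
    return path;
  }

  Optional<BaseFileSketch> getBootstrapBaseFile() {
    return Optional.ofNullable(bootstrap);
  }
}

class PrevPathResolverSketch {
  // Returns {prevPath, bootstrapPathOrNull}, or null when there is no previous file.
  static String[] resolve(Optional<BaseFileSketch> prev) {
    if (!prev.isPresent()) {
      return null; // missing previous file: the caller skips this stat
    }
    // Cache both lookups in locals so each accessor is called exactly once.
    BaseFileSketch prevFile = prev.get();
    Optional<BaseFileSketch> bootstrapBase = prevFile.getBootstrapBaseFile();
    String bootstrapPath = bootstrapBase.map(BaseFileSketch::getPath).orElse(null);
    return new String[] {prevFile.getPath(), bootstrapPath};
  }
}
```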
Describe the issue this Pull Request addresses
Today the write commit callback (HoodieWriteCommitCallback) has two limitations that make it awkward for consumers that want to react to what a commit actually changed on storage:
- HoodieWriteCommitCallbackMessage carries the write stats, but a consumer that wants to correlate each newly written base file with the existing base file (and bootstrap source) it superseded has to rebuild a FileSystemView itself, duplicating I/O the write client already paid for.
- The callback fires only for data commits, not for table-service commits (compaction and clustering completion).
This PR addresses both, backward-compatibly (no breaking change to the public API).
Summary and Changelog
What users gain: callback implementations now receive, per updated file group, the previous base file path (and bootstrap source path, if any) pre-resolved by the write client without rebuilding a file-system view. The callback also fires on compaction and clustering completions, not just data commits.
Changelog (all changes under hudi-client/hudi-client-common/):
- HoodieWriteCommitCallbackMessage: added two optional fields:
  - prevFilePaths: Map<fileId, PrevFilePaths>, where PrevFilePaths holds prevBaseFilePath and bootstrapBaseFilePath.
  - extraContext: Map<String,String> for producer-attached context.
  Both default to empty maps (never null). The existing ctors are preserved; a new all-args ctor is generated via the existing lombok @AllArgsConstructor.
- BaseHoodieClient: lifted the commitCallback field up from BaseHoodieWriteClient, and added two shared methods:
  - fireCommitCallback(commitTime, commitActionType, stats, BaseFileOnlyView, extraMetadata), which lazily constructs the callback from hoodie.write.commit.callback.class and invokes it.
  - resolvePrevFilePaths(stats, BaseFileOnlyView): for each update stat, looks up the previous base file via the cached view (getBaseFileOn), capturing path + bootstrap path.
- BaseHoodieWriteClient: removed the inline callback block from commitStats; the callback now fires from postCommit. postCommit takes the resolved commitActionType so the message reports the actual action (e.g. replacecommit for insert_overwrite) rather than the table's base action type.
- BaseHoodieTableServiceClient: fires the callback after successful compaction (commit action) and clustering (replacecommit action).
- TestBaseHoodieClient (new): unit tests covering resolvePrevFilePaths (inserts skipped, update resolution, bootstrap capture, missing-file skip, best-effort on view failure, null inputs) and the message default/retention contract.
No code was copied from third-party sources.
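The additive constructor pattern can be sketched as follows. CallbackMessageSketch is a hypothetical, heavily simplified class (two of the original fields, and a plain String value in place of PrevFilePaths); normalizing null arguments to empty maps in the constructor is an assumption of this sketch, made to keep the "never null" property visible.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical, heavily simplified message; field names follow the PR description.
class CallbackMessageSketch {
  final String commitTime;
  final String tableName;
  final Map<String, String> prevFilePaths; // simplified: fileId -> previous base file path
  final Map<String, String> extraContext;

  // Pre-existing style constructor: the new fields default to empty maps.
  CallbackMessageSketch(String commitTime, String tableName) {
    this(commitTime, tableName,
        Collections.<String, String>emptyMap(), Collections.<String, String>emptyMap());
  }

  // New all-args constructor; null inputs are normalized (an assumption of this sketch).
  CallbackMessageSketch(String commitTime, String tableName,
                        Map<String, String> prevFilePaths, Map<String, String> extraContext) {
    this.commitTime = commitTime;
    this.tableName = tableName;
    this.prevFilePaths =
        prevFilePaths == null ? Collections.<String, String>emptyMap() : prevFilePaths;
    this.extraContext =
        extraContext == null ? Collections.<String, String>emptyMap() : extraContext;
  }
}
```

Callers compiled against the shorter constructor keep working, and consumers can iterate the new maps without null checks.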
Impact
HoodieWriteCommitCallbackMessage is @PublicAPIClass(EVOLVING). The change is additive and backward compatible, i.e. existing ctors and getters are unchanged, and new fields default to empty maps. Existing callback implementations compile and run unchanged.
Risk Level
low
Changes are confined to one module (hudi-client-common) and the callback path. All callback/resolution failures are caught and logged, so a misbehaving callback or a stale/remote view cannot fail a commit. Verified with the apache/hudi default build profile. Also:
- TestBaseHoodieClient
- HoodieWriteCommitCallbackMessage ctor/getter call sites remain compatible.
Documentation Update
none
Contributor's checklist