Skip to content

Commit 127d370

Browse files
committed
feat(client): enrich write commit callback message and fire it for table-service commits
Two backward-compatible improvements to the post-commit write callback mechanism: 1. Enrich HoodieWriteCommitCallbackMessage with two optional fields so callback implementations no longer have to rebuild a FileSystemView or reach into engine config: - prevFilePaths: Map<fileId, PrevFilePaths> -- the previous base file (and bootstrap source, if any) each updated file group replaces, pre-resolved by the write client from its cached file-system view. - extraContext: Map<String,String> -- free-form context producers can attach. Both default to empty maps; the existing 4-arg and 6-arg constructors are preserved. 2. Fire the callback for table-service commits too (compaction and clustering completion), not just data commits. The shared firing logic (fireCommitCallback) and prev-file resolution (resolvePrevFilePaths) are lifted into BaseHoodieClient so both BaseHoodieWriteClient (data commits, via postCommit) and BaseHoodieTableServiceClient (compaction/clustering completion) reuse them. The commitCallback field is lifted up from BaseHoodieWriteClient. postCommit now receives the resolved commit action type so the callback reports the actual action (e.g. replacecommit for insert_overwrite) rather than the table's base action type. Best-effort by design: callback and prev-file resolution failures are logged and never fail the write. Adds TestBaseHoodieClient covering resolvePrevFilePaths (inserts, updates, bootstrap capture, missing-file skip, best-effort on view failure, null inputs) and the message default/retention contract.
1 parent 97c03f7 commit 127d370

5 files changed

Lines changed: 321 additions & 17 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import lombok.Getter;
2727

2828
import java.io.Serializable;
29+
import java.util.Collections;
2930
import java.util.List;
3031
import java.util.Map;
3132

@@ -69,10 +70,54 @@ public class HoodieWriteCommitCallbackMessage implements Serializable {
6970
*/
7071
private final Option<Map<String, String>> extraMetadata;
7172

73+
/**
74+
* Previous base file paths keyed by fileId. Populated by the write client
75+
* using the cached FileSystemView so that callback implementations don't
76+
* have to rebuild a view. Empty for inserts and for callers that don't
77+
* pre-resolve.
78+
*/
79+
private final Map<String, PrevFilePaths> prevFilePaths;
80+
81+
/**
82+
* Free-form context that producers can attach for downstream callback consumers.
83+
* The OSS write client populates this as empty; specialized callsites or wrappers
84+
* may populate it with whatever context their callbacks need. Mirrors the
85+
* optional shape of {@link #extraMetadata}.
86+
*/
87+
private final Map<String, String> extraContext;
88+
7289
public HoodieWriteCommitCallbackMessage(String commitTime,
7390
String tableName,
7491
String basePath,
7592
List<HoodieWriteStat> hoodieWriteStat) {
76-
this(commitTime, tableName, basePath, hoodieWriteStat, Option.empty(), Option.empty());
93+
this(commitTime, tableName, basePath, hoodieWriteStat, Option.empty(), Option.empty(),
94+
Collections.emptyMap(), Collections.emptyMap());
95+
}
96+
97+
public HoodieWriteCommitCallbackMessage(String commitTime,
98+
String tableName,
99+
String basePath,
100+
List<HoodieWriteStat> hoodieWriteStat,
101+
Option<String> commitActionType,
102+
Option<Map<String, String>> extraMetadata) {
103+
this(commitTime, tableName, basePath, hoodieWriteStat, commitActionType, extraMetadata,
104+
Collections.emptyMap(), Collections.emptyMap());
105+
}
106+
107+
/**
108+
* Container for previously-existing file paths associated with a single fileId in a
109+
* commit. {@link #prevBaseFilePath} is the base file the new write replaces, and
110+
* {@link #bootstrapBaseFilePath} is the bootstrap-source file the previous
111+
* base file referenced (null for non-bootstrap tables).
112+
*/
113+
public static class PrevFilePaths implements Serializable {
114+
private static final long serialVersionUID = 1L;
115+
public final String prevBaseFilePath;
116+
public final String bootstrapBaseFilePath;
117+
118+
public PrevFilePaths(String prevBaseFilePath, String bootstrapBaseFilePath) {
119+
this.prevBaseFilePath = prevBaseFilePath;
120+
this.bootstrapBaseFilePath = bootstrapBaseFilePath;
121+
}
77122
}
78123
}

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,17 @@
2020

2121
import org.apache.hudi.avro.model.HoodieCleanMetadata;
2222
import org.apache.hudi.callback.HoodieClientInitCallback;
23+
import org.apache.hudi.callback.HoodieWriteCommitCallback;
24+
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
25+
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage.PrevFilePaths;
26+
import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
2327
import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper;
2428
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
2529
import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
2630
import org.apache.hudi.client.transaction.TransactionManager;
2731
import org.apache.hudi.client.utils.TransactionUtils;
2832
import org.apache.hudi.common.engine.HoodieEngineContext;
33+
import org.apache.hudi.common.model.HoodieBaseFile;
2934
import org.apache.hudi.common.model.HoodieCommitMetadata;
3035
import org.apache.hudi.common.model.HoodieWriteStat;
3136
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -34,6 +39,7 @@
3439
import org.apache.hudi.common.table.timeline.TimeGenerator;
3540
import org.apache.hudi.common.table.timeline.TimeGenerators;
3641
import org.apache.hudi.common.table.timeline.TimelineUtils;
42+
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
3743
import org.apache.hudi.common.util.Option;
3844
import org.apache.hudi.common.util.ReflectionUtils;
3945
import org.apache.hudi.common.util.StringUtils;
@@ -87,6 +93,14 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable {
8793
protected final TransactionManager txnManager;
8894
protected final TimeGenerator timeGenerator;
8995

96+
/**
97+
* Lazily-initialized commit callback (HoodieWriteCommitCallback). Lifted from
98+
* {@link BaseHoodieWriteClient} so that {@link BaseHoodieTableServiceClient} can also
99+
* fire callbacks for compaction and clustering completions. Transient is fine
100+
* because the callback is only ever invoked from the driver after a commit.
101+
*/
102+
protected transient HoodieWriteCommitCallback commitCallback;
103+
90104
/**
91105
* Timeline Server has the same lifetime as that of Client. Any operations done on the same timeline service will be
92106
* able to take advantage of the cached file-system view. New completed actions will be synced automatically in an
@@ -462,4 +476,78 @@ private static Map<String, String> collectRollingMetadataFromTimeline(
462476
protected Option<Map<String, String>> updateExtraMetadata(Option<Map<String, String>> extraMetadata) {
463477
return CommitMetadataProperties.enrich(extraMetadata, config, context);
464478
}
479+
480+
/**
481+
* Fire {@link HoodieWriteCommitCallback} for a commit, if enabled. Shared by
482+
* {@link BaseHoodieWriteClient#postCommit} (regular auto- and explicit-commit paths)
483+
* and {@link BaseHoodieTableServiceClient} (compaction and clustering completions).
484+
* Lazily constructs the callback instance from {@code hoodie.write.commit.callback.class}.
485+
*
486+
* <p>Best-effort: catches and logs any exception from the user-supplied callback so a
487+
* misbehaving observer cannot fail the commit.
488+
*/
489+
protected void fireCommitCallback(String commitTime,
490+
String commitActionType,
491+
List<HoodieWriteStat> stats,
492+
BaseFileOnlyView fsView,
493+
Option<Map<String, String>> extraMetadata) {
494+
if (!config.writeCommitCallbackOn()) {
495+
return;
496+
}
497+
try {
498+
if (commitCallback == null) {
499+
commitCallback = HoodieCommitCallbackFactory.create(config);
500+
}
501+
commitCallback.call(new HoodieWriteCommitCallbackMessage(
502+
commitTime, config.getTableName(), config.getBasePath(),
503+
stats, Option.of(commitActionType), extraMetadata, resolvePrevFilePaths(stats, fsView),
504+
Collections.emptyMap()));
505+
} catch (Exception e) {
506+
log.warn("HoodieWriteCommitCallback failed for commit {} ({}); ignoring",
507+
commitTime, commitActionType, e);
508+
}
509+
}
510+
511+
/**
512+
* Pre-resolve the previous base file (and bootstrap base file, if any) for every
513+
* {@link HoodieWriteStat} that represents an update, using a populated
514+
* {@link BaseFileOnlyView}. The lookup is O(1) per stat against the cached view, so
515+
* this adds no I/O on top of what the writer already paid.
516+
*
517+
* <p>Used by {@link #fireCommitCallback} call sites so the callback message ships
518+
* actual file paths rather than forcing each callback impl to rebuild a
519+
* {@code FileSystemView}.
520+
*/
521+
protected static Map<String, PrevFilePaths> resolvePrevFilePaths(List<HoodieWriteStat> stats,
522+
BaseFileOnlyView fsView) {
523+
Map<String, PrevFilePaths> out = new HashMap<>();
524+
if (stats == null || fsView == null) {
525+
return out;
526+
}
527+
for (HoodieWriteStat stat : stats) {
528+
String prevCommit = stat.getPrevCommit();
529+
if (StringUtils.isNullOrEmpty(prevCommit) || HoodieWriteStat.NULL_COMMIT.equals(prevCommit)) {
530+
continue;
531+
}
532+
Option<HoodieBaseFile> prev;
533+
try {
534+
prev = fsView.getBaseFileOn(stat.getPartitionPath(), prevCommit, stat.getFileId());
535+
} catch (Exception e) {
536+
// Best-effort: a remote view 4xx/5xx, a stale view, or a replaced file group must not
537+
// fail the commit. Drop the prev path for this stat and keep going.
538+
log.warn("Could not resolve prev base file for fileId={} prevCommit={}; skipping",
539+
stat.getFileId(), prevCommit, e);
540+
continue;
541+
}
542+
if (!prev.isPresent()) {
543+
continue;
544+
}
545+
String prevPath = prev.get().getPath();
546+
String bootstrapPath = prev.get().getBootstrapBaseFile().isPresent()
547+
? prev.get().getBootstrapBaseFile().get().getPath()
548+
: null;
549+
out.put(stat.getFileId(), new PrevFilePaths(prevPath, bootstrapPath));
550+
}
551+
return out;
552+
}
465553
}

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,8 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab
424424
);
425425
}
426426
log.info("Compacted successfully on commit {}", compactionCommitTime);
427+
fireCommitCallback(compactionCommitTime, HoodieTimeline.COMMIT_ACTION,
428+
writeStats, table.getBaseFileOnlyView(), Option.empty());
427429
} finally {
428430
if (config.getWriteConcurrencyMode().supportsMultiWriter()) {
429431
this.heartbeatClient.stop(compactionCommitTime);
@@ -640,6 +642,8 @@ private void completeClustering(HoodieReplaceCommitMetadata replaceCommitMetadat
640642
heartbeatClient.stop(clusteringCommitTime);
641643
}
642644
log.info("Clustering successfully on commit {} for table {}", clusteringCommitTime, table.getConfig().getBasePath());
645+
fireCommitCallback(clusteringCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION,
646+
writeStats, table.getBaseFileOnlyView(), Option.empty());
643647
}
644648

645649
protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,7 @@
2424
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
2525
import org.apache.hudi.avro.model.HoodieRestorePlan;
2626
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
27-
import org.apache.hudi.callback.HoodieWriteCommitCallback;
28-
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
2927
import org.apache.hudi.callback.common.WriteStatusValidator;
30-
import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
3128
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
3229
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
3330
import org.apache.hudi.client.transaction.TransactionManager;
@@ -146,7 +143,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
146143
@Getter
147144
@Setter
148145
private transient WriteOperationType operationType;
149-
private transient HoodieWriteCommitCallback commitCallback;
150146

151147
protected transient Timer.Context writeTimer = null;
152148

@@ -287,7 +283,7 @@ public boolean commitStats(String instantTime, TableWriteStats tableWriteStats,
287283
boolean postCommitStatus = true;
288284
HoodieTimer postCommitTimer = HoodieTimer.start();
289285
try {
290-
postCommit(table, metadata, instantTime, extraMetadata);
286+
postCommit(table, metadata, instantTime, commitActionType, extraMetadata);
291287
mayBeCleanAndArchive(table);
292288
runTableServicesInline(table, metadata, extraMetadata);
293289
} catch (Exception e) {
@@ -303,15 +299,6 @@ public boolean commitStats(String instantTime, TableWriteStats tableWriteStats,
303299
}
304300

305301
emitCommitMetrics(instantTime, metadata, commitActionType);
306-
307-
// callback if needed.
308-
if (config.writeCommitCallbackOn()) {
309-
if (null == commitCallback) {
310-
commitCallback = HoodieCommitCallbackFactory.create(config);
311-
}
312-
commitCallback.call(new HoodieWriteCommitCallbackMessage(
313-
instantTime, config.getTableName(), config.getBasePath(), tableWriteStats.getDataTableWriteStats(), Option.of(commitActionType), extraMetadata));
314-
}
315302
return true;
316303
}
317304

@@ -639,7 +626,8 @@ public O postWrite(HoodieWriteMetadata<O> result, String instantTime, HoodieTabl
639626
boolean postCommitStatus = true;
640627
HoodieTimer postCommitTimer = HoodieTimer.start();
641628
try {
642-
postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty());
629+
postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime,
630+
hoodieTable.getMetaClient().getCommitActionType(), Option.empty());
643631
mayBeCleanAndArchive(hoodieTable);
644632
} catch (Exception e) {
645633
postCommitStatus = false;
@@ -666,14 +654,20 @@ public O postWrite(HoodieWriteMetadata<O> result, String instantTime, HoodieTabl
666654
* @param instantTime Instant Time
667655
* @param extraMetadata Additional Metadata passed by user
668656
*/
669-
protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
657+
protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, String commitActionType, Option<Map<String, String>> extraMetadata) {
670658
try {
671659
context.setJobStatus(this.getClass().getSimpleName(), "Cleaning up marker directories for commit " + instantTime + " in table "
672660
+ config.getTableName());
673661
// Delete the marker directory for the instant.
674662
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
675663
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
676664
metrics.updateTableServiceInstantMetrics(table.getActiveTimeline());
665+
// Fire write commit callback if a callback class is registered. postCommit() is reached
666+
// by both auto-commit and explicit-commit paths; compaction and clustering have their own
667+
// explicit fireCommitCallback call sites in BaseHoodieTableServiceClient.
668+
List<HoodieWriteStat> stats = metadata.getWriteStats();
669+
fireCommitCallback(instantTime, commitActionType, stats,
670+
table.getBaseFileOnlyView(), extraMetadata);
677671
} finally {
678672
this.heartbeatClient.stop(instantTime);
679673
}

0 commit comments

Comments
 (0)