Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import lombok.Getter;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -69,10 +70,54 @@ public class HoodieWriteCommitCallbackMessage implements Serializable {
*/
private final Option<Map<String, String>> extraMetadata;

/**
* Previous base file paths keyed by fileId. Populated by the write client
* using the cached FileSystemView so that callback implementations don't
* have to rebuild a view. Empty for inserts and for callers that don't
* pre-resolve.
*/
private final Map<String, PrevFilePaths> prevFilePaths;

/**
* Free-form context that producers can attach for downstream callback consumers.
* The OSS write client populates this as empty; specialized callsites or wrappers
* may populate it with whatever context their callbacks need. Mirrors the

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: the phrase "Mirrors the optional shape of extraMetadata" is misleading — extraContext is Map<String, String>, not Option<Map<String, String>>, so the shapes are different. Could you drop or rephrase that sentence?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

* string-to-string map shape of {@link #extraMetadata}, but without the {@code Option} wrapper.
*/
private final Map<String, String> extraContext;

/**
* Convenience constructor for callers that have neither a commit action type nor
* extra metadata: both are supplied as {@code Option.empty()}.
*/
public HoodieWriteCommitCallbackMessage(String commitTime,
String tableName,
String basePath,
List<HoodieWriteStat> hoodieWriteStat) {
this(commitTime, tableName, basePath, hoodieWriteStat, Option.empty(), Option.empty());
this(commitTime, tableName, basePath, hoodieWriteStat, Option.empty(), Option.empty(),
Collections.emptyMap(), Collections.emptyMap());
}

/**
* Convenience constructor that forwards the given commit action type and extra
* metadata and passes {@code Collections.emptyMap()} for the two trailing arguments
* of the wider constructor.
*
* <p>NOTE(review): the wider constructor is not declared in this part of the file;
* the two trailing arguments are presumably {@code prevFilePaths} and
* {@code extraContext}, in field-declaration order. Confirm against its declaration.
*/
public HoodieWriteCommitCallbackMessage(String commitTime,
String tableName,
String basePath,
List<HoodieWriteStat> hoodieWriteStat,
Option<String> commitActionType,
Option<Map<String, String>> extraMetadata) {
this(commitTime, tableName, basePath, hoodieWriteStat, commitActionType, extraMetadata,
Collections.emptyMap(), Collections.emptyMap());
}

/**
* Container for previously-existing file paths associated with a single fileId in a
* commit. {@link #prevBaseFilePath} is the base file the new write replaces, and
* {@link #bootstrapBaseFilePath} is the bootstrap-source file the previous
* base file referenced (null for non-bootstrap tables).
*/
public static class PrevFilePaths implements Serializable {
private static final long serialVersionUID = 1L;
public final String prevBaseFilePath;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: the outer class uses Lombok @Getter for field access, but PrevFilePaths exposes bare public final fields — callers end up using two different access patterns on the same message object. Would it be worth adding @Getter here (and making the fields private) for consistency?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

public final String bootstrapBaseFilePath;

/**
* Stores the two paths as given; no validation or normalization is applied.
*
* @param prevBaseFilePath path of the base file that the new write replaces
* @param bootstrapBaseFilePath path of the bootstrap-source file the previous base file
* referenced, or {@code null} for non-bootstrap tables
*/
public PrevFilePaths(String prevBaseFilePath, String bootstrapBaseFilePath) {
this.prevBaseFilePath = prevBaseFilePath;
this.bootstrapBaseFilePath = bootstrapBaseFilePath;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@

import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.callback.HoodieClientInitCallback;
import org.apache.hudi.callback.HoodieWriteCommitCallback;
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage.PrevFilePaths;
import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
Expand All @@ -34,6 +39,7 @@
import org.apache.hudi.common.table.timeline.TimeGenerator;
import org.apache.hudi.common.table.timeline.TimeGenerators;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
Expand Down Expand Up @@ -87,6 +93,14 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable {
protected final TransactionManager txnManager;
protected final TimeGenerator timeGenerator;

/**
* Lazily-initialized commit callback (HoodieWriteCommitCallback). Lifted from
* {@link BaseHoodieWriteClient} so that {@link BaseHoodieTableServiceClient} can also
* fire callbacks for compaction and clustering completions. Transient is fine
* because the callback is only ever invoked from the driver after a commit.
*/
protected transient HoodieWriteCommitCallback commitCallback;

/**
* Timeline Server has the same lifetime as that of Client. Any operations done on the same timeline service will be
* able to take advantage of the cached file-system view. New completed actions will be synced automatically in an
Expand Down Expand Up @@ -462,4 +476,78 @@ private static Map<String, String> collectRollingMetadataFromTimeline(
protected Option<Map<String, String>> updateExtraMetadata(Option<Map<String, String>> extraMetadata) {
return CommitMetadataProperties.enrich(extraMetadata, config, context);
}

/**
* Fire {@link HoodieWriteCommitCallback} for a commit, if enabled. Shared by
* {@link BaseHoodieWriteClient#postCommit} (regular auto- and explicit-commit paths)
* and {@link BaseHoodieTableServiceClient} (compaction and clustering completions).
* Lazily constructs the callback instance from {@code hoodie.write.commit.callback.class}.
*
* <p>Best-effort: catches and logs any exception from the user-supplied callback so a
* misbehaving observer cannot fail the commit.
*/
protected void fireCommitCallback(String commitTime,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider renaming this to mayFireCommitCallback or fireCommitCallbackIfNecessary.

String commitActionType,
List<HoodieWriteStat> stats,
BaseFileOnlyView fsView,
Option<Map<String, String>> extraMetadata) {
if (!config.writeCommitCallbackOn()) {
return;
}
try {
if (commitCallback == null) {
commitCallback = HoodieCommitCallbackFactory.create(config);
}
commitCallback.call(new HoodieWriteCommitCallbackMessage(
commitTime, config.getTableName(), config.getBasePath(),
stats, Option.of(commitActionType), extraMetadata, resolvePrevFilePaths(stats, fsView),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what the prev file paths are used for. Are they for debugging?

@codope codope Jun 13, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was needed for an internal feature. It lets callback consumers (e.g. testing frameworks, lineage/audit sinks) see which base file each update replaced without rebuilding a FileSystemView themselves.
cc @prashantwason

Collections.emptyMap()));
} catch (Exception e) {
log.warn("HoodieWriteCommitCallback failed for commit {} ({}); ignoring",
commitTime, commitActionType, e);
}
}

/**
* Pre-resolve the previous base file (and bootstrap base file, if any) for every
* {@link HoodieWriteStat} that represents an update, using a populated
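* {@link BaseFileOnlyView}. The lookup is O(1) per stat against a cached view, so in that
* case it adds no I/O on top of what the writer already paid. A lookup can still fail
* (for example against a remote view); such failures are logged and that stat is skipped.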
*
* <p>Used by {@link #fireCommitCallback} call sites so the callback message ships
* actual file paths rather than forcing each callback impl to rebuild a
* {@code FileSystemView}.
*/
protected static Map<String, PrevFilePaths> resolvePrevFilePaths(List<HoodieWriteStat> stats,
BaseFileOnlyView fsView) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Querying the fsview is costly. Can we make it a supplier so that it is only called when necessary?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to a Supplier so the view is only resolved after the callback-enabled check, keeping the default (callback-off) path free. Even before this change, the enabled path reused the same cached view the writer had already populated, so there was no extra I/O.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I'm saying is: make the whole resolvePrevFilePaths call a supplier, so that even if the callback is invoked but never accesses the prev files, the fs view access is not triggered.

Map<String, PrevFilePaths> out = new HashMap<>();
if (stats == null || fsView == null) {
return out;
}
for (HoodieWriteStat stat : stats) {
String prevCommit = stat.getPrevCommit();
if (StringUtils.isNullOrEmpty(prevCommit) || HoodieWriteStat.NULL_COMMIT.equals(prevCommit)) {
continue;
}
Option<HoodieBaseFile> prev;
try {
prev = fsView.getBaseFileOn(stat.getPartitionPath(), prevCommit, stat.getFileId());
} catch (Exception e) {
// Best-effort: a remote view 4xx/5xx, a stale view, or a replaced file group must not
// fail the commit. Drop the prev path for this stat and keep going.
log.warn("Could not resolve prev base file for fileId={} prevCommit={}; skipping",
stat.getFileId(), prevCommit, e);
continue;
}
if (!prev.isPresent()) {
continue;
}
String prevPath = prev.get().getPath();
String bootstrapPath = prev.get().getBootstrapBaseFile().isPresent()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: prev.get() and getBootstrapBaseFile() are each called twice — could you cache them in locals, e.g. HoodieBaseFile prevFile = prev.get() and Option<BaseFile> bootstrapBase = prevFile.getBootstrapBaseFile()?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

? prev.get().getBootstrapBaseFile().get().getPath()
: null;
out.put(stat.getFileId(), new PrevFilePaths(prevPath, bootstrapPath));
}
return out;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,8 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab
);
}
log.info("Compacted successfully on commit {}", compactionCommitTime);
fireCommitCallback(compactionCommitTime, HoodieTimeline.COMMIT_ACTION,
writeStats, table.getBaseFileOnlyView(), Option.empty());
} finally {
if (config.getWriteConcurrencyMode().supportsMultiWriter()) {
this.heartbeatClient.stop(compactionCommitTime);
Expand Down Expand Up @@ -640,6 +642,8 @@ private void completeClustering(HoodieReplaceCommitMetadata replaceCommitMetadat
heartbeatClient.stop(clusteringCommitTime);
}
log.info("Clustering successfully on commit {} for table {}", clusteringCommitTime, table.getConfig().getBasePath());
fireCommitCallback(clusteringCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION,
writeStats, table.getBaseFileOnlyView(), Option.empty());
}

protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.callback.HoodieWriteCommitCallback;
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
import org.apache.hudi.callback.common.WriteStatusValidator;
import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
import org.apache.hudi.client.transaction.TransactionManager;
Expand Down Expand Up @@ -146,7 +143,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
@Getter
@Setter
private transient WriteOperationType operationType;
private transient HoodieWriteCommitCallback commitCallback;

protected transient Timer.Context writeTimer = null;

Expand Down Expand Up @@ -287,7 +283,7 @@ public boolean commitStats(String instantTime, TableWriteStats tableWriteStats,
boolean postCommitStatus = true;
HoodieTimer postCommitTimer = HoodieTimer.start();
try {
postCommit(table, metadata, instantTime, extraMetadata);
postCommit(table, metadata, instantTime, commitActionType, extraMetadata);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Is the new ordering intentional? Pre-PR the callback fired after mayBeCleanAndArchive and runTableServicesInline (and was skipped if they threw with canIgnorePostCommitFailures=false). Now it fires inside postCommit, before them — so a consumer can receive a successful-commit callback even when post-commit cleanup throws and commitStats ultimately propagates the exception. The PR description mentions extra callbacks but doesn't call out this ordering shift.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

mayBeCleanAndArchive(table);
runTableServicesInline(table, metadata, extraMetadata);
} catch (Exception e) {
Expand All @@ -303,15 +299,6 @@ public boolean commitStats(String instantTime, TableWriteStats tableWriteStats,
}

emitCommitMetrics(instantTime, metadata, commitActionType);

// callback if needed.
if (config.writeCommitCallbackOn()) {
if (null == commitCallback) {
commitCallback = HoodieCommitCallbackFactory.create(config);
}
commitCallback.call(new HoodieWriteCommitCallbackMessage(
instantTime, config.getTableName(), config.getBasePath(), tableWriteStats.getDataTableWriteStats(), Option.of(commitActionType), extraMetadata));
}
return true;
}

Expand Down Expand Up @@ -639,7 +626,8 @@ public O postWrite(HoodieWriteMetadata<O> result, String instantTime, HoodieTabl
boolean postCommitStatus = true;
HoodieTimer postCommitTimer = HoodieTimer.start();
try {
postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty());
postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime,
hoodieTable.getMetaClient().getCommitActionType(), Option.empty());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Could this report the wrong action type for auto-committed replacecommit operations? hoodieTable.getMetaClient().getCommitActionType() returns commit/deltacommit (operation-agnostic), but SparkPartitionTTLActionExecutor goes through SparkAutoCommitExecutor and commits with REPLACE_COMMIT_ACTION — the callback would advertise commit/deltacommit for that path. Using CommitUtils.getCommitActionType(getOperationType(), table.getMetaClient().getTableType()) would match the actual committed action.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

mayBeCleanAndArchive(hoodieTable);
} catch (Exception e) {
postCommitStatus = false;
Expand All @@ -666,14 +654,20 @@ public O postWrite(HoodieWriteMetadata<O> result, String instantTime, HoodieTabl
* @param instantTime Instant Time
* @param commitActionType Action type of the commit, passed on to the commit callback
* @param extraMetadata Additional Metadata passed by user
*/
protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, String commitActionType, Option<Map<String, String>> extraMetadata) {
try {
context.setJobStatus(this.getClass().getSimpleName(), "Cleaning up marker directories for commit " + instantTime + " in table "
+ config.getTableName());
// Delete the marker directory for the instant.
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
metrics.updateTableServiceInstantMetrics(table.getActiveTimeline());
// Fire write commit callback if a callback class is registered. postCommit() is reached
// by both auto-commit and explicit-commit paths; compaction and clustering have their own
// explicit fireCommitCallback call sites in BaseHoodieTableServiceClient.
List<HoodieWriteStat> stats = metadata.getWriteStats();
fireCommitCallback(instantTime, commitActionType, stats,
table.getBaseFileOnlyView(), extraMetadata);
} finally {
this.heartbeatClient.stop(instantTime);
}
Expand Down
Loading
Loading