-
Notifications
You must be signed in to change notification settings - Fork 2.5k
feat(client): enrich write commit callback message and fire it for table-service commits #18988
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,6 +26,7 @@ | |
| import lombok.Getter; | ||
|
|
||
| import java.io.Serializable; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
|
|
@@ -69,10 +70,54 @@ public class HoodieWriteCommitCallbackMessage implements Serializable { | |
| */ | ||
| private final Option<Map<String, String>> extraMetadata; | ||
|
|
||
| /** | ||
| * Previous base file paths keyed by fileId. Populated by the write client | ||
| * using the cached FileSystemView so that callback implementations don't | ||
| * have to rebuild a view. Empty for inserts and for callers that don't | ||
| * pre-resolve. | ||
| */ | ||
| private final Map<String, PrevFilePaths> prevFilePaths; | ||
|
|
||
| /** | ||
| * Free-form context that producers can attach for downstream callback consumers. | ||
| * The OSS write client populates this as empty; specialized callsites or wrappers | ||
| * may populate it with whatever context their callbacks need. Mirrors the | ||
| * optional shape of {@link #extraMetadata}. | ||
| */ | ||
| private final Map<String, String> extraContext; | ||
|
|
||
| public HoodieWriteCommitCallbackMessage(String commitTime, | ||
| String tableName, | ||
| String basePath, | ||
| List<HoodieWriteStat> hoodieWriteStat) { | ||
| this(commitTime, tableName, basePath, hoodieWriteStat, Option.empty(), Option.empty()); | ||
| this(commitTime, tableName, basePath, hoodieWriteStat, Option.empty(), Option.empty(), | ||
| Collections.emptyMap(), Collections.emptyMap()); | ||
| } | ||
|
|
||
| public HoodieWriteCommitCallbackMessage(String commitTime, | ||
| String tableName, | ||
| String basePath, | ||
| List<HoodieWriteStat> hoodieWriteStat, | ||
| Option<String> commitActionType, | ||
| Option<Map<String, String>> extraMetadata) { | ||
| this(commitTime, tableName, basePath, hoodieWriteStat, commitActionType, extraMetadata, | ||
| Collections.emptyMap(), Collections.emptyMap()); | ||
| } | ||
|
|
||
| /** | ||
| * Container for previously-existing file paths associated with a single fileId in a | ||
| * commit. {@link #prevBaseFilePath} is the base file the new write replaces, and | ||
| * {@link #bootstrapBaseFilePath} is the bootstrap-source file the previous | ||
| * base file referenced (null for non-bootstrap tables). | ||
| */ | ||
| public static class PrevFilePaths implements Serializable { | ||
| private static final long serialVersionUID = 1L; | ||
| public final String prevBaseFilePath; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤖 nit: the outer class uses Lombok - AI-generated; verify before applying. React 👍/👎 to flag quality. |
||
| public final String bootstrapBaseFilePath; | ||
|
|
||
| public PrevFilePaths(String prevBaseFilePath, String bootstrapBaseFilePath) { | ||
| this.prevBaseFilePath = prevBaseFilePath; | ||
| this.bootstrapBaseFilePath = bootstrapBaseFilePath; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,12 +20,17 @@ | |
|
|
||
| import org.apache.hudi.avro.model.HoodieCleanMetadata; | ||
| import org.apache.hudi.callback.HoodieClientInitCallback; | ||
| import org.apache.hudi.callback.HoodieWriteCommitCallback; | ||
| import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage; | ||
| import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage.PrevFilePaths; | ||
| import org.apache.hudi.callback.util.HoodieCommitCallbackFactory; | ||
| import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper; | ||
| import org.apache.hudi.client.embedded.EmbeddedTimelineService; | ||
| import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient; | ||
| import org.apache.hudi.client.transaction.TransactionManager; | ||
| import org.apache.hudi.client.utils.TransactionUtils; | ||
| import org.apache.hudi.common.engine.HoodieEngineContext; | ||
| import org.apache.hudi.common.model.HoodieBaseFile; | ||
| import org.apache.hudi.common.model.HoodieCommitMetadata; | ||
| import org.apache.hudi.common.model.HoodieWriteStat; | ||
| import org.apache.hudi.common.table.HoodieTableMetaClient; | ||
|
|
@@ -34,6 +39,7 @@ | |
| import org.apache.hudi.common.table.timeline.TimeGenerator; | ||
| import org.apache.hudi.common.table.timeline.TimeGenerators; | ||
| import org.apache.hudi.common.table.timeline.TimelineUtils; | ||
| import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; | ||
| import org.apache.hudi.common.util.Option; | ||
| import org.apache.hudi.common.util.ReflectionUtils; | ||
| import org.apache.hudi.common.util.StringUtils; | ||
|
|
@@ -63,6 +69,7 @@ | |
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.function.Supplier; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import static org.apache.hudi.config.HoodieWriteConfig.APPLICATION_ID; | ||
|
|
@@ -87,6 +94,14 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable { | |
| protected final TransactionManager txnManager; | ||
| protected final TimeGenerator timeGenerator; | ||
|
|
||
| /** | ||
| * Lazily-initialized commit callback (HoodieWriteCommitCallback). Lifted from | ||
| * {@link BaseHoodieWriteClient} so that {@link BaseHoodieTableServiceClient} can also | ||
| * fire callbacks for compaction and clustering completions. Transient is fine | ||
| * because the callback is only ever invoked from the driver after a commit. | ||
| */ | ||
| protected transient HoodieWriteCommitCallback commitCallback; | ||
|
|
||
| /** | ||
| * Timeline Server has the same lifetime as that of Client. Any operations done on the same timeline service will be | ||
| * able to take advantage of the cached file-system view. New completed actions will be synced automatically in an | ||
|
|
@@ -462,4 +477,79 @@ private static Map<String, String> collectRollingMetadataFromTimeline( | |
| protected Option<Map<String, String>> updateExtraMetadata(Option<Map<String, String>> extraMetadata) { | ||
| return CommitMetadataProperties.enrich(extraMetadata, config, context); | ||
| } | ||
|
|
||
| /** | ||
| * Fire {@link HoodieWriteCommitCallback} for a commit, if enabled. Shared by | ||
| * {@link BaseHoodieWriteClient#postCommit} (regular auto- and explicit-commit paths) | ||
| * and {@link BaseHoodieTableServiceClient} (compaction and clustering completions). | ||
| * Lazily constructs the callback instance from {@code hoodie.write.commit.callback.class}. | ||
| * | ||
| * <p>Best-effort: catches and logs any exception from the user-supplied callback so a | ||
| * misbehaving observer cannot fail the commit. | ||
| */ | ||
| protected void fireCommitCallbackIfNecessary(String commitTime, | ||
| String commitActionType, | ||
| List<HoodieWriteStat> stats, | ||
| Supplier<BaseFileOnlyView> fsViewSupplier, | ||
| Option<Map<String, String>> extraMetadata) { | ||
| if (!config.writeCommitCallbackOn()) { | ||
| return; | ||
| } | ||
| try { | ||
| if (commitCallback == null) { | ||
| commitCallback = HoodieCommitCallbackFactory.create(config); | ||
| } | ||
| commitCallback.call(new HoodieWriteCommitCallbackMessage( | ||
| commitTime, config.getTableName(), config.getBasePath(), | ||
| stats, Option.of(commitActionType), extraMetadata, | ||
| resolvePrevFilePaths(stats, fsViewSupplier.get()), | ||
| Collections.emptyMap())); | ||
| } catch (Exception e) { | ||
| log.warn("HoodieWriteCommitCallback failed for commit {} ({}); ignoring", | ||
| commitTime, commitActionType, e); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Pre-resolve the previous base file (and bootstrap base file, if any) for every | ||
| * {@link HoodieWriteStat} that represents an update, using a populated | ||
| * {@link BaseFileOnlyView}. The lookup is O(1) per stat against the cached view, so | ||
| * this adds no I/O on top of what the writer already paid. | ||
| * | ||
| * <p>Used by {@link #fireCommitCallbackIfNecessary} call sites so the callback message ships | ||
| * actual file paths rather than forcing each callback impl to rebuild a | ||
| * {@code FileSystemView}. | ||
| */ | ||
| protected static Map<String, PrevFilePaths> resolvePrevFilePaths(List<HoodieWriteStat> stats, | ||
| BaseFileOnlyView fsView) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. query the fsview is costly, can we make it a supplier so that it is only called when necessary.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. switched to a Supplier so the view is only resolved after the callback-enabled check, keeping the default (callback-off) path free. Even before, on the enabled path, it reuses the same cached view the writer already populated, so no extra I/O.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what I'm saying is make the whole |
||
| Map<String, PrevFilePaths> out = new HashMap<>(); | ||
| if (stats == null || fsView == null) { | ||
| return out; | ||
| } | ||
| for (HoodieWriteStat stat : stats) { | ||
| String prevCommit = stat.getPrevCommit(); | ||
| if (StringUtils.isNullOrEmpty(prevCommit) || HoodieWriteStat.NULL_COMMIT.equals(prevCommit)) { | ||
| continue; | ||
| } | ||
| Option<HoodieBaseFile> prev; | ||
| try { | ||
| prev = fsView.getBaseFileOn(stat.getPartitionPath(), prevCommit, stat.getFileId()); | ||
| } catch (Exception e) { | ||
| // Best-effort: a remote view 4xx/5xx, a stale view, or a replaced file group must not | ||
| // fail the commit. Drop the prev path for this stat and keep going. | ||
| log.warn("Could not resolve prev base file for fileId={} prevCommit={}; skipping", | ||
| stat.getFileId(), prevCommit, e); | ||
| continue; | ||
| } | ||
| if (!prev.isPresent()) { | ||
| continue; | ||
| } | ||
| String prevPath = prev.get().getPath(); | ||
| String bootstrapPath = prev.get().getBootstrapBaseFile().isPresent() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤖 nit: - AI-generated; verify before applying. React 👍/👎 to flag quality. |
||
| ? prev.get().getBootstrapBaseFile().get().getPath() | ||
| : null; | ||
| out.put(stat.getFileId(), new PrevFilePaths(prevPath, bootstrapPath)); | ||
| } | ||
| return out; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -424,6 +424,8 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab | |
| ); | ||
| } | ||
| log.info("Compacted successfully on commit {}", compactionCommitTime); | ||
| fireCommitCallbackIfNecessary(compactionCommitTime, HoodieTimeline.COMMIT_ACTION, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. completeLogCompaction (LOG_COMPACTION_ACTION) is also a table-service completion with write stats and a timeline commit, but it does not call fireCommitCallbackIfNecessary while compaction and clustering do. Intentional scope, or should log compaction fire the callback too? If intentional, a short note here on why it is excluded would keep the set explicit.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤖 Worth noting: - AI-generated; verify before applying. React 👍/👎 to flag quality. |
||
| writeStats, table::getBaseFileOnlyView, Option.empty()); | ||
| } finally { | ||
| if (config.getWriteConcurrencyMode().supportsMultiWriter()) { | ||
| this.heartbeatClient.stop(compactionCommitTime); | ||
|
|
@@ -640,6 +642,8 @@ private void completeClustering(HoodieReplaceCommitMetadata replaceCommitMetadat | |
| heartbeatClient.stop(clusteringCommitTime); | ||
| } | ||
| log.info("Clustering successfully on commit {} for table {}", clusteringCommitTime, table.getConfig().getBasePath()); | ||
| fireCommitCallbackIfNecessary(clusteringCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤖 For table-version 8+ clustering uses - AI-generated; verify before applying. React 👍/👎 to flag quality. |
||
| writeStats, table::getBaseFileOnlyView, Option.empty()); | ||
| } | ||
|
|
||
| protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,10 +24,7 @@ | |
| import org.apache.hudi.avro.model.HoodieRestoreMetadata; | ||
| import org.apache.hudi.avro.model.HoodieRestorePlan; | ||
| import org.apache.hudi.avro.model.HoodieRollbackMetadata; | ||
| import org.apache.hudi.callback.HoodieWriteCommitCallback; | ||
| import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage; | ||
| import org.apache.hudi.callback.common.WriteStatusValidator; | ||
| import org.apache.hudi.callback.util.HoodieCommitCallbackFactory; | ||
| import org.apache.hudi.client.embedded.EmbeddedTimelineService; | ||
| import org.apache.hudi.client.heartbeat.HeartbeatUtils; | ||
| import org.apache.hudi.client.transaction.TransactionManager; | ||
|
|
@@ -146,7 +143,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient | |
| @Getter | ||
| @Setter | ||
| private transient WriteOperationType operationType; | ||
| private transient HoodieWriteCommitCallback commitCallback; | ||
|
|
||
| protected transient Timer.Context writeTimer = null; | ||
|
|
||
|
|
@@ -287,7 +283,7 @@ public boolean commitStats(String instantTime, TableWriteStats tableWriteStats, | |
| boolean postCommitStatus = true; | ||
| HoodieTimer postCommitTimer = HoodieTimer.start(); | ||
| try { | ||
| postCommit(table, metadata, instantTime, extraMetadata); | ||
| postCommit(table, metadata, instantTime, commitActionType, extraMetadata); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤖 Is the new ordering intentional? Pre-PR the callback fired after - AI-generated; verify before applying. React 👍/👎 to flag quality. |
||
| mayBeCleanAndArchive(table); | ||
| runTableServicesInline(table, metadata, extraMetadata); | ||
| } catch (Exception e) { | ||
|
|
@@ -303,15 +299,6 @@ public boolean commitStats(String instantTime, TableWriteStats tableWriteStats, | |
| } | ||
|
|
||
| emitCommitMetrics(instantTime, metadata, commitActionType); | ||
|
|
||
| // callback if needed. | ||
| if (config.writeCommitCallbackOn()) { | ||
| if (null == commitCallback) { | ||
| commitCallback = HoodieCommitCallbackFactory.create(config); | ||
| } | ||
| commitCallback.call(new HoodieWriteCommitCallbackMessage( | ||
| instantTime, config.getTableName(), config.getBasePath(), tableWriteStats.getDataTableWriteStats(), Option.of(commitActionType), extraMetadata)); | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
|
|
@@ -639,7 +626,8 @@ public O postWrite(HoodieWriteMetadata<O> result, String instantTime, HoodieTabl | |
| boolean postCommitStatus = true; | ||
| HoodieTimer postCommitTimer = HoodieTimer.start(); | ||
| try { | ||
| postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty()); | ||
| postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, | ||
| hoodieTable.getMetaClient().getCommitActionType(), Option.empty()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤖 Could this report the wrong action type for auto-committed replacecommit operations? - AI-generated; verify before applying. React 👍/👎 to flag quality. |
||
| mayBeCleanAndArchive(hoodieTable); | ||
| } catch (Exception e) { | ||
| postCommitStatus = false; | ||
|
|
@@ -666,14 +654,20 @@ public O postWrite(HoodieWriteMetadata<O> result, String instantTime, HoodieTabl | |
| * @param instantTime Instant Time | ||
| * @param extraMetadata Additional Metadata passed by user | ||
| */ | ||
| protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) { | ||
| protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, String commitActionType, Option<Map<String, String>> extraMetadata) { | ||
| try { | ||
| context.setJobStatus(this.getClass().getSimpleName(), "Cleaning up marker directories for commit " + instantTime + " in table " | ||
| + config.getTableName()); | ||
| // Delete the marker directory for the instant. | ||
| WriteMarkersFactory.get(config.getMarkersType(), table, instantTime) | ||
| .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); | ||
| metrics.updateTableServiceInstantMetrics(table.getActiveTimeline()); | ||
| // Fire write commit callback if a callback class is registered. postCommit() is reached | ||
| // by both auto-commit and explicit-commit paths; compaction and clustering have their own | ||
| // explicit fireCommitCallbackIfNecessary call sites in BaseHoodieTableServiceClient. | ||
| List<HoodieWriteStat> stats = metadata.getWriteStats(); | ||
| fireCommitCallbackIfNecessary(instantTime, commitActionType, stats, | ||
| table::getBaseFileOnlyView, extraMetadata); | ||
| } finally { | ||
| this.heartbeatClient.stop(instantTime); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤖 nit: the phrase "Mirrors the optional shape of
extraMetadata" is misleading —extraContextisMap<String, String>, notOption<Map<String, String>>, so the shapes are different. Could you drop or rephrase that sentence?- AI-generated; verify before applying. React 👍/👎 to flag quality.