Defer slot migration failover until async modules are consistent #2609
Summary
Problem: Modules that perform asynchronous processing (e.g., Valkey-search) can experience "read-your-own-write" consistency regressions during atomic slot migrations. The engine currently has no awareness of module-specific processing backlogs and can transfer slot ownership before an async module on the target node has fully processed all incoming writes, leading to data inconsistencies.
Solution: This PR introduces a synchronization mechanism that makes slot migration "module-aware." It defers the final `REQUEST-FAILOVER` step until all registered asynchronous modules on the target node certify that they are fully consistent with the data state at the moment the source node pauses. This is achieved through three core changes:
1. New Module APIs
Three new module APIs are introduced to allow the engine to query the state of a module's asynchronous processing pipeline:
`VM_RegisterAsyncProcessingStatusFunc(ValkeyModuleCtx *ctx, ValkeyModuleAsyncProcessingStatusFunc callback)`
Modules register this callback (at most one) for the engine to poll their status. Within this callback, modules must use the following two reporting functions.
`VM_ReportAsyncProcessingOffset(ValkeyModuleAsyncProcessingStatusCtx *ctx, uint64_t accepted_offset, uint64_t applied_offset)`
Reports the module's "point-in-time" data consistency. Modules define their own offset scheme; this allows the engine to know when `applied` (fully processed) data has caught up to the last `accepted` (queued) data at a specific moment (i.e., the pause). There is no canonical offset we can reuse, since the engine's replication offset is only tracked during replication, which is why modules are left to devise their own scheme.
`VM_ReportAsyncProcessingLag(ValkeyModuleAsyncProcessingStatusCtx *ctx, long long current_lag_millis)`
Reports an estimate (in milliseconds) of the module's current processing backlog. This is used by the source node to calculate the total end-to-end lag before initiating a pause.
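As a rough illustration, here is a minimal sketch of how a module with an internal ingest queue might wire these APIs up. It assumes the `VM_*` functions above are exposed to modules under the conventional `ValkeyModule_*` prefix and that the status callback receives the status context directly; the two counters and the `estimate_backlog_ms` helper are hypothetical, not part of this PR.

```c
/* Hypothetical module-side sketch: two atomic counters track the async
 * ingest pipeline, and the status callback reports them to the engine. */
#include "valkeymodule.h"
#include <stdatomic.h>

static _Atomic uint64_t accepted_offset; /* bumped when a write is queued */
static _Atomic uint64_t applied_offset;  /* bumped when a write is fully processed */

/* Hypothetical helper: converts queue depth into an estimated backlog,
 * here naively modeled as ~100 microseconds of processing per item. */
static long long estimate_backlog_ms(uint64_t depth) {
    return (long long)(depth / 10);
}

static void AsyncStatusCallback(ValkeyModuleAsyncProcessingStatusCtx *sctx) {
    /* Load applied before accepted so the snapshot can never show
     * applied running ahead of accepted. */
    uint64_t app = atomic_load(&applied_offset);
    uint64_t acc = atomic_load(&accepted_offset);
    ValkeyModule_ReportAsyncProcessingOffset(sctx, acc, app);
    ValkeyModule_ReportAsyncProcessingLag(sctx, estimate_backlog_ms(acc - app));
}

int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
    VALKEYMODULE_NOT_USED(argv);
    VALKEYMODULE_NOT_USED(argc);
    if (ValkeyModule_Init(ctx, "asyncdemo", 1, VALKEYMODULE_APIVER_1) == VALKEYMODULE_ERR)
        return VALKEYMODULE_ERR;
    /* Register the (at most one) status callback from the new API. */
    ValkeyModule_RegisterAsyncProcessingStatusFunc(ctx, AsyncStatusCallback);
    return VALKEYMODULE_OK;
}
```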
2. Migration Change: Pre-Pause Lag Probing (Source Node)
This change replaces the source node's reliance on its output buffer size as the trigger for pausing. The output buffer is blind to the module-processing backlog on the target, which could lead to pause timeouts.
We now use an active lag-probing system based on two new `SYNCSLOTS` commands:
`SYNCSLOTS GET-LAG <mstime>`: Sent from source to target to request a lag report.
`SYNCSLOTS LAG <millis>`: Sent from target to source with the total calculated lag.
Flow:
1. The source sends `SYNCSLOTS GET-LAG <mstime>`, stamped with its current time.
2. The target calculates the network lag (from the received `<mstime>`), and then polls its local async modules (via the new APIs) to get the maximum `module_lag_millis`.
3. The target replies with `SYNCSLOTS LAG <total_ms>`, where `total_ms` is the sum of network and module lag.
4. The source initiates the pause only once `total_ms` drops below a configurable threshold (default: 1 second). This ensures the target is reasonably "ready" for the final synchronization. A sketch of the target-side calculation follows.
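The sketch below shows the target-side arithmetic under stated assumptions; it is not the PR's actual code, and the helper names are hypothetical.

```c
/* Illustrative sketch of target-side GET-LAG handling. The extern helpers
 * (pollMaxModuleLagMillis, sendSyncSlotsLag) are hypothetical stand-ins. */
typedef long long mstime_t;

extern mstime_t mstime(void);                      /* current time in ms */
extern long long pollMaxModuleLagMillis(void);     /* max lag across registered modules */
extern void sendSyncSlotsLag(long long total_ms);  /* emits SYNCSLOTS LAG <total_ms> */

/* Called when the target receives SYNCSLOTS GET-LAG <mstime>. */
void handleSyncSlotsGetLag(mstime_t source_mstime) {
    /* Network lag: how long the probe took to reach us (assumes loosely
     * synchronized clocks between source and target). */
    long long network_lag = mstime() - source_mstime;
    if (network_lag < 0) network_lag = 0;

    /* Module lag: worst-case backlog reported via the new status callbacks. */
    long long module_lag = pollMaxModuleLagMillis();

    /* Total end-to-end lag reported back to the source. */
    sendSyncSlotsLag(network_lag + module_lag);
}
```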
3. Migration Change: Post-Pause Module Wait (Target Node)
This is the core consistency guarantee. Simply having low lag before the pause is not enough; we must ensure modules on the target process every write that occurred before the pause.
However, the target node may still be processing writes for other, non-importing slots, so we cannot simply "wait for all activity to stop." Instead, we use the new offset APIs to create a precise synchronization barrier (sketched in code after the steps below):
1. At the moment of the pause, the target polls each registered module and records its current `accepted_offset`. This value creates a "checkpoint" representing the exact state of each module's queue at the moment of the pause.
2. Only when all modules report an `applied_offset` that is equal to (or greater than) their recorded `accepted_offset` checkpoint does the target proceed.
3. The target then sends `SYNCSLOTS REQUEST-FAILOVER` to finalize the migration.
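A minimal sketch of this barrier, assuming hypothetical helpers for polling each module's offsets via its registered status callback:

```c
/* Illustrative sketch (hypothetical names) of the post-pause barrier on the
 * target: snapshot per-module accepted_offset checkpoints at pause time,
 * then poll until every module's applied_offset has caught up. */
#include <stdint.h>
#include <stddef.h>

typedef struct {
    uint64_t accepted;   /* last offset queued by the module */
    uint64_t applied;    /* last offset fully processed */
} ModuleOffsets;

/* Hypothetical helpers wrapping the new status-callback machinery. */
extern size_t numRegisteredModules(void);
extern ModuleOffsets pollModuleOffsets(size_t i);

/* At pause time: record each module's accepted offset as its checkpoint. */
void recordPauseCheckpoints(uint64_t *checkpoints) {
    for (size_t i = 0; i < numRegisteredModules(); i++)
        checkpoints[i] = pollModuleOffsets(i).accepted;
}

/* Polled afterwards: returns 1 once every module has applied all writes it
 * had accepted at pause time, i.e. it is safe to send REQUEST-FAILOVER. */
int modulesConsistentSincePause(const uint64_t *checkpoints) {
    for (size_t i = 0; i < numRegisteredModules(); i++) {
        if (pollModuleOffsets(i).applied < checkpoints[i]) return 0;
    }
    return 1;
}
```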
New Sequence Diagram