Skip to content

Commit 06ccec1

Browse files
authored
Scheduled reducer: use timestamp from reducer params for next run (#3657)
# Description of Changes fixes: #2882 Schedule repeated reducers from their last execution timestamp instead of `Timestamp::now` # API and ABI breaking changes NA # Expected complexity level and risk 1 # Testing Existing test should be enough to cover any regression.
1 parent 8787fbe commit 06ccec1

File tree

2 files changed

+14
-7
lines changed

2 files changed

+14
-7
lines changed

crates/core/src/host/module_host.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1491,7 +1491,7 @@ impl ModuleHost {
14911491
pub async fn call_scheduled_reducer(
14921492
&self,
14931493
call_reducer_params: impl FnOnce(&MutTxId) -> anyhow::Result<Option<CallReducerParams>> + Send + 'static,
1494-
) -> Result<ReducerCallResult, ReducerCallError> {
1494+
) -> Result<(ReducerCallResult, Timestamp), ReducerCallError> {
14951495
let db = self.module.replica_ctx().relational_db.clone();
14961496
// scheduled reducer name not fetched yet, anyway this is only for logging purpose
14971497
const REDUCER: &str = "scheduled_reducer";
@@ -1518,8 +1518,9 @@ impl ModuleHost {
15181518
arg_bsatn: params.args.get_bsatn().clone(),
15191519
}),
15201520
);
1521+
let timestamp = params.timestamp;
15211522

1522-
Ok(inst.call_reducer(Some(tx), params))
1523+
Ok((inst.call_reducer(Some(tx), params), timestamp))
15231524
}
15241525
Ok(None) => Err(ReducerCallError::ScheduleReducerNotFound),
15251526
Err(err) => Err(ReducerCallError::Args(InvalidReducerArguments(

crates/core/src/host/scheduler.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -396,11 +396,19 @@ impl SchedulerActor {
396396
// the ScheduledReducer in the database for when the module restarts
397397
Ok(Err(ReducerCallError::NoSuchModule(_)) | Err(ReducerCallError::ScheduleReducerNotFound)) => {}
398398

399+
Ok(Ok((_, ts))) => {
400+
if let Some(id) = id {
401+
let _ = self.delete_scheduled_reducer_row(&db, id, module_host_clone, ts).await;
402+
}
403+
}
404+
399405
// delete the scheduled reducer row if its not repeated reducer
400406
Ok(_) | Err(_) => {
401407
if let Some(id) = id {
402408
// TODO: Handle errors here?
403-
let _ = self.delete_scheduled_reducer_row(&db, id, module_host_clone).await;
409+
let _ = self
410+
.delete_scheduled_reducer_row(&db, id, module_host_clone, Timestamp::now())
411+
.await;
404412
}
405413
}
406414
}
@@ -415,6 +423,7 @@ impl SchedulerActor {
415423
db: &RelationalDB,
416424
id: ScheduledReducerId,
417425
module_host: ModuleHost,
426+
ts: Timestamp,
418427
) -> anyhow::Result<()> {
419428
let host_clone = module_host.clone();
420429
let db = db.clone();
@@ -455,10 +464,7 @@ impl SchedulerActor {
455464
// If this was repeated, we need to add it back to the queue.
456465
if let Some(ScheduleAt::Interval(dur)) = schedule_at {
457466
let key = self.queue.insert(
458-
QueueItem::Id {
459-
id,
460-
at: Timestamp::now() + dur,
461-
},
467+
QueueItem::Id { id, at: ts + dur },
462468
dur.to_duration().unwrap_or(Duration::ZERO),
463469
);
464470
self.key_map.insert(id, key);

0 commit comments

Comments
 (0)