feat: add dynamic schedule step control (pause/resume/list cron jobs)#1257
feat: add dynamic schedule step control (pause/resume/list cron jobs)#1257raj-kochale wants to merge 1 commit intoiii-hq:mainfrom
Conversation
|
@raj-kochale is attempting to deploy a commit to the motia Team on Vercel. A member of the Team first needs to authorize it. |
📝 WalkthroughWalkthroughThis PR adds runtime control for cron jobs (pause, resume, list) and delayed message enqueuing for queues. The cron module exposes control APIs with atomic state management, queues support optional delay_ms with 15-minute caps, and Motia framework exposes these via FlowContext. Includes minor code improvements and documentation. Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Tip Try Coding Plans. Let us write the prompt for your AI agent so you can ship faster (with fewer bugs). Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
engine/src/modules/queue/queue.rs (1)
91-110: Spawned task is fire-and-forget without completion tracking.The delayed enqueue spawns a detached task. While this is acceptable for non-critical background work, be aware that:
- If the process shuts down during the sleep period, the delayed message is lost
- There's no way to cancel a pending delayed enqueue
For queues requiring stronger delivery guarantees, consider persisting delayed messages with a scheduler pattern instead. This may be acceptable for the current use case.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/src/modules/queue/queue.rs` around lines 91 - 110, The delayed enqueue uses a fire-and-forget tokio::spawn inside the conditional in queue.rs (the block that calls adapter.enqueue, tokio::spawn, and crate::modules::telemetry::collector::track_queue_emit), which means delayed tasks cannot be tracked, canceled, or guaranteed on shutdown; fix by either persisting the delayed message to a durable store and scheduling it via a background scheduler (so enqueue is replayed reliably on restart) or by returning/managing a JoinHandle/JoinSet from the spawn and wiring it into the process shutdown/cancellation logic so pending sleeps can be awaited or canceled; update the code paths around the tokio::spawn, adapter.enqueue call, and where tasks are created so delayed messages are either persisted or registered with a shutdown-aware task manager.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@engine/src/modules/queue/queue.rs`:
- Around line 101-107: The spawned task currently swallows errors from
adapter.enqueue() (let _ = ...), so change it to await the Result and log
failures instead of discarding them: capture the result of
adapter.enqueue(&topic, event_data, traceparent, baggage).await (e.g., if let
Err(e) = ... or match ...) and call a logger (tracing::error! or log::error!)
including the error and contextual info (topic, maybe delay) so enqueue failures
are visible; keep the call to
crate::modules::telemetry::collector::track_queue_emit() after handling the
result.
---
Nitpick comments:
In `@engine/src/modules/queue/queue.rs`:
- Around line 91-110: The delayed enqueue uses a fire-and-forget tokio::spawn
inside the conditional in queue.rs (the block that calls adapter.enqueue,
tokio::spawn, and crate::modules::telemetry::collector::track_queue_emit), which
means delayed tasks cannot be tracked, canceled, or guaranteed on shutdown; fix
by either persisting the delayed message to a durable store and scheduling it
via a background scheduler (so enqueue is replayed reliably on restart) or by
returning/managing a JoinHandle/JoinSet from the spawn and wiring it into the
process shutdown/cancellation logic so pending sleeps can be awaited or
canceled; update the code paths around the tokio::spawn, adapter.enqueue call,
and where tasks are created so delayed messages are either persisted or
registered with a shutdown-aware task manager.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 640305c9-b2a3-442b-b402-5e70466c1828
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (11)
cli/src/state.rscli/src/update.rsconsole/packages/console-rust/build.rsdocs/content/how-to/schedule-cron-task.mdxdocs/content/how-to/use-queues.mdxengine/function-macros/src/lib.rsengine/src/modules/cron/cron.rsengine/src/modules/cron/structs.rsengine/src/modules/queue/queue.rsframeworks/motia/motia-js/packages/motia/src/new/build/utils.tsframeworks/motia/motia-js/packages/motia/src/types.ts
| tokio::spawn(async move { | ||
| tokio::time::sleep(std::time::Duration::from_millis(delay)).await; | ||
| let _ = adapter | ||
| .enqueue(&topic, event_data, traceparent, baggage) | ||
| .await; | ||
| crate::modules::telemetry::collector::track_queue_emit(); | ||
| }); |
There was a problem hiding this comment.
Delayed enqueue errors are silently swallowed.
The spawned task ignores the result of adapter.enqueue() with let _ = .... If the delayed enqueue fails, there's no logging or error handling, making failures invisible. Consider logging errors:
🛡️ Proposed fix to log enqueue errors
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
- let _ = adapter
+ if let Err(e) = adapter
.enqueue(&topic, event_data, traceparent, baggage)
- .await;
- crate::modules::telemetry::collector::track_queue_emit();
+ .await
+ {
+ tracing::error!(
+ topic = %topic,
+ error = ?e,
+ "Delayed enqueue failed"
+ );
+ } else {
+ crate::modules::telemetry::collector::track_queue_emit();
+ }
});📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| tokio::spawn(async move { | |
| tokio::time::sleep(std::time::Duration::from_millis(delay)).await; | |
| let _ = adapter | |
| .enqueue(&topic, event_data, traceparent, baggage) | |
| .await; | |
| crate::modules::telemetry::collector::track_queue_emit(); | |
| }); | |
| tokio::spawn(async move { | |
| tokio::time::sleep(std::time::Duration::from_millis(delay)).await; | |
| if let Err(e) = adapter | |
| .enqueue(&topic, event_data, traceparent, baggage) | |
| .await | |
| { | |
| tracing::error!( | |
| topic = %topic, | |
| error = ?e, | |
| "Delayed enqueue failed" | |
| ); | |
| } else { | |
| crate::modules::telemetry::collector::track_queue_emit(); | |
| } | |
| }); |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@engine/src/modules/queue/queue.rs` around lines 101 - 107, The spawned task
currently swallows errors from adapter.enqueue() (let _ = ...), so change it to
await the Result and log failures instead of discarding them: capture the result
of adapter.enqueue(&topic, event_data, traceparent, baggage).await (e.g., if let
Err(e) = ... or match ...) and call a logger (tracing::error! or log::error!)
including the error and contextual info (topic, maybe delay) so enqueue failures
are visible; keep the call to
crate::modules::telemetry::collector::track_queue_emit() after handling the
result.
Summary
Fixes #1219
Adds the ability to dynamically control cron/schedule trigger steps at runtime — pause, resume, and list cron jobs — instead of requiring hard-coded cron expressions with no way to stop or control them after deployment.
Changes
Engine - Cron Module
pause_cronfunction: Pause a running cron job by trigger IDresume_cronfunction: Resume a paused cron job by trigger IDlist_cron_jobsfunction: List all cron jobs with their current status (active/paused)CronAdapter(engine/src/modules/cron/structs.rs)engine/src/modules/cron/cron.rs)Engine - Queue Module
delay_ms(engine/src/modules/queue/queue.rs)Framework (motia-js)
Documentation
Code Quality Fixes
skip_while_nextclippy lint infunction-macrosneedless_borrows_for_generic_argsin console build scriptcollapsible_ifwarnings in queue modulemanual_inspectandmanual_ok_errin CLI cratecargo fmtformatting fixesTesting
cargo fmt --all -- --check✅cargo clippy --all-targets --all-features -- -D warnings✅Summary by CodeRabbit
Release Notes
New Features
Documentation