Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions cli/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,9 @@ impl AppState {
std::fs::write(&temp_path, &content)?;

// Atomic rename
std::fs::rename(&temp_path, path).map_err(|e| {
std::fs::rename(&temp_path, path).inspect_err(|_e| {
// Clean up temp file on failure
let _ = std::fs::remove_file(&temp_path);
e
})?;

Ok(())
Expand Down
5 changes: 1 addition & 4 deletions cli/src/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,7 @@ pub async fn run_background_check(
(updates, true) // true = check completed, should update timestamp
};

match tokio::time::timeout(Duration::from_millis(timeout_ms), check).await {
Ok(result) => Some(result),
Err(_) => None, // Timed out, will retry next run
}
tokio::time::timeout(Duration::from_millis(timeout_ms), check).await.ok()
}

/// Check if a managed binary is installed on disk.
Expand Down
2 changes: 1 addition & 1 deletion console/packages/console-rust/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ fn main() {

// Run vite build via pnpm from workspace root
let build_result = Command::new("pnpm")
.current_dir(&workspace_root)
.current_dir(workspace_root)
.args(["run", "build:binary"])
.status();

Expand Down
63 changes: 63 additions & 0 deletions docs/content/how-to/schedule-cron-task.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,69 @@ This runs the Function every 6 hours. The `expression` field uses standard cron

The Function executes automatically on the defined schedule. The Engine handles scheduling, no external cron daemon needed.

## Controlling Cron Jobs at Runtime

You can dynamically **pause**, **resume**, and **list** cron jobs without redeploying or changing cron expressions.

### Using Motia Framework (`ctx.cron`)

Inside any step handler, use `ctx.cron` to control cron jobs:

```typescript title="control-cron.step.ts"
import { step, http } from 'motia'

export default step(
{
name: 'control-cron',
triggers: [http('POST', '/cron/control')],
enqueues: [],
},
async ({ request }, ctx) => {
const { action, triggerId } = request.body

switch (action) {
case 'pause':
// Pause a cron job — it stays registered but skips executions
await ctx.cron.pause(triggerId)
return { status: 200, body: { message: `Paused ${triggerId}` } }

case 'resume':
// Resume a paused cron job
await ctx.cron.resume(triggerId)
return { status: 200, body: { message: `Resumed ${triggerId}` } }

case 'list':
// List all cron jobs and their paused status
const jobs = await ctx.cron.list()
return { status: 200, body: { jobs } }
}
},
)
```

### Using the SDK directly

```typescript title="cron-control-sdk.ts"
import { init } from 'iii-sdk'

const iii = init('ws://localhost:49134')

// Pause a cron job by its trigger ID
await iii.call('pause_cron', { id: 'my-trigger-id' })

// Resume it later
await iii.call('resume_cron', { id: 'my-trigger-id' })

// List all cron jobs
const jobs = await iii.call('list_cron_jobs', {})
// => [{ id: '...', function_id: '...', paused: false }, ...]
```

<Callout title="Trigger IDs" type="info">
The trigger ID is assigned when a cron trigger is registered. In Motia, it follows the pattern
`steps::<step-name>::trigger::cron(<expression>)`. Use `ctx.cron.list()` to discover trigger IDs.
</Callout>

{/* TODO: Uncomment once the Cron module reference page exists */}
{/* <Callout title="See also" type="info">
For advanced scheduling options, see the [Cron module reference](/docs/modules/module-cron).
Expand Down
53 changes: 53 additions & 0 deletions docs/content/how-to/use-queues.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,59 @@ iii.register_trigger("queue", "orders::high-value-alert", json!({

The producer returns immediately after enqueueing. The queue adapter distributes the message to all subscribed functions, which process it independently on their own workers. If a consumer fails, it does not affect the producer or other consumers.

## Delayed Enqueue

You can delay message delivery by specifying a `delay_ms` (milliseconds) when enqueuing. The engine holds the message for the specified duration before publishing it to consumers. Maximum delay: **15 minutes** (900,000 ms).

### Using Motia Framework (`ctx.delay`)

```typescript title="delayed-step.step.ts"
import { step, http, queue } from 'motia'

export default step(
{
name: 'send-reminder',
triggers: [http('POST', '/reminders')],
enqueues: ['reminder.send'],
},
async ({ request }, ctx) => {
const { userId, message } = request.body

// Send a reminder after 5 minutes
await ctx.delay('reminder.send', { userId, message }, 5 * 60 * 1000)

return { status: 200, body: { scheduled: true } }
},
)
```

### Using `enqueue` with `delayMs`

You can also pass `delayMs` directly through `ctx.enqueue`:

```typescript title="delayed-enqueue.step.ts"
await ctx.enqueue({
topic: 'order.follow-up',
data: { orderId: '123' },
delayMs: 30_000, // 30 seconds
})
```

### Using the SDK directly

```typescript title="delayed-enqueue-sdk.ts"
iii.triggerVoid('enqueue', {
topic: 'order.follow-up',
data: { orderId: '123' },
delay_ms: 30000, // 30 seconds
})
```

<Callout title="Delay limits" type="warn">
The maximum `delay_ms` value is **900,000** (15 minutes). For longer delays, consider using a cron trigger
with a condition check, or enqueue a delayed message that re-enqueues itself with the remaining delay.
</Callout>

## Queue vs triggerVoid

| | `enqueue` | `triggerVoid()` |
Expand Down
3 changes: 1 addition & 2 deletions engine/function-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ pub fn service(attr: TokenStream, item: TokenStream) -> TokenStream {
.sig
.inputs
.iter()
.skip_while(|arg| matches!(arg, syn::FnArg::Receiver(_)))
.next()
.find(|arg| !matches!(arg, syn::FnArg::Receiver(_)))
{
Some(syn::FnArg::Typed(pat_type)) => {
let ty = &*pat_type.ty;
Expand Down
73 changes: 71 additions & 2 deletions engine/src/modules/cron/cron.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,21 @@ use std::{

use async_trait::async_trait;
use colored::Colorize;
use function_macros::{function, service};
use futures::Future;
use once_cell::sync::Lazy;
use serde::Deserialize;
use serde_json::Value;

use super::{
config::CronModuleConfig,
structs::{CronAdapter, CronSchedulerAdapter},
};
use crate::{
engine::{Engine, EngineTrait},
engine::{Engine, EngineTrait, Handler, RegisterFunctionRequest},
function::FunctionResult,
modules::module::{AdapterFactory, ConfigurableModule, Module},
protocol::ErrorBody,
trigger::{Trigger, TriggerRegistrator},
};

Expand All @@ -33,6 +37,69 @@ pub struct CronCoreModule {
_config: CronModuleConfig,
}

#[derive(Deserialize)]
pub struct CronJobIdInput {
/// The trigger ID of the cron job to target.
id: String,
}

#[service(name = "cron")]
impl CronCoreModule {
#[function(id = "pause_cron", description = "Pause a cron job by trigger ID")]
pub async fn pause_cron(
&self,
input: CronJobIdInput,
) -> FunctionResult<Option<Value>, ErrorBody> {
match self.adapter.pause(&input.id).await {
Ok(_) => FunctionResult::Success(Some(
serde_json::json!({ "status": "paused", "id": input.id }),
)),
Err(e) => FunctionResult::Failure(ErrorBody {
code: "cron_pause_failed".into(),
message: e.to_string(),
}),
}
}

#[function(
id = "resume_cron",
description = "Resume a paused cron job by trigger ID"
)]
pub async fn resume_cron(
&self,
input: CronJobIdInput,
) -> FunctionResult<Option<Value>, ErrorBody> {
match self.adapter.resume(&input.id).await {
Ok(_) => FunctionResult::Success(Some(
serde_json::json!({ "status": "resumed", "id": input.id }),
)),
Err(e) => FunctionResult::Failure(ErrorBody {
code: "cron_resume_failed".into(),
message: e.to_string(),
}),
}
}

#[function(
id = "list_cron_jobs",
description = "List all cron jobs with their status"
)]
pub async fn list_cron_jobs(&self, _input: Value) -> FunctionResult<Option<Value>, ErrorBody> {
let jobs = self.adapter.list_jobs().await;
let result: Vec<Value> = jobs
.into_iter()
.map(|(id, function_id, paused)| {
serde_json::json!({
"id": id,
"function_id": function_id,
"paused": paused,
})
})
.collect();
FunctionResult::Success(Some(serde_json::json!(result)))
}
}

#[async_trait]
impl Module for CronCoreModule {
fn name(&self) -> &'static str {
Expand All @@ -43,7 +110,9 @@ impl Module for CronCoreModule {
Self::create_with_adapters(engine, config).await
}

fn register_functions(&self, _engine: Arc<Engine>) {}
fn register_functions(&self, engine: Arc<Engine>) {
self.register_functions(engine);
}

async fn initialize(&self) -> anyhow::Result<()> {
tracing::info!("Initializing CronModule");
Expand Down
Loading