diff --git a/docs/how-to/use-queues.mdx b/docs/how-to/use-queues.mdx index a8e725652..0a4695882 100644 --- a/docs/how-to/use-queues.mdx +++ b/docs/how-to/use-queues.mdx @@ -1,21 +1,358 @@ --- title: 'Use Queues' -description: 'How to enqueue work and process it asynchronously with retries, concurrency control, and ordering guarantees using Named Queues and Trigger Actions.' +description: 'Decouple producers from consumers, process work asynchronously with retries, fan-out delivery, concurrency control, and ordering guarantees.' --- ## Goal -Offload work to a named queue so it runs asynchronously with built-in retries, concurrency control, and optional FIFO ordering. Target functions receive data normally — no handler changes required. +Use queues to run work asynchronously with built-in retries, dead-letter support, and concurrency control. iii offers two queue modes — **topic-based** (pub/sub fan-out) and **named queues** (direct function targeting) — so you can pick the delivery model that fits your use case. + +## What Are Queues + +A queue sits between the code that **produces** work and the code that **processes** it. Instead of calling a function and waiting for it to finish, you hand the work to a queue. The queue stores the message, delivers it to a consumer, retries on failure, and routes permanently failed messages to a dead-letter queue (DLQ) for later inspection. + +This separation solves three problems: + +1. **Speed** — the producer responds immediately instead of blocking on slow downstream work. +2. **Reliability** — transient failures (network blips, service restarts) are retried automatically instead of being lost. +3. **Load control** — concurrency limits prevent consumers from overwhelming downstream systems. + +```mermaid +flowchart LR + P[Producer] -->|enqueue| Q[(Queue)] + Q -->|deliver with retries| C[Consumer] + Q -->|exhausted retries| DLQ[(Dead Letter Queue)] +``` + +## When to Use Queues + +| Scenario | Use a queue? | Why | +|----------|-------------|-----| +| HTTP handler must respond fast, but downstream work is slow | **Yes** | Enqueue the work and return `202 Accepted` immediately | +| Multiple functions must react to the same event | **Yes** | Topic-based queues fan out to every subscriber | +| Work must survive process restarts | **Yes** | Queues persist messages and retry on failure | +| External API has rate limits | **Yes** | Concurrency control throttles parallel requests | +| Transactions for the same entity must be ordered | **Yes** | FIFO queues guarantee per-group ordering | +| You need the function's return value right now | **No** | Use a [synchronous trigger](./trigger-actions#1-synchronous-no-action) instead | +| The work is non-critical and losing it is acceptable | **Maybe** | [TriggerAction.Void()](./trigger-actions#2-void-fire-and-forget) is simpler if you don't need retries | + +## Two Queue Modes + +iii supports two ways to use queues. Both share the same adapter, retry engine, and DLQ infrastructure — they differ in how producers address consumers. + +| | Topic-based | Named queues | +|---|---|---| +| **Producer** | `trigger({ function_id: 'enqueue', payload: { topic, data } })` | `trigger({ function_id, payload, action: TriggerAction.Enqueue({ queue }) })` | +| **Consumer** | Registers `registerTrigger({ type: 'queue', config: { topic } })` | No registration — function is the target | +| **Delivery** | Fan-out: each subscribed function gets every message; replicas compete | Single target function per enqueue call | +| **Config** | Optional `queue_config` on trigger | `queue_configs` in `iii-config.yaml` | +| **Best for** | Durable pub/sub with retries and fan-out | Direct function invocation with retries, FIFO, DLQ | - Queues use the `Enqueue` trigger action. If you are new to trigger actions, read [Trigger Actions](./trigger-actions) first to understand the difference between synchronous, Void, and Enqueue invocations. + Named queues use the `Enqueue` trigger action. If you are new to trigger actions, read [Trigger Actions](./trigger-actions) first. -## Steps +--- + +## Topic-Based Queues + +Topic-based queues work like durable pub/sub: you publish a message to a **topic**, and every function subscribed to that topic receives a copy. If a function has multiple replicas, they compete on a shared per-function queue — only one replica processes each message. + + + + Subscribe one or more functions to the same topic. Each function gets its own internal queue. + + + + ```typescript + import { registerWorker } from 'iii-sdk' + + const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134') + + iii.registerFunction( + { id: 'notify::email' }, + async (data) => { + await sendEmail(data.userId, `Order ${data.orderId} created`) + return {} + }, + ) + + iii.registerFunction( + { id: 'audit::log' }, + async (data) => { + await writeAuditLog('order.created', data) + return {} + }, + ) + + iii.registerTrigger({ + type: 'queue', + function_id: 'notify::email', + config: { topic: 'order.created' }, + }) + + iii.registerTrigger({ + type: 'queue', + function_id: 'audit::log', + config: { topic: 'order.created' }, + }) + ``` + + + ```python + from iii import register_worker + + iii = register_worker("ws://localhost:49134") + + + def send_email_notification(data): + send_email(data["userId"], f"Order {data['orderId']} created") + return {} + + + def write_audit(data): + write_audit_log("order.created", data) + return {} + + + iii.register_function({"id": "notify::email"}, send_email_notification) + iii.register_function({"id": "audit::log"}, write_audit) + + iii.register_trigger({ + "type": "queue", + "function_id": "notify::email", + "config": {"topic": "order.created"}, + }) + + iii.register_trigger({ + "type": "queue", + "function_id": "audit::log", + "config": {"topic": "order.created"}, + }) + ``` + + + ```rust + use iii_sdk::{register_worker, InitOptions, RegisterFunction, RegisterTriggerInput}; + use serde_json::{json, Value}; + + let iii = register_worker("ws://localhost:49134", InitOptions::default()); + + iii.register_function(RegisterFunction::new_async( + "notify::email", + |data: Value| async move { + send_email(data["userId"].as_str().unwrap_or(""), &format!("Order {} created", data["orderId"])).await?; + Ok(json!({})) + }, + )); + + iii.register_function(RegisterFunction::new_async( + "audit::log", + |data: Value| async move { + write_audit_log("order.created", &data).await?; + Ok(json!({})) + }, + )); + + iii.register_trigger(RegisterTriggerInput { + trigger_type: "queue".into(), + function_id: "notify::email".into(), + config: json!({ "topic": "order.created" }), + metadata: None, + })?; + + iii.register_trigger(RegisterTriggerInput { + trigger_type: "queue".into(), + function_id: "audit::log".into(), + config: json!({ "topic": "order.created" }), + metadata: None, + })?; + ``` + + + + Both `notify::email` and `audit::log` are now subscribed to `order.created`. Every message published to that topic reaches both functions. + + + + From any function, publish a message using the builtin `enqueue` function. The engine fans it out to every subscribed function. + + + + ```typescript + import { TriggerAction } from 'iii-sdk' + + await iii.trigger({ + function_id: 'enqueue', + payload: { + topic: 'order.created', + data: { orderId: 'ord_789', userId: 'usr_42', total: 149.99 }, + }, + action: TriggerAction.Void(), + }) + ``` + + + ```python + from iii import TriggerAction + + iii.trigger({ + "function_id": "enqueue", + "payload": { + "topic": "order.created", + "data": {"orderId": "ord_789", "userId": "usr_42", "total": 149.99}, + }, + "action": TriggerAction.Void(), + }) + ``` + + + ```rust + use iii_sdk::{TriggerAction, TriggerRequest}; + use serde_json::json; + + iii.trigger(TriggerRequest { + function_id: "enqueue".into(), + payload: json!({ + "topic": "order.created", + "data": { "orderId": "ord_789", "userId": "usr_42", "total": 149.99 }, + }), + action: Some(TriggerAction::Void), + timeout_ms: None, + }).await?; + ``` + + + + The producer does not need to know which functions are subscribed — it only knows the topic name. + + + + Topic-based queues use **fan-out per function**: + + - Each distinct function subscribed to a topic receives a **copy** of every message. + - If a function has multiple replicas running, they **compete** on a shared per-function queue — only one replica processes each message. + + ```mermaid + sequenceDiagram + participant P as Producer + participant E as Engine + participant Q1 as order.created::notify::email + participant Q2 as order.created::audit::log + participant E1 as notify::email (replica 1) + participant E2 as notify::email (replica 2) + participant A1 as audit::log + + Note over Q1,Q2: Both functions subscribed to topic "order.created" + + P->>E: enqueue({ topic: "order.created", data }) + + par Engine copies message to each function's queue + E->>Q1: push(data) + E->>Q2: push(data) + end + + Note over Q1,E2: Replicas compete — only one processes the message + Q1->>E1: deliver + E1-->>Q1: Ack + + Q2->>A1: deliver + A1-->>Q2: Ack + + P->>E: enqueue({ topic: "order.created", data }) + + par + E->>Q1: push(data) + E->>Q2: push(data) + end + + Note over Q1,E2: This time replica 2 wins + Q1->>E2: deliver + E2-->>Q1: Ack + + Q2->>A1: deliver + A1-->>Q2: Ack + ``` + + This gives you pub/sub-style event distribution with the durability and retry guarantees of a queue. + + + + Attach a condition function to a queue trigger to filter which messages reach the handler. The condition receives the message data and returns `true` or `false`. If `false`, the handler is not called — no error is surfaced. + + + + ```typescript + iii.registerFunction( + { id: 'conditions::is_high_value' }, + async (data) => data.total > 1000, + ) + + iii.registerTrigger({ + type: 'queue', + function_id: 'notify::vip-team', + config: { + topic: 'order.created', + condition_function_id: 'conditions::is_high_value', + }, + }) + ``` + + + ```python + def is_high_value(data): + return data.get("total", 0) > 1000 + + + iii.register_function({"id": "conditions::is_high_value"}, is_high_value) + + iii.register_trigger({ + "type": "queue", + "function_id": "notify::vip-team", + "config": { + "topic": "order.created", + "condition_function_id": "conditions::is_high_value", + }, + }) + ``` + + + ```rust + iii.register_function(RegisterFunction::new_async( + "conditions::is_high_value", + |data: Value| async move { + Ok(json!(data["total"].as_f64().unwrap_or(0.0) > 1000.0)) + }, + )); + + iii.register_trigger(RegisterTriggerInput { + trigger_type: "queue".into(), + function_id: "notify::vip-team".into(), + config: json!({ + "topic": "order.created", + "condition_function_id": "conditions::is_high_value", + }), + metadata: None, + })?; + ``` + + + + + See [Conditions](/examples/conditions) for the full pattern including HTTP and state trigger conditions. + + + + +--- + +## Named Queues + +Named queues target a specific function directly. You define queue settings in `iii-config.yaml` and reference the queue name when enqueuing work. - Declare one or more named queues under `queue_configs` in your `iii-config.yaml`. Each queue has independent retry, concurrency, and ordering settings. + Declare one or more named queues under `queue_configs`. Each queue has independent retry, concurrency, and ordering settings. ```yaml title="iii-config.yaml" modules: @@ -43,15 +380,13 @@ Offload work to a named queue so it runs asynchronously with built-in retries, c file_path: ./data/queue_store ``` - You can define as many named queues as your system requires. Each queue name is referenced when enqueuing work. - See the [Queue module reference](/modules/module-queue#queue-configuration) for every field, type, and default value. - From any function, enqueue a job by calling `trigger()` with `TriggerAction.Enqueue` and the target queue name. The caller does not wait for the job to be processed — it receives an acknowledgement (`messageReceiptId`) once the engine accepts the job. + From any function, enqueue a job by calling `trigger()` with `TriggerAction.Enqueue` and the target queue name. The caller receives an acknowledgement (`messageReceiptId`) once the engine accepts the job — it does not wait for processing. @@ -66,7 +401,7 @@ Offload work to a named queue so it runs asynchronously with built-in retries, c action: TriggerAction.Enqueue({ queue: 'payment' }), }) - console.log(receipt.messageReceiptId) // "msg_abc123" + console.log(receipt.messageReceiptId) ``` @@ -81,7 +416,7 @@ Offload work to a named queue so it runs asynchronously with built-in retries, c "action": TriggerAction.Enqueue(queue="payment"), }) - print(receipt["messageReceiptId"]) # "msg_abc123" + print(receipt["messageReceiptId"]) ``` @@ -102,16 +437,12 @@ Offload work to a named queue so it runs asynchronously with built-in retries, c timeout_ms: None, }).await?; - println!("{}", receipt["messageReceiptId"]); // "msg_abc123" + println!("{}", receipt["messageReceiptId"]); ``` - The target function (`orders::process-payment` in this example) receives the `payload` as its input — it does not need to know it was invoked via a queue. - - - Unlike `TriggerAction.Void()` which is fire-and-forget, `Enqueue` validates the queue exists and (for FIFO) checks the `message_group_field`. The `messageReceiptId` lets you correlate enqueue operations with DLQ entries or retry events. See [Trigger Actions](./trigger-actions#3-enqueue-named-queue) for a detailed comparison. - + The target function receives the `payload` as its input — it does not need to know it was invoked via a queue. @@ -168,7 +499,7 @@ Offload work to a named queue so it runs asynchronously with built-in retries, c - When processing order matters — for example, financial transactions for the same account — use a FIFO queue. Set `type: fifo` and specify `message_group_field`, the field in your payload whose value determines the ordering group. Jobs sharing the same group value are processed strictly in order. + When processing order matters — for example, financial transactions for the same account — set `type: fifo` and specify `message_group_field`. Jobs sharing the same group value are processed strictly in order. ```yaml title="iii-config.yaml (excerpt)" queue_configs: @@ -179,7 +510,7 @@ Offload work to a named queue so it runs asynchronously with built-in retries, c message_group_field: transaction_id ``` - The payload **must** contain the field named by `message_group_field`, and its value must be non-null. The engine rejects enqueue requests that violate this. + The payload **must** contain the field named by `message_group_field`, and its value must be non-null. @@ -222,7 +553,7 @@ Offload work to a named queue so it runs asynchronously with built-in retries, c - Every named queue retries failed jobs automatically. Configure `max_retries` (total delivery attempts before the job moves to the dead-letter queue) and `backoff_ms` (base delay between retries). Backoff is exponential: + Every named queue retries failed jobs automatically. Backoff is exponential: ``` delay = backoff_ms × 2^(attempt - 1) @@ -245,23 +576,23 @@ Offload work to a named queue so it runs asynchronously with built-in retries, c type: standard ``` - After all retries are exhausted, the job moves to a dead-letter queue (DLQ) where it is preserved for inspection or manual reprocessing. + After all retries are exhausted, the job moves to a dead-letter queue (DLQ). - See [Manage Failed Triggers](./dead-letter-queues) for DLQ configuration, inspection, and redrive. + See [Manage Failed Triggers](./dead-letter-queues) for DLQ inspection and redrive. - The `concurrency` field sets the maximum number of jobs the engine processes simultaneously from a single queue. This applies per-engine-instance. + The `concurrency` field sets the maximum number of jobs the engine processes simultaneously from a single queue (per engine instance). ```yaml title="iii-config.yaml (excerpt)" queue_configs: default: - concurrency: 10 # up to 10 jobs in parallel + concurrency: 10 type: standard payment: - concurrency: 2 # ignored for ordering — FIFO uses prefetch=1 + concurrency: 2 type: fifo message_group_field: transaction_id ``` @@ -269,13 +600,13 @@ Offload work to a named queue so it runs asynchronously with built-in retries, c - **Standard queues**: the engine pulls up to `concurrency` jobs simultaneously. - **FIFO queues**: the engine processes one job at a time (prefetch=1) to preserve ordering, regardless of the `concurrency` value. - Use low concurrency to protect downstream systems from overload (e.g. rate-limited APIs). Use high concurrency for embarrassingly parallel work (e.g. image resizing). + Use low concurrency to protect rate-limited APIs. Use high concurrency for embarrassingly parallel work like image resizing. -## Standard vs FIFO Queues +--- -The two queue types solve fundamentally different problems. Standard queues maximize throughput. FIFO queues guarantee ordering. +## Standard vs FIFO Queues | Dimension | Standard | FIFO | |-----------|----------|------| @@ -373,13 +704,13 @@ sequenceDiagram Note over DLQ: Job preserved for inspection or redrive ``` -## Real-World Scenarios +--- -### Scenario 1: E-Commerce Order Pipeline +## Real-World Scenarios -An order API must respond fast. Payment processing is critical and must happen in order per transaction. Email confirmation should be reliable. Analytics is best-effort. +### Scenario 1: HTTP API to Queue Pipeline -**Queue configuration:** +The most common pattern — an HTTP endpoint accepts a request, responds immediately, and offloads the actual work to a queue. This keeps API response times fast regardless of how long downstream processing takes. ```yaml title="iii-config.yaml" modules: @@ -570,301 +901,206 @@ iii.register_trigger(RegisterTriggerInput { This example uses all three [trigger actions](./trigger-actions): **Enqueue** for payment (reliable, ordered) and email (reliable, parallel), and **Void** for analytics (best-effort). -### Scenario 2: Bulk Email Delivery with Rate Limiting +### Scenario 2: Event Fan-Out with Topic Queues -A marketing system sends thousands of emails. The SMTP provider has a rate limit. A standard queue with low concurrency prevents overloading the provider while retrying transient SMTP failures. +An order system publishes `order.created` events. Multiple independent services — email notifications, inventory updates, and analytics — each need to process every order. Topic-based queues fan out each message to all subscribers with independent retries per function. + +```mermaid +flowchart LR + P[orders::create] -->|publish| T((order.created)) + T --> Q1[notify::email queue] + T --> Q2[inventory::reserve queue] + T --> Q3[analytics::track queue] + Q1 --> F1[notify::email] + Q2 --> F2[inventory::reserve] + Q3 --> F3[analytics::track] +``` -**Queue configuration:** -```typescript title="process-order.ts" -import { registerWorker, Logger } from 'iii-sdk' +```typescript +import { registerWorker, TriggerAction } from 'iii-sdk' const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134') -iii.registerFunction({ id: 'orders::process-order' }, async (order) => { - const logger = new Logger() - logger.info('Processing payment', { orderId: order.id }) - // ...payment logic... - return { processed: true } +iii.registerFunction({ id: 'notify::email' }, async (data) => { + await sendEmail(data.email, `Your order ${data.orderId} is confirmed!`) + return {} }) -``` - - -```python title="process_order.py" -import os - -from iii import Logger, register_worker - -iii = register_worker(os.environ.get("III_URL", "ws://localhost:49134")) - -def process_order(order): - logger = Logger() - logger.info("Processing payment", {"orderId": order["id"]}) - # ...payment logic... - return {"processed": True} +iii.registerFunction({ id: 'inventory::reserve' }, async (data) => { + for (const item of data.items) { + await reserveStock(item.sku, item.quantity) + } + return {} +}) +iii.registerFunction({ id: 'analytics::track' }, async (data) => { + await trackEvent('order_created', { orderId: data.orderId, total: data.total }) + return {} +}) -iii.register_function("orders::process-order", process_order) -``` - - -```rust title="process_order.rs" -use iii_sdk::{register_worker, InitOptions, Logger, RegisterFunction}; -use serde_json::{json, Value}; +iii.registerTrigger({ + type: 'queue', + function_id: 'notify::email', + config: { topic: 'order.created' }, +}) -let iii = register_worker( - &std::env::var("III_URL").unwrap_or_else(|_| "ws://127.0.0.1:49134".to_string()), - InitOptions::default(), -); - -let reg = RegisterFunction::new_async("orders::process-order", |order: Value| async move { - let logger = Logger::new(); - let order_id = order["id"].as_str().unwrap_or(""); - logger.info("Processing payment", Some(json!({ "orderId": order_id }))); - // ...payment logic... - Ok(json!({ "processed": true })) -}); -iii.register_function(reg); -``` - - +iii.registerTrigger({ + type: 'queue', + function_id: 'inventory::reserve', + config: { topic: 'order.created' }, +}) -A worker can also enqueue further work, creating processing pipelines: +iii.registerTrigger({ + type: 'queue', + function_id: 'analytics::track', + config: { topic: 'order.created' }, +}) - - -```typescript -iii.registerFunction({ id: 'orders::process-order' }, async (order) => { - // ...charge the customer... +iii.registerFunction({ id: 'orders::create' }, async (req) => { + const order = { id: crypto.randomUUID(), ...req.body } await iii.trigger({ - function_id: 'notifications::send', - payload: { orderId: order.id, type: 'payment-confirmed' }, - action: TriggerAction.Enqueue({ queue: 'default' }), + function_id: 'enqueue', + payload: { topic: 'order.created', data: order }, + action: TriggerAction.Void(), }) - return { processed: true } + return { status_code: 201, body: { orderId: order.id } } }) ``` ```python -def process_order(order): - # ...charge the customer... +from iii import TriggerAction, register_worker + +iii = register_worker("ws://localhost:49134") + + +def send_email_notification(data): + send_email(data["email"], f"Your order {data['orderId']} is confirmed!") + return {} + + +def reserve_inventory(data): + for item in data["items"]: + reserve_stock(item["sku"], item["quantity"]) + return {} + + +def track_analytics(data): + track_event("order_created", {"orderId": data["orderId"], "total": data["total"]}) + return {} + + +iii.register_function({"id": "notify::email"}, send_email_notification) +iii.register_function({"id": "inventory::reserve"}, reserve_inventory) +iii.register_function({"id": "analytics::track"}, track_analytics) + +for fid in ["notify::email", "inventory::reserve", "analytics::track"]: + iii.register_trigger({ + "type": "queue", + "function_id": fid, + "config": {"topic": "order.created"}, + }) + + +def create_order(req): + import uuid + order = {"id": str(uuid.uuid4()), **req.get("body", {})} iii.trigger({ - "function_id": "notifications::send", - "payload": {"orderId": order["id"], "type": "payment-confirmed"}, - "action": TriggerAction.Enqueue(queue="default"), + "function_id": "enqueue", + "payload": {"topic": "order.created", "data": order}, + "action": TriggerAction.Void(), }) - return {"processed": True} + return {"status_code": 201, "body": {"orderId": order["id"]}} + + +iii.register_function({"id": "orders::create"}, create_order) ``` ```rust -use iii_sdk::{RegisterFunction, TriggerAction, TriggerRequest}; +use iii_sdk::{ + register_worker, InitOptions, RegisterFunction, + RegisterTriggerInput, TriggerAction, TriggerRequest, +}; use serde_json::{json, Value}; +let iii = register_worker("ws://localhost:49134", InitOptions::default()); + +iii.register_function(RegisterFunction::new_async( + "notify::email", + |data: Value| async move { + send_email(data["email"].as_str().unwrap_or(""), &format!("Your order {} is confirmed!", data["orderId"])).await?; + Ok(json!({})) + }, +)); + +iii.register_function(RegisterFunction::new_async( + "inventory::reserve", + |data: Value| async move { + for item in data["items"].as_array().unwrap_or(&vec![]) { + reserve_stock(item["sku"].as_str().unwrap_or(""), item["quantity"].as_u64().unwrap_or(0)).await?; + } + Ok(json!({})) + }, +)); + +iii.register_function(RegisterFunction::new_async( + "analytics::track", + |data: Value| async move { + track_event("order_created", &json!({ "orderId": data["orderId"], "total": data["total"] })).await?; + Ok(json!({})) + }, +)); + +for fid in &["notify::email", "inventory::reserve", "analytics::track"] { + iii.register_trigger(RegisterTriggerInput { + trigger_type: "queue".into(), + function_id: fid.to_string(), + config: json!({ "topic": "order.created" }), + metadata: None, + })?; +} + let iii_clone = iii.clone(); -let reg = RegisterFunction::new_async("orders::process-order", move |order: Value| { +iii.register_function(RegisterFunction::new_async("orders::create", move |req: Value| { let iii = iii_clone.clone(); async move { + let order_id = uuid::Uuid::new_v4().to_string(); + iii.trigger(TriggerRequest { - function_id: "notifications::send".to_string(), - payload: json!({ - "orderId": order["id"], - "type": "payment-confirmed", - }), - action: Some(TriggerAction::Enqueue { queue: "default".to_string() }), + function_id: "enqueue".into(), + payload: json!({ "topic": "order.created", "data": { "id": order_id, "items": req["body"]["items"] } }), + action: Some(TriggerAction::Void), timeout_ms: None, - }) - .await?; + }).await?; - Ok(json!({ "processed": true })) + Ok(json!({ "status_code": 201, "body": { "orderId": order_id } })) } -}); -iii.register_function(reg); +})); ``` -### 4. Use FIFO queues for ordered processing +All three functions receive every `order.created` event independently. If `inventory::reserve` fails and retries, it does not affect `notify::email` or `analytics::track`. + +### Scenario 3: Financial Transaction Ledger (FIFO) -When order matters (e.g. payment transactions for the same account), use a FIFO queue. Set `type: fifo` and specify `message_group_field` — the field in your job data whose value determines the ordering group. Jobs with the same group value are processed strictly in order. The field named by `message_group_field` must be present **and non-null** in every job payload — the engine rejects enqueue requests where the field is missing or null. +Transactions for the same account must be applied in order to prevent balance inconsistencies. Different accounts can process in parallel. ```yaml title="iii-config.yaml (excerpt)" queue_configs: - bulk-email: - max_retries: 5 - concurrency: 3 - type: standard - backoff_ms: 5000 -``` - -```mermaid -sequenceDiagram - participant Campaign as campaigns::launch - participant Q as bulk-email queue - participant W1 as Worker 1 - participant W2 as Worker 2 - participant W3 as Worker 3 - participant SMTP as SMTP Provider - - Campaign->>Q: Enqueue 1000 emails - - par concurrency: 3 - Q->>W1: Email #1 - Q->>W2: Email #2 - Q->>W3: Email #3 - end - - W1->>SMTP: Send - SMTP-->>W1: 200 OK - W1-->>Q: ACK - - W2->>SMTP: Send - SMTP-->>W2: 429 Rate Limited - W2-->>Q: NACK (retry with backoff) - - Note over Q,W2: Retry after 5s, then 10s, then 20s... - - W3->>SMTP: Send - SMTP-->>W3: 200 OK - W3-->>Q: ACK - - Q->>W2: Retry Email #2 - W2->>SMTP: Send - SMTP-->>W2: 200 OK - W2-->>Q: ACK -``` - - - -```typescript -import { registerWorker, TriggerAction } from 'iii-sdk' - -const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134') - -iii.registerFunction({ id: 'campaigns::launch' }, async (campaign) => { - for (const recipient of campaign.recipients) { - await iii.trigger({ - function_id: 'emails::send', - payload: { - to: recipient.email, - subject: campaign.subject, - body: campaign.body, - }, - action: TriggerAction.Enqueue({ queue: 'bulk-email' }), - }) - } - - return { enqueued: campaign.recipients.length } -}) - -iii.registerFunction({ id: 'emails::send' }, async (email) => { - const response = await fetch('https://smtp-provider.example/send', { - method: 'POST', - body: JSON.stringify(email), - headers: { 'Content-Type': 'application/json' }, - }) - - if (!response.ok) { - throw new Error(`SMTP error: ${response.status}`) - } - - return { sent: true } -}) -``` - - -```python -import requests -from iii import TriggerAction, register_worker - -iii = register_worker("ws://localhost:49134") - - -def launch_campaign(campaign): - for recipient in campaign["recipients"]: - iii.trigger({ - "function_id": "emails::send", - "payload": { - "to": recipient["email"], - "subject": campaign["subject"], - "body": campaign["body"], - }, - "action": TriggerAction.Enqueue(queue="bulk-email"), - }) - - return {"enqueued": len(campaign["recipients"])} - - -def send_email(email): - response = requests.post( - "https://smtp-provider.example/send", json=email - ) - response.raise_for_status() - return {"sent": True} - - -iii.register_function("campaigns::launch", launch_campaign) -iii.register_function("emails::send", send_email) -``` - - -```rust -use iii_sdk::{ - register_worker, InitOptions, RegisterFunction, - TriggerAction, TriggerRequest, -}; -use serde_json::{json, Value}; - -let iii = register_worker("ws://localhost:49134", InitOptions::default()); - -let iii_clone = iii.clone(); -let reg = RegisterFunction::new_async("campaigns::launch", move |campaign: Value| { - let iii = iii_clone.clone(); - async move { - let recipients = campaign["recipients"].as_array().unwrap(); - for recipient in recipients { - iii.trigger(TriggerRequest { - function_id: "emails::send".into(), - payload: json!({ - "to": recipient["email"], - "subject": campaign["subject"], - "body": campaign["body"], - }), - action: Some(TriggerAction::Enqueue { queue: "bulk-email".into() }), - timeout_ms: None, - }).await?; - } - Ok(json!({ "enqueued": recipients.len() })) - } -}); -iii.register_function(reg); -``` - - - -With `concurrency: 3`, at most three emails are in-flight at any time. Failed sends retry with exponential backoff (5s, 10s, 20s, 40s, 80s), protecting the SMTP provider from overload. - -### Scenario 3: Financial Transaction Ledger - -A banking system processes account transactions. Transactions for the same account must be applied in order to prevent balance inconsistencies. Different accounts can process in parallel. - -**Queue configuration:** - -```yaml title="iii-config.yaml (excerpt)" -queue_configs: - ledger: - max_retries: 15 - concurrency: 1 - type: fifo - message_group_field: account_id - backoff_ms: 500 + ledger: + max_retries: 15 + concurrency: 1 + type: fifo + message_group_field: account_id + backoff_ms: 500 ``` ```mermaid @@ -1011,9 +1247,171 @@ iii.register_function(reg); Because the `ledger` queue is FIFO with `message_group_field: account_id`, the deposit for `acct_A` always completes before the withdrawal. Without FIFO ordering, the withdrawal could execute first and fail with "Insufficient funds" even though the deposit was submitted first. +### Scenario 4: Bulk Email with Rate Limiting + +A marketing system sends thousands of emails. The SMTP provider has a rate limit. A standard queue with low concurrency prevents overloading the provider while retrying transient failures. + +```yaml title="iii-config.yaml (excerpt)" +queue_configs: + bulk-email: + max_retries: 5 + concurrency: 3 + type: standard + backoff_ms: 5000 +``` + +```mermaid +sequenceDiagram + participant Campaign as campaigns::launch + participant Q as bulk-email queue + participant W1 as Worker 1 + participant W2 as Worker 2 + participant W3 as Worker 3 + participant SMTP as SMTP Provider + + Campaign->>Q: Enqueue 1000 emails + + par concurrency: 3 + Q->>W1: Email #1 + Q->>W2: Email #2 + Q->>W3: Email #3 + end + + W1->>SMTP: Send + SMTP-->>W1: 200 OK + W1-->>Q: ACK + + W2->>SMTP: Send + SMTP-->>W2: 429 Rate Limited + W2-->>Q: NACK (retry with backoff) + + Note over Q,W2: Retry after 5s, then 10s, then 20s... + + W3->>SMTP: Send + SMTP-->>W3: 200 OK + W3-->>Q: ACK + + Q->>W2: Retry Email #2 + W2->>SMTP: Send + SMTP-->>W2: 200 OK + W2-->>Q: ACK +``` + + + +```typescript +import { registerWorker, TriggerAction } from 'iii-sdk' + +const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134') + +iii.registerFunction({ id: 'campaigns::launch' }, async (campaign) => { + for (const recipient of campaign.recipients) { + await iii.trigger({ + function_id: 'emails::send', + payload: { + to: recipient.email, + subject: campaign.subject, + body: campaign.body, + }, + action: TriggerAction.Enqueue({ queue: 'bulk-email' }), + }) + } + + return { enqueued: campaign.recipients.length } +}) + +iii.registerFunction({ id: 'emails::send' }, async (email) => { + const response = await fetch('https://smtp-provider.example/send', { + method: 'POST', + body: JSON.stringify(email), + headers: { 'Content-Type': 'application/json' }, + }) + + if (!response.ok) { + throw new Error(`SMTP error: ${response.status}`) + } + + return { sent: true } +}) +``` + + +```python +import requests +from iii import TriggerAction, register_worker + +iii = register_worker("ws://localhost:49134") + + +def launch_campaign(campaign): + for recipient in campaign["recipients"]: + iii.trigger({ + "function_id": "emails::send", + "payload": { + "to": recipient["email"], + "subject": campaign["subject"], + "body": campaign["body"], + }, + "action": TriggerAction.Enqueue(queue="bulk-email"), + }) + + return {"enqueued": len(campaign["recipients"])} + + +def send_email(email): + response = requests.post( + "https://smtp-provider.example/send", json=email + ) + response.raise_for_status() + return {"sent": True} + + +iii.register_function("campaigns::launch", launch_campaign) +iii.register_function("emails::send", send_email) +``` + + +```rust +use iii_sdk::{ + register_worker, InitOptions, RegisterFunction, + TriggerAction, TriggerRequest, +}; +use serde_json::{json, Value}; + +let iii = register_worker("ws://localhost:49134", InitOptions::default()); + +let iii_clone = iii.clone(); +let reg = RegisterFunction::new_async("campaigns::launch", move |campaign: Value| { + let iii = iii_clone.clone(); + async move { + let recipients = campaign["recipients"].as_array().unwrap(); + for recipient in recipients { + iii.trigger(TriggerRequest { + function_id: "emails::send".into(), + payload: json!({ + "to": recipient["email"], + "subject": campaign["subject"], + "body": campaign["body"], + }), + action: Some(TriggerAction::Enqueue { queue: "bulk-email".into() }), + timeout_ms: None, + }).await?; + } + Ok(json!({ "enqueued": recipients.len() })) + } +}); +iii.register_function(reg); +``` + + + +With `concurrency: 3`, at most three emails are in-flight at any time. Failed sends retry with exponential backoff (5s, 10s, 20s, 40s, 80s), protecting the SMTP provider from overload. + +--- + ## Choosing an Adapter -The queue adapter determines where messages are stored and how they are distributed. Your choice depends on your deployment topology. +The queue adapter determines where messages are stored and how they are distributed. | Scenario | Recommended Adapter | Why | |----------|-------------------|-----| @@ -1028,7 +1426,7 @@ Regardless of which adapter you choose, retry semantics, concurrency enforcement - When using the RabbitMQ adapter, iii creates exchanges and queues using a predictable naming convention. For a queue named `payment`, the main queue is `iii.__fn_queue::payment`, the retry queue is `iii.__fn_queue::payment::retry.queue`, and the DLQ is `iii.__fn_queue::payment::dlq.queue`. See [Dead Letter Queues](./dead-letter-queues#dlq-naming-convention) for the full resource map. For the design rationale behind this topology, see [Queue Architecture](/architecture/queues). + When using the RabbitMQ adapter, iii creates exchanges and queues using a predictable naming convention. For a queue named `payment`, the main queue is `iii.__fn_queue::payment`, the retry queue is `iii.__fn_queue::payment::retry.queue`, and the DLQ is `iii.__fn_queue::payment::dlq.queue`. See [Dead Letter Queues](./dead-letter-queues#dlq-naming-convention) for the full resource map. ## Queue Config Reference @@ -1056,7 +1454,7 @@ For the full module configuration including adapter settings, see the [Queue mod Full configuration reference for queues and adapters - - Design rationale behind retry, dead-lettering, and multi-resource topology + + Filter queue messages with condition functions diff --git a/docs/modules/module-pubsub.mdx b/docs/modules/module-pubsub.mdx index 3c842f889..01d3d4324 100644 --- a/docs/modules/module-pubsub.mdx +++ b/docs/modules/module-pubsub.mdx @@ -289,12 +289,12 @@ iii.trigger(TriggerRequest { ## PubSub vs Queue -| Feature | PubSub | Queue | +| Feature | PubSub | Queue (topic-based) | |---|---|---| -| Delivery | Broadcast to all subscribers | Single consumer per message | +| Delivery | Broadcast to all subscribers | Fan-out to each subscribed function; replicas of the same function compete | | Persistence | No (fire-and-forget) | Yes (with retries and DLQ) | | Ordering | Not guaranteed | FIFO within topic | -| Best for | Real-time notifications, fanout | Reliable background processing | +| Best for | Real-time notifications, fire-and-forget fanout | Reliable fanout with retries and dead-letter support | ## PubSub Flow diff --git a/docs/modules/module-queue.mdx b/docs/modules/module-queue.mdx index 90b0850f0..300a29686 100644 --- a/docs/modules/module-queue.mdx +++ b/docs/modules/module-queue.mdx @@ -17,14 +17,18 @@ modules::queue::QueueModule ### Topic-based queues -Register a consumer for a topic and emit events to it. +Register consumers for a topic and emit events to it. Messages are delivered using **fan-out per function**: every distinct function subscribed to a topic receives a copy of each message. When multiple replicas of the same function are running, they compete on a shared per-function queue — only one replica processes each message. 1. **Register a consumer** with `registerTrigger({ type: 'queue', function_id: 'my::handler', config: { topic: 'order.created' } })`. This subscribes the handler to that topic. -2. **Emit events** by calling `trigger({ function_id: 'enqueue', payload: { topic: 'order.created', data: payload } })` or `trigger({ function_id: 'enqueue', payload: { topic, data }, action: TriggerAction.Void() })` for fire-and-forget. The `enqueue` function routes the payload to all subscribers of that topic. +2. **Emit events** by calling `trigger({ function_id: 'enqueue', payload: { topic: 'order.created', data: payload } })` or `trigger({ function_id: 'enqueue', payload: { topic, data }, action: TriggerAction.Void() })` for fire-and-forget. The `enqueue` function fans out the payload to every subscribed function. 3. **Action on the trigger**: the handler receives the `data` as its input. Optional `queue_config` on the trigger controls per-subscriber retries and concurrency. The producer knows the topic name; consumers register to receive it. Queue settings can live at the trigger registration site. + + If functions `A` and `B` both subscribe to topic `order.created`, each message published to that topic is delivered to **both** `A` and `B`. If function `A` has 3 replicas running, only one replica of `A` processes each message — they compete on a shared queue. This gives you pub/sub-style fan-out with the durability and retry guarantees of a queue. + + ### Named queues Define queues in `iii-config.yaml`, then enqueue function calls directly. No trigger registration needed. @@ -41,8 +45,9 @@ The producer targets the function and queue explicitly. Queue configuration is c |---|---|---| | **Producer** | Calls `trigger({ function_id: 'enqueue', payload: { topic, data } })` | Calls `trigger({ function_id, payload, action: TriggerAction.Enqueue({ queue }) })` | | **Consumer** | Registers `registerTrigger({ type: 'queue', config: { topic } })` | No registration — function is the target | +| **Delivery** | Fan-out: each subscribed function gets every message; replicas compete | Single target function per enqueue call | | **Config** | Optional `queue_config` on trigger | `queue_configs` in `iii-config.yaml` | -| **Use case** | Pub/sub, multiple subscribers per topic | Direct function invocation with retries, FIFO, DLQ | +| **Use case** | Durable pub/sub with retries and fan-out | Direct function invocation with retries, FIFO, DLQ | Both modes are valid. Named queues offer config-driven retries, concurrency, and FIFO ordering. @@ -186,6 +191,18 @@ For each named queue defined in `queue_configs`, iii creates the following Rabbi Each named queue creates six RabbitMQ objects to support delayed retry and dead-lettering. For the design rationale, see [Queue Architecture](/architecture/queues). +#### Topic-based queue naming in RabbitMQ + +For topic-based queues, iii uses a **fanout exchange** per topic. Each subscribed function gets its own queue and DLQ bound to the exchange: + +| Resource | Format | Example (topic `order.created`, function `notify::email`) | +|----------|--------|-----------------------------------------------------------| +| Fanout exchange | `iii..exchange` | `iii.order.created.exchange` | +| Per-function queue | `iii...queue` | `iii.order.created.notify::email.queue` | +| Per-function DLQ | `iii...dlq` | `iii.order.created.notify::email.dlq` | + +RabbitMQ's fanout exchange natively delivers a copy of each published message to every bound queue, providing fan-out delivery. + ## Adapter Comparison | | BuiltinQueueAdapter | RabbitMQAdapter | RedisAdapter | @@ -208,12 +225,12 @@ The queue module registers the following functions automatically when it initial ### enqueue -Publishes a message to a topic-based queue. +Publishes a message to a topic-based queue. The message is fanned out to every distinct function subscribed to that topic. Replicas of the same function compete on a shared per-function queue. | Field | Type | Description | |-------|------|-------------| | `topic` | `string` | The topic to publish to (required, non-empty) | -| `data` | `any` | The payload to deliver to subscribers | +| `data` | `any` | The payload delivered to each subscribed function | Returns `null` on success. @@ -248,6 +265,8 @@ iii trigger \ ## Queue Flow +### Named queue flow + ```mermaid sequenceDiagram participant C as Caller @@ -276,3 +295,48 @@ sequenceDiagram end end ``` + +### Topic-based queue flow (fan-out) + +When a message is published to a topic, the engine fans it out to every distinct function subscribed to that topic. Replicas of the same function compete on their shared per-function queue. + +```mermaid +sequenceDiagram + participant P as Producer + participant E as Engine + participant Q1 as order.created::notify::email + participant Q2 as order.created::audit::log + participant E1 as notify::email (replica 1) + participant E2 as notify::email (replica 2) + participant A1 as audit::log + + Note over Q1,Q2: Both functions subscribed to topic "order.created" + + P->>E: enqueue({ topic: "order.created", data }) + + par Engine copies message to each function's queue + E->>Q1: push(data) + E->>Q2: push(data) + end + + Note over Q1,E2: Replicas compete — only one processes the message + Q1->>E1: deliver + E1-->>Q1: Ack + + Q2->>A1: deliver + A1-->>Q2: Ack + + P->>E: enqueue({ topic: "order.created", data }) + + par + E->>Q1: push(data) + E->>Q2: push(data) + end + + Note over Q1,E2: This time replica 2 wins + Q1->>E2: deliver + E2-->>Q1: Ack + + Q2->>A1: deliver + A1-->>Q2: Ack +```