diff --git a/event-driven-cqrs/SKILL.md b/event-driven-cqrs/SKILL.md index 1fccf06..d5f95cd 100644 --- a/event-driven-cqrs/SKILL.md +++ b/event-driven-cqrs/SKILL.md @@ -3,8 +3,8 @@ name: event-driven-cqrs description: >- Implements CQRS with event sourcing on the iii engine. Use when building command/query separation, event-sourced systems, or fan-out architectures - where commands publish domain events and multiple read model projections - subscribe independently. + where commands emit domain events to queue topics and multiple read model + projections consume independently. --- # Event-Driven CQRS & Event Sourcing @@ -15,10 +15,11 @@ Comparable to: Kafka, RabbitMQ, CQRS/Event Sourcing systems Use the concepts below when they fit the task. Not every CQRS system needs all of them. -- **Write side**: Commands validate input and publish domain events via pubsub -- **Read side**: Multiple projections subscribe to events independently, building query-optimized views in state +- **Write side**: Commands validate input and emit domain events to queue topics +- **Read side**: Multiple projections consume events independently, building query-optimized views in state - **Event log**: Events are appended to state as an ordered log (event sourcing) -- **PubSub** handles fan-out — one event reaches all projections and downstream consumers +- **Topic-based queues** handle fan-out across independently bound consumers +- **Named queues** handle dedicated workloads (alerts, notifications, heavy compute) - **HTTP triggers** expose both command endpoints (POST) and query endpoints (GET) ## Architecture @@ -26,8 +27,8 @@ Use the concepts below when they fit the task. Not every CQRS system needs all o ```text HTTP POST /inventory (command) → cmd::add-inventory-item → validate → append event to state - → publish('inventory.item-added') - ↓ (fan-out via subscribe triggers) + → enqueue(topic='inventory.item-added') + ↓ (fan-out via queue topic triggers) → proj::inventory-list (updates queryable list view) → proj::inventory-stats (updates aggregate counters) → notify::inventory-alert (sends low-stock alerts) @@ -43,15 +44,16 @@ HTTP GET /inventory (query) | `registerWorker` | Initialize the worker and connect to iii | | `registerFunction` | Define commands, projections, and queries | | `trigger({ function_id: 'state::set/get/list', payload })` | Event log and projection state | -| `trigger({ function_id: 'publish', payload })` | Publish domain events | -| `registerTrigger({ type: 'subscribe', config: { topic } })` | Subscribe projections to events | +| `trigger({ function_id: 'enqueue', payload: { topic, data } })` | Emit domain events by topic queue | +| `registerTrigger({ type: 'queue', config: { topic } })` | Bind projections to event topic queues | +| `trigger({ ..., action: TriggerAction.Enqueue({ queue }) })` | Dispatch dedicated work to named queues | | `registerTrigger({ type: 'http' })` | Command and query endpoints | -| `trigger({ ..., action: TriggerAction.Void() })` | Fire-and-forget notifications | +| `trigger({ ..., action: TriggerAction.Void() })` | Optional non-critical side effects | ## Reference Implementation See [../references/event-driven-cqrs.js](../references/event-driven-cqrs.js) for the full working example — an inventory management system -with commands that publish domain events and multiple projections building query-optimized views. +with commands that emit domain events through queue topics and multiple projections building query-optimized views. ## Common Patterns @@ -59,22 +61,28 @@ Code using this pattern commonly includes, when relevant: - `registerWorker(url, { workerName })` — worker initialization - `trigger({ function_id: 'state::set', payload: { scope: 'events', key, value } })` — event log append -- `trigger({ function_id: 'publish', payload: { topic, data } })` — domain event publishing -- `registerTrigger({ type: 'subscribe', function_id, config: { topic } })` — projection subscriptions +- `trigger({ function_id: 'enqueue', payload: { topic, data } })` — domain event enqueue by topic +- `registerTrigger({ type: 'queue', function_id, config: { topic } })` — projection topic queue bindings +- `trigger({ function_id, payload, action: TriggerAction.Enqueue({ queue }) })` — named queue handoff - Command functions with `cmd::` prefix, projection functions with `proj::` prefix, query functions with `query::` prefix -- Multiple projections subscribing to the same topic independently +- Multiple projections consuming the same topic independently - `const logger = new Logger()` — structured logging per command/projection ## Adapting This Pattern Use the adaptations below when they apply to the task. -- Add new projections by registering subscribe triggers on existing event topics +- Add new projections by registering queue topic triggers on existing event topics - Use separate state scopes for each projection (e.g. `inventory-list`, `inventory-stats`) -- Commands should validate before publishing — reject invalid commands early -- For critical event processing, use `TriggerAction.Enqueue({ queue })` instead of pubsub for guaranteed delivery +- Commands should validate before enqueueing events — reject invalid commands early +- Use named queues for slow downstream workloads (alerts, webhooks, notifications) - Event IDs should be unique and monotonic for ordering (e.g. `evt-${Date.now()}-${counter}`) +## Queue Mode Choice + +- **Topic-based queues** for domain events and independent projection fanout. +- **Named queues** for bounded worker pools where retry/concurrency/FIFO are tuned per workload. + ## Pattern Boundaries - If the task is about simple CRUD with reactive side effects, prefer `reactive-backend`. diff --git a/functions-and-triggers/SKILL.md b/functions-and-triggers/SKILL.md index 22bd24a..442d333 100644 --- a/functions-and-triggers/SKILL.md +++ b/functions-and-triggers/SKILL.md @@ -19,6 +19,7 @@ Use the concepts below when they fit the task. Not every worker needs all of the - Functions invoke other functions via `trigger()` regardless of language or worker location - The engine handles serialization, routing, and delivery automatically - HTTP-invoked functions wrap external endpoints as callable function IDs +- Queue triggers can be bound by `topic` (topic-based queues) or by named `queue` depending on pattern ## Architecture @@ -32,6 +33,7 @@ SDK init connects the worker to the engine, `registerFunction` defines handlers, | `registerFunction({ id }, handler)` | Define a function handler | | `registerTrigger({ type, function_id, config })` | Bind an event source to a function | | `trigger({ function_id, payload })` | Invoke a function synchronously | +| `trigger({ function_id: 'enqueue', payload: { topic, data } })` | Topic-based queue dispatch | | `trigger({ ..., action: TriggerAction.Void() })` | Fire-and-forget invocation | | `trigger({ ..., action: TriggerAction.Enqueue({ queue }) })` | Durable async invocation via queue | @@ -47,11 +49,13 @@ Code using this pattern commonly includes, when relevant: - `init('ws://localhost:49134')` — connect to the engine - `registerFunction({ id: 'namespace::name' }, async (input) => { ... })` — register a handler - `registerTrigger({ type: 'http', function_id, config: { api_path, http_method } })` — HTTP trigger -- `registerTrigger({ type: 'queue', function_id, config: { topic } })` — queue trigger +- `registerTrigger({ type: 'queue', function_id, config: { topic } })` — topic-based queue trigger +- `registerTrigger({ type: 'queue', function_id, config: { queue } })` — named queue trigger - `registerTrigger({ type: 'cron', function_id, config: { expression } })` — cron trigger - `registerTrigger({ type: 'state', function_id, config: { scope, key } })` — state change trigger - `registerTrigger({ type: 'stream', function_id, config: { stream } })` — stream trigger - `registerTrigger({ type: 'subscribe', function_id, config: { topic } })` — pubsub subscriber +- `trigger({ function_id: 'enqueue', payload: { topic, data } })` — topic queue publish - Cross-language invocation: a TypeScript function can trigger a Python or Rust function by ID ## Adapting This Pattern @@ -61,10 +65,16 @@ Use the adaptations below when they apply to the task. - Replace placeholder handler logic with real business logic (API calls, DB queries, LLM calls) - Use `namespace::name` convention for function IDs to group related functions - For HTTP endpoints, configure `api_path` and `http_method` in the trigger config +- For topic-based queue fanout, use `function_id: 'enqueue'` and bind consumers with `type: 'queue'` + `config.topic` - For durable async work, use `TriggerAction.Enqueue({ queue })` instead of synchronous trigger - For fire-and-forget side effects, use `TriggerAction.Void()` - Multiple workers in different languages can register functions that invoke each other by ID +## Queue Mode Choice + +- **Topic-based queue mode**: bind consumers with `registerTrigger({ type: 'queue', config: { topic } })` and emit with `function_id: 'enqueue'`. +- **Named queue mode**: dispatch directly with `TriggerAction.Enqueue({ queue })` when producers target explicit workload queues. + ## Pattern Boundaries - For HTTP endpoint specifics (request/response format, path params), prefer `http-endpoints`. diff --git a/queue-processing/SKILL.md b/queue-processing/SKILL.md index 974fd73..1c9a2db 100644 --- a/queue-processing/SKILL.md +++ b/queue-processing/SKILL.md @@ -15,16 +15,24 @@ Comparable to: BullMQ, Celery, SQS Use the concepts below when they fit the task. Not every queue setup needs all of them. +- **Topic-based queues** route by `topic` and are triggered via `function_id: 'enqueue'` - **Named queues** are declared in `iii-config.yaml` under `queue_configs` - **Standard queues** process jobs concurrently; **FIFO queues** preserve ordering - `TriggerAction.Enqueue({ queue })` dispatches a job to a named queue +- `trigger({ function_id: 'enqueue', payload: { topic, data } })` dispatches by topic - Failed jobs **auto-retry** with exponential backoff up to `max_retries` - Jobs that exhaust retries land in a **dead letter queue** for inspection - Each consumer function receives the job payload and a `messageReceiptId` ## Architecture - Producer function + Producer function (topic-based) + → trigger({ function_id: 'enqueue', payload: { topic, data } }) + → Topic queue consumer(s) + → success / retry with backoff + → Dead Letter Queue (after max_retries) + + Producer function (named queue) → TriggerAction.Enqueue({ queue: 'task-queue' }) → Named Queue (standard or FIFO) → Consumer registerFunction handler @@ -35,10 +43,12 @@ Use the concepts below when they fit the task. Not every queue setup needs all o | Primitive | Purpose | | ------------------------------------------------------------ | ---------------------------------------------- | -| `registerFunction` | Define the consumer that processes jobs | -| `trigger({ ..., action: TriggerAction.Enqueue({ queue }) })` | Dispatch a job to a named queue | -| `messageReceiptId` | Acknowledge or track individual job processing | -| `queue_configs` in `iii-config.yaml` | Declare queues with concurrency and retries | +| `registerFunction` | Define consumers that process jobs | +| `registerTrigger({ type: 'queue', config: { topic } })` | Bind a function to a topic-based queue | +| `trigger({ function_id: 'enqueue', payload: { topic, data } })` | Dispatch by topic | +| `trigger({ ..., action: TriggerAction.Enqueue({ queue }) })` | Dispatch to a named queue | +| `messageReceiptId` | Track individual job enqueue/processing | +| `queue_configs` in `iii-config.yaml` | Tune named queues (retry, concurrency, type) | ## Reference Implementation @@ -50,6 +60,8 @@ Code using this pattern commonly includes, when relevant: - `registerWorker(url, { workerName })` — worker initialization - `registerFunction(id, handler)` — define the consumer +- `registerTrigger({ type: 'queue', function_id, config: { topic } })` — topic queue consumer binding +- `trigger({ function_id: 'enqueue', payload: { topic, data } })` — topic queue dispatch - `trigger({ function_id, payload, action: TriggerAction.Enqueue({ queue }) })` — enqueue a job - `payload.messageReceiptId` — track or acknowledge the job - `trigger({ function_id: 'state::set', payload })` — persist results after processing @@ -59,11 +71,19 @@ Code using this pattern commonly includes, when relevant: Use the adaptations below when they apply to the task. +- Choose topic-based queues when producers only know an event topic and many consumers can bind independently +- Choose named queues when you want explicit workload queues and per-queue tuning in `queue_configs` - Choose FIFO queues when job ordering matters (e.g. sequential pipeline steps) - Set `max_retries` and `concurrency` in queue config to match your workload - Chain multiple queues for multi-stage pipelines (queue A consumer enqueues to queue B) - For idempotency, check state before processing to avoid duplicate work on retries +## When To Choose Which + +- **Topic-based queues**: use when producers emit events by semantic topic and consumers bind independently to those topics. +- **Named queues**: use when producers explicitly target workload queues with dedicated retry/concurrency/FIFO tuning. +- **Hybrid**: use topic-based edges for event fanout and named queues for heavy or side-effecting downstream workloads. + ## Engine Configuration Named queues are declared in iii-config.yaml under `queue_configs` with per-queue `max_retries`, `concurrency`, `type`, and `backoff_ms`. See [../references/iii-config.yaml](../references/iii-config.yaml) for the full annotated config reference. diff --git a/references/event-driven-cqrs.js b/references/event-driven-cqrs.js index 9be8a2d..9490fb7 100644 --- a/references/event-driven-cqrs.js +++ b/references/event-driven-cqrs.js @@ -3,16 +3,16 @@ * Comparable to: Kafka, RabbitMQ, CQRS/Event Sourcing systems * * Demonstrates CQRS (Command Query Responsibility Segregation) with - * event sourcing. Commands publish domain events via pubsub. Multiple - * read model projections subscribe independently. PubSub handles all - * fan-out — both to projections and downstream notification consumers. + * event sourcing. Commands emit domain events via topic-based queues. + * Multiple read model projections consume topics independently. Named + * queues handle dedicated alert delivery workloads. * * How-to references: * - Queues: https://iii.dev/docs/how-to/use-queues * - State management: https://iii.dev/docs/how-to/manage-state * - State reactions: https://iii.dev/docs/how-to/react-to-state-changes * - HTTP endpoints: https://iii.dev/docs/how-to/expose-http-endpoint - * - PubSub: https://iii.dev/docs/how-to/use-functions-and-triggers + * - Functions/Triggers: https://iii.dev/docs/how-to/use-functions-and-triggers */ import { registerWorker, Logger, TriggerAction } from 'iii-sdk' @@ -48,8 +48,11 @@ iii.registerFunction({ id: 'cmd::add-inventory-item' }, async (data) => { await appendEvent('inventory', sku, event) - // Publish domain event for all projections to consume - iii.trigger({ function_id: 'publish', payload: { topic: 'inventory.item-added', data: event }, action: TriggerAction.Void() }) + // Emit domain event for all topic queue consumers to process + await iii.trigger({ + function_id: 'enqueue', + payload: { topic: 'inventory.item-added', data: event }, + }) return { event: 'ItemAdded', sku } }) @@ -76,7 +79,10 @@ iii.registerFunction({ id: 'cmd::sell-item' }, async (data) => { await appendEvent('inventory', sku, event) - iii.trigger({ function_id: 'publish', payload: { topic: 'inventory.item-sold', data: event }, action: TriggerAction.Void() }) + await iii.trigger({ + function_id: 'enqueue', + payload: { topic: 'inventory.item-sold', data: event }, + }) return { event: 'ItemSold', sku, remaining: item.stock - quantity } }) @@ -145,31 +151,32 @@ iii.registerFunction({ id: 'proj::sales-analytics' }, async (event) => { } }) }) -// Projections subscribe to domain events independently via pubsub -iii.registerTrigger({ type: 'subscribe', function_id: 'proj::catalog-on-add', config: { topic: 'inventory.item-added' } }) -iii.registerTrigger({ type: 'subscribe', function_id: 'proj::catalog-on-sell', config: { topic: 'inventory.item-sold' } }) -iii.registerTrigger({ type: 'subscribe', function_id: 'proj::sales-analytics', config: { topic: 'inventory.item-sold' } }) +// Projections consume domain events independently via queue topic triggers +iii.registerTrigger({ type: 'queue', function_id: 'proj::catalog-on-add', config: { topic: 'inventory.item-added' } }) +iii.registerTrigger({ type: 'queue', function_id: 'proj::catalog-on-sell', config: { topic: 'inventory.item-sold' } }) +iii.registerTrigger({ type: 'queue', function_id: 'proj::sales-analytics', config: { topic: 'inventory.item-sold' } }) // =================================================================== -// FAN-OUT — PubSub notifications to downstream systems +// FAN-OUT — mixed queue patterns for downstream systems // =================================================================== iii.registerFunction({ id: 'notify::low-stock-alert' }, async (event) => { const item = await iii.trigger({ function_id: 'state::get', payload: { scope: 'inventory-read', key: event.sku } }) if (item && item.stock <= 5) { - iii.trigger({ function_id: 'publish', payload: { - topic: 'alerts.low-stock', - data: { sku: event.sku, name: item.name, remaining: item.stock }, - }, action: TriggerAction.Void() }) + await iii.trigger({ + function_id: 'notify::slack-low-stock', + payload: { sku: event.sku, name: item.name, remaining: item.stock }, + action: TriggerAction.Enqueue({ queue: 'alerts-notify' }), + }) } }) iii.registerTrigger({ - type: 'subscribe', + type: 'queue', function_id: 'notify::low-stock-alert', config: { topic: 'inventory.item-sold' }, }) -// Fan-out subscriber: could be a separate service listening for alerts +// Named queue worker: could be a separate service notifying Slack/email/pager iii.registerFunction({ id: 'notify::slack-low-stock' }, async (data) => { const logger = new Logger() logger.warn('LOW STOCK ALERT', { sku: data.sku, remaining: data.remaining }) @@ -177,9 +184,9 @@ iii.registerFunction({ id: 'notify::slack-low-stock' }, async (data) => { }) iii.registerTrigger({ - type: 'subscribe', + type: 'queue', function_id: 'notify::slack-low-stock', - config: { topic: 'alerts.low-stock' }, + config: { queue: 'alerts-notify' }, }) // =================================================================== diff --git a/references/functions-and-triggers.js b/references/functions-and-triggers.js index 443ac83..d21ee23 100644 --- a/references/functions-and-triggers.js +++ b/references/functions-and-triggers.js @@ -3,7 +3,7 @@ * Comparable to: Core primitives of iii * * Demonstrates every fundamental building block: registering functions, - * binding triggers of each built-in type (http, queue, cron, state, subscribe), + * binding triggers of each built-in type (http, queue, cron, state, stream), * cross-function invocation, fire-and-forget calls, and external HTTP-invoked * functions via HttpInvocationConfig. * @@ -86,7 +86,7 @@ iii.registerTrigger({ }) // --------------------------------------------------------------------------- -// 6. Subscribe trigger — listen for pubsub messages on a topic +// 6. Queue trigger (topic-based mode) — consume events by topic // --------------------------------------------------------------------------- iii.registerFunction({ id: 'notifications::on-order-complete' }, async (data) => { const logger = new Logger() @@ -95,7 +95,7 @@ iii.registerFunction({ id: 'notifications::on-order-complete' }, async (data) => }) iii.registerTrigger({ - type: 'subscribe', + type: 'queue', function_id: 'notifications::on-order-complete', config: { topic: 'orders.completed' }, }) @@ -116,11 +116,13 @@ iii.registerFunction({ id: 'orders::create' }, async (data) => { return { error: validation.reason } } - // Fire-and-forget — send a notification without waiting - iii.trigger({ - function_id: 'notifications::on-order-complete', - payload: { order_id: data.order_id }, - action: TriggerAction.Void(), + // Topic-based queue emit — any queue trigger on this topic receives the event + await iii.trigger({ + function_id: 'enqueue', + payload: { + topic: 'orders.completed', + data: { order_id: data.order_id }, + }, }) // Enqueue — durable async handoff to fulfillment diff --git a/references/queue-processing.js b/references/queue-processing.js index ff8ed2b..b16ab18 100644 --- a/references/queue-processing.js +++ b/references/queue-processing.js @@ -26,6 +26,45 @@ const iii = registerWorker(process.env.III_ENGINE_URL || 'ws://localhost:49134', workerName: 'queue-processing', }) +// --------------------------------------------------------------------------- +// Topic-based queue mode +// Producer emits by topic and any queue-trigger subscribers on that topic receive it. +// --------------------------------------------------------------------------- +iii.registerFunction({ id: 'orders::emit-created' }, async (data) => { + const logger = new Logger() + + const result = await iii.trigger({ + function_id: 'enqueue', + payload: { + topic: 'orders.created', + data: { + orderId: data.orderId, + customerId: data.customerId, + total: data.total, + }, + }, + }) + + logger.info('Order event enqueued by topic', { + orderId: data.orderId, + messageReceiptId: result.messageReceiptId, + }) + + return { status: 'queued', messageReceiptId: result.messageReceiptId } +}) + +iii.registerFunction({ id: 'orders::on-created' }, async (data) => { + const logger = new Logger() + logger.info('Handling order topic event', { orderId: data.orderId }) + return { handled: true, orderId: data.orderId } +}) + +iii.registerTrigger({ + type: 'queue', + function_id: 'orders::on-created', + config: { topic: 'orders.created' }, +}) + // --------------------------------------------------------------------------- // Enqueue work — standard queue (concurrent processing) // ---------------------------------------------------------------------------