42 changes: 25 additions & 17 deletions event-driven-cqrs/SKILL.md
@@ -3,8 +3,8 @@ name: event-driven-cqrs
description: >-
Implements CQRS with event sourcing on the iii engine. Use when building
command/query separation, event-sourced systems, or fan-out architectures
where commands publish domain events and multiple read model projections
subscribe independently.
where commands emit domain events to queue topics and multiple read model
projections consume independently.
---

# Event-Driven CQRS & Event Sourcing
@@ -15,19 +15,20 @@ Comparable to: Kafka, RabbitMQ, CQRS/Event Sourcing systems

Use the concepts below when they fit the task. Not every CQRS system needs all of them.

- **Write side**: Commands validate input and publish domain events via pubsub
- **Read side**: Multiple projections subscribe to events independently, building query-optimized views in state
- **Write side**: Commands validate input and emit domain events to queue topics
- **Read side**: Multiple projections consume events independently, building query-optimized views in state
- **Event log**: Events are appended to state as an ordered log (event sourcing)
- **PubSub** handles fan-out — one event reaches all projections and downstream consumers
- **Topic-based queues** handle fan-out across independently bound consumers
- **Named queues** handle dedicated workloads (alerts, notifications, heavy compute)
- **HTTP triggers** expose both command endpoints (POST) and query endpoints (GET)

## Architecture

```text
HTTP POST /inventory (command)
→ cmd::add-inventory-item → validate → append event to state
publish('inventory.item-added')
↓ (fan-out via subscribe triggers)
enqueue(topic='inventory.item-added')
↓ (fan-out via queue topic triggers)
→ proj::inventory-list (updates queryable list view)
→ proj::inventory-stats (updates aggregate counters)
→ notify::inventory-alert (sends low-stock alerts)
@@ -43,38 +43,45 @@ HTTP GET /inventory (query)
| `registerWorker` | Initialize the worker and connect to iii |
| `registerFunction` | Define commands, projections, and queries |
| `trigger({ function_id: 'state::set/get/list', payload })` | Event log and projection state |
| `trigger({ function_id: 'publish', payload })` | Publish domain events |
| `registerTrigger({ type: 'subscribe', config: { topic } })` | Subscribe projections to events |
| `trigger({ function_id: 'enqueue', payload: { topic, data } })` | Emit domain events to a topic queue |
| `registerTrigger({ type: 'queue', config: { topic } })` | Bind projections to event topic queues |
| `trigger({ ..., action: TriggerAction.Enqueue({ queue }) })` | Dispatch dedicated work to named queues |
| `registerTrigger({ type: 'http' })` | Command and query endpoints |
| `trigger({ ..., action: TriggerAction.Void() })` | Fire-and-forget notifications |
| `trigger({ ..., action: TriggerAction.Void() })` | Optional non-critical side effects |
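
The write-side primitives above compose roughly as follows. This is a sketch, not the reference implementation: the engine's `trigger` call is replaced by a hypothetical in-memory stand-in so the command flow (validate → append → enqueue) is visible end to end.

```javascript
const state = new Map()   // stands in for state::set / state::get
const queues = []         // stands in for topic-based queue dispatch

// Hypothetical minimal stand-in for the engine's trigger() call.
async function trigger({ function_id, payload }) {
  if (function_id === 'state::set') {
    state.set(`${payload.scope}/${payload.key}`, payload.value)
  } else if (function_id === 'enqueue') {
    queues.push({ topic: payload.topic, data: payload.data })
  }
}

// cmd::add-inventory-item — validate, append the event, emit to a topic queue
async function addInventoryItem(data) {
  const { sku, name, stock } = data
  if (!sku || !name || typeof stock !== 'number' || stock < 0) {
    return { error: 'invalid command' } // reject before any event is emitted
  }
  const event = { type: 'ItemAdded', sku, name, stock }
  await trigger({ function_id: 'state::set', payload: { scope: 'events', key: sku, value: event } })
  await trigger({ function_id: 'enqueue', payload: { topic: 'inventory.item-added', data: event } })
  return { event: 'ItemAdded', sku }
}
```

In the real system every projection bound to `inventory.item-added` would then consume the event independently.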

## Reference Implementation

See [../references/event-driven-cqrs.js](../references/event-driven-cqrs.js) for the full working example — an inventory management system
with commands that publish domain events and multiple projections building query-optimized views.
with commands that emit domain events through queue topics and multiple projections building query-optimized views.

## Common Patterns

Code using this pattern commonly includes, when relevant:

- `registerWorker(url, { workerName })` — worker initialization
- `trigger({ function_id: 'state::set', payload: { scope: 'events', key, value } })` — event log append
- `trigger({ function_id: 'publish', payload: { topic, data } })` — domain event publishing
- `registerTrigger({ type: 'subscribe', function_id, config: { topic } })` — projection subscriptions
- `trigger({ function_id: 'enqueue', payload: { topic, data } })` — enqueue a domain event by topic
- `registerTrigger({ type: 'queue', function_id, config: { topic } })` — projection topic queue bindings
- `trigger({ function_id, payload, action: TriggerAction.Enqueue({ queue }) })` — named queue handoff
- Command functions with `cmd::` prefix, projection functions with `proj::` prefix, query functions with `query::` prefix
- Multiple projections subscribing to the same topic independently
- Multiple projections consuming the same topic independently
- `const logger = new Logger()` — structured logging per command/projection

## Adapting This Pattern

Use the adaptations below when they apply to the task.

- Add new projections by registering subscribe triggers on existing event topics
- Add new projections by registering queue topic triggers on existing event topics
- Use separate state scopes for each projection (e.g. `inventory-list`, `inventory-stats`)
- Commands should validate before publishing — reject invalid commands early
- For critical event processing, use `TriggerAction.Enqueue({ queue })` instead of pubsub for guaranteed delivery
- Commands should validate before enqueueing events — reject invalid commands early
- Use named queues for slow downstream workloads (alerts, webhooks, notifications)
- Event IDs should be unique and monotonic for ordering (e.g. `evt-${Date.now()}-${counter}`)
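
One hypothetical way to satisfy the unique-and-monotonic requirement from the last bullet: pair the timestamp with a zero-padded process-local counter, so IDs sort correctly even when several events share a millisecond.

```javascript
// Monotonic event-ID helper (illustrative; not part of the iii SDK).
let counter = 0
function nextEventId() {
  counter += 1
  // Zero-padding keeps lexicographic order aligned with generation order.
  return `evt-${Date.now()}-${String(counter).padStart(6, '0')}`
}
```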

## Queue Mode Choice

- **Topic-based queues** for domain events and independent projection fanout.
- **Named queues** for bounded worker pools where retry/concurrency/FIFO are tuned per workload.

## Pattern Boundaries

- If the task is about simple CRUD with reactive side effects, prefer `reactive-backend`.
12 changes: 11 additions & 1 deletion functions-and-triggers/SKILL.md
@@ -19,6 +19,7 @@ Use the concepts below when they fit the task. Not every worker needs all of the
- Functions invoke other functions via `trigger()` regardless of language or worker location
- The engine handles serialization, routing, and delivery automatically
- HTTP-invoked functions wrap external endpoints as callable function IDs
- Queue triggers can be bound by `topic` (topic-based queues) or by named `queue` depending on pattern

## Architecture

@@ -32,6 +33,7 @@ SDK init connects the worker to the engine, `registerFunction` defines handlers,
| `registerFunction({ id }, handler)` | Define a function handler |
| `registerTrigger({ type, function_id, config })` | Bind an event source to a function |
| `trigger({ function_id, payload })` | Invoke a function synchronously |
| `trigger({ function_id: 'enqueue', payload: { topic, data } })` | Topic-based queue dispatch |
| `trigger({ ..., action: TriggerAction.Void() })` | Fire-and-forget invocation |
| `trigger({ ..., action: TriggerAction.Enqueue({ queue }) })` | Durable async invocation via queue |

@@ -47,11 +49,13 @@ Code using this pattern commonly includes, when relevant:
- `init('ws://localhost:49134')` — connect to the engine
- `registerFunction({ id: 'namespace::name' }, async (input) => { ... })` — register a handler
- `registerTrigger({ type: 'http', function_id, config: { api_path, http_method } })` — HTTP trigger
- `registerTrigger({ type: 'queue', function_id, config: { topic } })` — queue trigger
- `registerTrigger({ type: 'queue', function_id, config: { topic } })` — topic-based queue trigger
- `registerTrigger({ type: 'queue', function_id, config: { queue } })` — named queue trigger
- `registerTrigger({ type: 'cron', function_id, config: { expression } })` — cron trigger
- `registerTrigger({ type: 'state', function_id, config: { scope, key } })` — state change trigger
- `registerTrigger({ type: 'stream', function_id, config: { stream } })` — stream trigger
- `registerTrigger({ type: 'subscribe', function_id, config: { topic } })` — pubsub subscriber
- `trigger({ function_id: 'enqueue', payload: { topic, data } })` — topic queue publish
- Cross-language invocation: a TypeScript function can trigger a Python or Rust function by ID

## Adapting This Pattern
@@ -61,10 +65,16 @@ Use the adaptations below when they apply to the task.
- Replace placeholder handler logic with real business logic (API calls, DB queries, LLM calls)
- Use `namespace::name` convention for function IDs to group related functions
- For HTTP endpoints, configure `api_path` and `http_method` in the trigger config
- For topic-based queue fanout, use `function_id: 'enqueue'` and bind consumers with `type: 'queue'` + `config.topic`
- For durable async work, use `TriggerAction.Enqueue({ queue })` instead of synchronous trigger
- For fire-and-forget side effects, use `TriggerAction.Void()`
- Multiple workers in different languages can register functions that invoke each other by ID

## Queue Mode Choice

- **Topic-based queue mode**: bind consumers with `registerTrigger({ type: 'queue', config: { topic } })` and emit with `function_id: 'enqueue'`.
- **Named queue mode**: dispatch directly with `TriggerAction.Enqueue({ queue })` when producers target explicit workload queues.
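
A toy dispatcher contrasting the two modes above. The real routing lives in the engine; only the payload shapes mirror the documented calls, and everything else here is an illustrative assumption.

```javascript
const topicBindings = new Map() // topic -> [handler, ...]  (fan-out to all)
const namedQueues = new Map()   // queue -> [job, ...]      (explicit target)

// Topic mode: consumers bind independently, like registerTrigger({ type: 'queue', config: { topic } }).
function bindTopic(topic, handler) {
  if (!topicBindings.has(topic)) topicBindings.set(topic, [])
  topicBindings.get(topic).push(handler)
}

// Topic mode: one emit reaches every bound consumer, like function_id: 'enqueue'.
async function enqueueByTopic(topic, data) {
  for (const handler of topicBindings.get(topic) ?? []) await handler(data)
}

// Named mode: the job lands on exactly one queue, like TriggerAction.Enqueue({ queue }).
function enqueueNamed(queue, job) {
  if (!namedQueues.has(queue)) namedQueues.set(queue, [])
  namedQueues.get(queue).push(job)
}
```

The key difference is ownership of the destination: in topic mode the producer only names an event, while in named mode it targets a specific workload queue.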

## Pattern Boundaries

- For HTTP endpoint specifics (request/response format, path params), prefer `http-endpoints`.
30 changes: 25 additions & 5 deletions queue-processing/SKILL.md
@@ -15,16 +15,24 @@ Comparable to: BullMQ, Celery, SQS

Use the concepts below when they fit the task. Not every queue setup needs all of them.

- **Topic-based queues** route by `topic` and are triggered via `function_id: 'enqueue'`
- **Named queues** are declared in `iii-config.yaml` under `queue_configs`
- **Standard queues** process jobs concurrently; **FIFO queues** preserve ordering
- `TriggerAction.Enqueue({ queue })` dispatches a job to a named queue
- `trigger({ function_id: 'enqueue', payload: { topic, data } })` dispatches by topic
- Failed jobs **auto-retry** with exponential backoff up to `max_retries`
- Jobs that exhaust retries land in a **dead letter queue** for inspection
- Each consumer function receives the job payload and a `messageReceiptId`
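
A consumer sketch showing the `messageReceiptId` concept from the last bullet. The receipt id is assumed to arrive on the job payload, as in the reference implementation; the tracking set is an in-memory stand-in, not the engine's API.

```javascript
const receipts = new Set()

// Consumer handler: the engine delivers the job payload plus a receipt id.
async function processJob(payload) {
  const { messageReceiptId, ...job } = payload
  receipts.add(messageReceiptId) // track the delivery for acknowledgement
  // ... do the actual work on `job` (API call, DB write, ...) ...
  return { done: true, receipt: messageReceiptId }
}
```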

## Architecture

Producer function
Producer function (topic-based)
→ trigger({ function_id: 'enqueue', payload: { topic, data } })
→ Topic queue consumer(s)
→ success / retry with backoff
→ Dead Letter Queue (after max_retries)

Producer function (named queue)
→ TriggerAction.Enqueue({ queue: 'task-queue' })
→ Named Queue (standard or FIFO)
→ Consumer registerFunction handler
@@ -35,10 +43,12 @@ Use the concepts below when they fit the task. Not every queue setup needs all o

| Primitive | Purpose |
| ------------------------------------------------------------ | ---------------------------------------------- |
| `registerFunction` | Define the consumer that processes jobs |
| `trigger({ ..., action: TriggerAction.Enqueue({ queue }) })` | Dispatch a job to a named queue |
| `messageReceiptId` | Acknowledge or track individual job processing |
| `queue_configs` in `iii-config.yaml` | Declare queues with concurrency and retries |
| `registerFunction` | Define consumers that process jobs |
| `registerTrigger({ type: 'queue', config: { topic } })` | Bind a function to a topic-based queue |
| `trigger({ function_id: 'enqueue', payload: { topic, data } })` | Dispatch by topic |
| `trigger({ ..., action: TriggerAction.Enqueue({ queue }) })` | Dispatch to a named queue |
| `messageReceiptId` | Track individual job enqueue/processing |
| `queue_configs` in `iii-config.yaml` | Tune named queues (retry, concurrency, type) |

## Reference Implementation

@@ -50,6 +60,8 @@ Code using this pattern commonly includes, when relevant:

- `registerWorker(url, { workerName })` — worker initialization
- `registerFunction(id, handler)` — define the consumer
- `registerTrigger({ type: 'queue', function_id, config: { topic } })` — topic queue consumer binding
- `trigger({ function_id: 'enqueue', payload: { topic, data } })` — topic queue dispatch
- `trigger({ function_id, payload, action: TriggerAction.Enqueue({ queue }) })` — enqueue a job
- `payload.messageReceiptId` — track or acknowledge the job
- `trigger({ function_id: 'state::set', payload })` — persist results after processing
@@ -59,11 +71,19 @@ Code using this pattern commonly includes, when relevant:

Use the adaptations below when they apply to the task.

- Choose topic-based queues when producers only know an event topic and many consumers can bind independently
- Choose named queues when you want explicit workload queues and per-queue tuning in `queue_configs`
- Choose FIFO queues when job ordering matters (e.g. sequential pipeline steps)
- Set `max_retries` and `concurrency` in queue config to match your workload
- Chain multiple queues for multi-stage pipelines (queue A consumer enqueues to queue B)
- For idempotency, check state before processing to avoid duplicate work on retries
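
The idempotency bullet can be sketched like this. The in-memory map stands in for a `state::get`/`state::set` check and is an illustrative assumption, not the engine's API; the point is that a retried delivery of the same job becomes a no-op.

```javascript
const processed = new Map() // stand-in for a processed-jobs state scope
let sideEffects = 0

async function handleJob(job) {
  if (processed.has(job.id)) return { skipped: true } // duplicate delivery on retry
  sideEffects += 1             // the real work (API call, DB write, ...)
  processed.set(job.id, true)  // mark done only after the work succeeds
  return { skipped: false }
}
```

Note the ordering: marking the job done before the work succeeds would trade duplicate work for lost work on a crash.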

## When To Choose Which

- **Topic-based queues**: use when producers emit events by semantic topic and consumers bind independently to those topics.
- **Named queues**: use when producers explicitly target workload queues with dedicated retry/concurrency/FIFO tuning.
- **Hybrid**: use topic-based edges for event fanout and named queues for heavy or side-effecting downstream workloads.

## Engine Configuration

Named queues are declared in `iii-config.yaml` under `queue_configs` with per-queue `max_retries`, `concurrency`, `type`, and `backoff_ms`. See [../references/iii-config.yaml](../references/iii-config.yaml) for the full annotated config reference.
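
A hypothetical `queue_configs` entry using the four per-queue fields named above (the queue name and values are illustrative, not taken from the annotated reference):

```yaml
queue_configs:
  alerts-notify:
    type: standard      # 'fifo' would preserve job ordering instead
    concurrency: 4      # parallel consumers for this queue
    max_retries: 5      # attempts before the job lands in the dead letter queue
    backoff_ms: 2000    # base delay for exponential retry backoff
```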
47 changes: 27 additions & 20 deletions references/event-driven-cqrs.js
@@ -3,16 +3,16 @@
* Comparable to: Kafka, RabbitMQ, CQRS/Event Sourcing systems
*
* Demonstrates CQRS (Command Query Responsibility Segregation) with
* event sourcing. Commands publish domain events via pubsub. Multiple
* read model projections subscribe independently. PubSub handles all
* fan-out — both to projections and downstream notification consumers.
* event sourcing. Commands emit domain events via topic-based queues.
* Multiple read model projections consume topics independently. Named
* queues handle dedicated alert delivery workloads.
*
* How-to references:
* - Queues: https://iii.dev/docs/how-to/use-queues
* - State management: https://iii.dev/docs/how-to/manage-state
* - State reactions: https://iii.dev/docs/how-to/react-to-state-changes
* - HTTP endpoints: https://iii.dev/docs/how-to/expose-http-endpoint
* - PubSub: https://iii.dev/docs/how-to/use-functions-and-triggers
* - Functions/Triggers: https://iii.dev/docs/how-to/use-functions-and-triggers
*/

import { registerWorker, Logger, TriggerAction } from 'iii-sdk'
@@ -48,8 +48,11 @@ iii.registerFunction({ id: 'cmd::add-inventory-item' }, async (data) => {

await appendEvent('inventory', sku, event)

// Publish domain event for all projections to consume
iii.trigger({ function_id: 'publish', payload: { topic: 'inventory.item-added', data: event }, action: TriggerAction.Void() })
// Emit domain event for all topic queue consumers to process
await iii.trigger({
function_id: 'enqueue',
payload: { topic: 'inventory.item-added', data: event },
})
Comment on lines +51 to +55
⚠️ Potential issue | 🟠 Major

Event-log append and fan-out are not atomic.

appendEvent() completes before each enqueue call, so a failure here leaves event-log ahead of the projections. Because the query endpoints read projections rather than event-log, the system stays inconsistent until you replay manually. Consider persisting an outbox/replay marker with the append.

Also applies to: 82-85

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@references/event-driven-cqrs.js` around lines 51-55, the appendEvent()
followed by multiple iii.trigger({ function_id: 'enqueue', ... }) calls is
non-atomic: appendEvent() can succeed while one or more enqueue fan-out calls
fail, leaving event-log ahead of projections. Fix by adopting an
outbox/replay-marker pattern: make appendEvent() persist the event plus an
outbox entry or advance a replay marker in the same atomic
operation/transaction, then have a separate reliable dispatcher read the
outbox/replay marker and call iii.trigger('enqueue', ...) (or enqueue) only
after the write is committed; alternatively, enqueue the fan-out work into the
same durable store (outbox table) and have a background process perform
iii.trigger() from that outbox with retry/at-least-once semantics so projections
and event-log stay consistent. Ensure code changes touch appendEvent(), the
outbox/replay marker persistence, and the dispatcher that invokes
iii.trigger/enqueue.


return { event: 'ItemAdded', sku }
})
Expand All @@ -76,7 +79,10 @@ iii.registerFunction({ id: 'cmd::sell-item' }, async (data) => {

await appendEvent('inventory', sku, event)

iii.trigger({ function_id: 'publish', payload: { topic: 'inventory.item-sold', data: event }, action: TriggerAction.Void() })
await iii.trigger({
function_id: 'enqueue',
payload: { topic: 'inventory.item-sold', data: event },
})

return { event: 'ItemSold', sku, remaining: item.stock - quantity }
})
@@ -145,41 +151,42 @@ iii.registerFunction({ id: 'proj::sales-analytics' }, async (event) => {
} })
})

// Projections subscribe to domain events independently via pubsub
iii.registerTrigger({ type: 'subscribe', function_id: 'proj::catalog-on-add', config: { topic: 'inventory.item-added' } })
iii.registerTrigger({ type: 'subscribe', function_id: 'proj::catalog-on-sell', config: { topic: 'inventory.item-sold' } })
iii.registerTrigger({ type: 'subscribe', function_id: 'proj::sales-analytics', config: { topic: 'inventory.item-sold' } })
// Projections consume domain events independently via queue topic triggers
iii.registerTrigger({ type: 'queue', function_id: 'proj::catalog-on-add', config: { topic: 'inventory.item-added' } })
iii.registerTrigger({ type: 'queue', function_id: 'proj::catalog-on-sell', config: { topic: 'inventory.item-sold' } })
iii.registerTrigger({ type: 'queue', function_id: 'proj::sales-analytics', config: { topic: 'inventory.item-sold' } })
Comment on lines +154 to +157
⚠️ Potential issue | 🟠 Major

Low-stock alerts race the sell projection.

notify::low-stock-alert and proj::catalog-on-sell consume inventory.item-sold independently, so this handler can read inventory-read before the decrement lands and miss threshold crossings. Put the post-sale stock on the ItemSold event, or trigger the alert from the code path that writes the updated projection.

💡 Possible direction
 const event = {
   type: 'ItemSold',
   sku,
+  name: item.name,
   quantity,
   revenue: quantity * item.price,
+  remaining_stock: item.stock - quantity,
   timestamp: new Date().toISOString(),
 }

 iii.registerFunction({ id: 'notify::low-stock-alert' }, async (event) => {
-  const item = await iii.trigger({ function_id: 'state::get', payload: { scope: 'inventory-read', key: event.sku } })
-  if (item && item.stock <= 5) {
+  if (event.remaining_stock <= 5) {
     await iii.trigger({
       function_id: 'notify::slack-low-stock',
-      payload: { sku: event.sku, name: item.name, remaining: item.stock },
+      payload: { sku: event.sku, name: event.name, remaining: event.remaining_stock },
       action: TriggerAction.Enqueue({ queue: 'alerts-notify' }),
     })
   }
 })

Also applies to: 162-177

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@references/event-driven-cqrs.js` around lines 154-157,
notify::low-stock-alert races with proj::catalog-on-sell because both
independently consume inventory.item-sold and the alert handler can read
inventory-read before the projection decrement has been written; fix by ensuring
the alert receives the post-sale stock value or by emitting/triggering the
low-stock notification from the projection update path after
proj::catalog-on-sell finishes writing inventory-read. Specifically, either
include the updated stock count on the inventory.item-sold event (so
notify::low-stock-alert can make a correct decision) or invoke
notify::low-stock-alert (or publish a dedicated low-stock topic) from inside the
proj::catalog-on-sell projection write routine once the decrement is persisted.


// ===================================================================
// FAN-OUT — PubSub notifications to downstream systems
// FAN-OUT — mixed queue patterns for downstream systems
// ===================================================================
iii.registerFunction({ id: 'notify::low-stock-alert' }, async (event) => {
const item = await iii.trigger({ function_id: 'state::get', payload: { scope: 'inventory-read', key: event.sku } })
if (item && item.stock <= 5) {
iii.trigger({ function_id: 'publish', payload: {
topic: 'alerts.low-stock',
data: { sku: event.sku, name: item.name, remaining: item.stock },
}, action: TriggerAction.Void() })
await iii.trigger({
function_id: 'notify::slack-low-stock',
payload: { sku: event.sku, name: item.name, remaining: item.stock },
action: TriggerAction.Enqueue({ queue: 'alerts-notify' }),
})
}
})

iii.registerTrigger({
type: 'subscribe',
type: 'queue',
function_id: 'notify::low-stock-alert',
config: { topic: 'inventory.item-sold' },
})

// Fan-out subscriber: could be a separate service listening for alerts
// Named queue worker: could be a separate service notifying Slack/email/pager
iii.registerFunction({ id: 'notify::slack-low-stock' }, async (data) => {
const logger = new Logger()
logger.warn('LOW STOCK ALERT', { sku: data.sku, remaining: data.remaining })
// In production: POST to Slack webhook, send email, page oncall, etc.
})

iii.registerTrigger({
type: 'subscribe',
type: 'queue',
function_id: 'notify::slack-low-stock',
config: { topic: 'alerts.low-stock' },
config: { queue: 'alerts-notify' },
})

// ===================================================================