diff --git a/.gitignore b/.gitignore index 2ca7367..4ed5a1a 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,14 @@ dist/ *.mjs !tsconfig.json +# Rust +target/ +Cargo.lock + +# Lock files +pnpm-lock.yaml +package-lock.json + # IDE .idea/ .vscode/ diff --git a/README.md b/README.md index 663ca85..f70f846 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,7 @@ iii --version | [api-frameworks-workers](examples/api-frameworks-workers) | Run 4 API frameworks as separate workers | | [api-frameworks-auto-register](examples/api-frameworks-auto-register) | Auto-registration + shared context (logging, state, request tracking) | | [iii-vs-traditional](examples/iii-vs-traditional) | Side-by-side: connect a Python service with iii (~120 lines) vs traditional gateway (~465 lines) | +| [polyglot-coordination](examples/polyglot-coordination) | Coordinate Python, Node.js, and Rust services (Python via stdin/stdout IPC, Rust via HTTP) | ## Quick Start diff --git a/examples/polyglot-coordination/.gitignore b/examples/polyglot-coordination/.gitignore new file mode 100644 index 0000000..1b763b0 --- /dev/null +++ b/examples/polyglot-coordination/.gitignore @@ -0,0 +1,11 @@ +node_modules/ +dist/ +*.js +*.d.ts +.DS_Store +.env +.env.local +__pycache__/ +*.pyc +target/ +Cargo.lock diff --git a/examples/polyglot-coordination/README.md b/examples/polyglot-coordination/README.md new file mode 100644 index 0000000..71e3dcb --- /dev/null +++ b/examples/polyglot-coordination/README.md @@ -0,0 +1,201 @@ +# Polyglot Coordination Example + +This example demonstrates **iii-engine** seamlessly coordinating services across **Python, Node.js, and Rust** without requiring HTTP endpoints for all services. + +**Key differentiator**: The Python service communicates via **stdin/stdout IPC** (not HTTP), showing that iii abstracts away transport mechanisms entirely. + +## Architecture + +```mermaid +flowchart TB + subgraph Client + API[curl / HTTP Client] + end + + subgraph iii[iii-engine] + ENGINE[WebSocket Hub
ws://127.0.0.1:49134] + end + + subgraph Workers[Node.js Workers] + USER[User Service
users.*] + DATA[Data Requester
analytics.*] + STRIPE_BRIDGE[Stripe Bridge
stripe.*] + end + + subgraph Services[External Services] + PYTHON[Python Analytics
stdin/stdout IPC
NO HTTP] + RUST[Rust Fake Stripe
HTTP :4040] + end + + API -->|POST /onboard| ENGINE + ENGINE <-->|WebSocket| USER + ENGINE <-->|WebSocket| DATA + ENGINE <-->|WebSocket| STRIPE_BRIDGE + DATA -->|spawns & IPC| PYTHON + STRIPE_BRIDGE -->|HTTP calls| RUST + + style PYTHON fill:#3776ab,color:#fff + style RUST fill:#dea584,color:#000 + style ENGINE fill:#6366f1,color:#fff + style USER fill:#22c55e,color:#fff + style DATA fill:#22c55e,color:#fff + style STRIPE_BRIDGE fill:#22c55e,color:#fff +``` + +## Business Scenario: SaaS User Onboarding + +A realistic workflow tying all 3 languages together: + +1. **New user signs up** → Node.js User Service +2. **Create Stripe customer + subscription** → Rust Fake Stripe (via HTTP, wrapped by iii) +3. **Run onboarding analytics** → Python Analytics (via stdin/stdout IPC) +4. **Store user profile with risk score** → Orchestrated by Node.js Workflow + +## Prerequisites + +- **Node.js 18+** or **Bun** (for workers) +- **Python 3.x** (stdlib only, no pip install needed) +- **Rust toolchain** (for Axum server) +- **iii-engine** running (`iii` command) + +## Quick Start + +### Terminal 1: Start iii-engine +```bash +iii +``` + +### Terminal 2: Start Rust Fake Stripe server +```bash +cd services/rust-stripe +cargo run --release +``` + +### Terminal 3: Install deps and start workers +```bash +pnpm install +pnpm dev +``` + +## Testing + +### Full onboarding flow (coordinates all 3 languages) +```bash +curl -X POST http://localhost:3111/onboard \ + -H "Content-Type: application/json" \ + -d '{"email": "alice@example.com", "name": "Alice Smith", "plan": "pro"}' +``` + +Expected response: +```json +{ + "message": "User onboarded successfully", + "traceId": "abc123...", + "user": { + "id": "usr_...", + "email": "alice@example.com", + "name": "Alice Smith", + "plan": "pro", + "stripeCustomerId": "cus_...", + "subscriptionId": "sub_...", + "riskScore": 45.5 + }, + "stripeCustomer": { "id": "cus_...", "email": "...", "name": "..." }, + "subscription": { "id": "sub_...", "status": "active", "plan": "pro" }, + "analytics": { "riskScore": 45.5, "factors": ["custom_domain", "full_name_provided"] } +} +``` + +### Get metrics (calls Python analytics) +```bash +curl http://localhost:3111/onboard/metrics +``` + +### Onboard multiple users and check metrics +```bash +curl -X POST http://localhost:3111/onboard \ + -d '{"email": "bob@gmail.com", "name": "Bob", "plan": "free"}' \ + -H "Content-Type: application/json" + +curl -X POST http://localhost:3111/onboard \ + -d '{"email": "carol@stanford.edu", "name": "Carol Chen", "plan": "enterprise"}' \ + -H "Content-Type: application/json" + +curl http://localhost:3111/onboard/metrics +``` + +## Key Insight + +All function calls look identical regardless of transport: + +```typescript +// Calls Rust via HTTP (wrapped by stripe-bridge) +await bridge.invokeFunction('stripe.createCustomer', { email, name }) + +// Calls Python via stdin/stdout IPC (wrapped by data-requester) +await bridge.invokeFunction('analytics.score', { userId, email, name }) + +// Calls local Node.js function +await bridge.invokeFunction('users.create', { email, name }) +``` + +**The caller doesn't know or care about the underlying transport. That's the power of iii.** + +## File Structure + +``` +polyglot-coordination/ +├── README.md +├── package.json +├── tsconfig.json +├── services/ +│ ├── python-analytics/ +│ │ └── analytics.py # stdin/stdout JSON-RPC (NO HTTP) +│ └── rust-stripe/ +│ ├── Cargo.toml +│ └── src/main.rs # Axum HTTP server +├── workers/ +│ ├── user-service.ts # User CRUD + billing +│ ├── data-requester.ts # Spawns Python, exposes analytics.* +│ └── stripe-bridge.ts # Wraps Rust HTTP as iii functions +├── workflow/ +│ └── onboarding.ts # Orchestrates full signup flow +└── lib/ + ├── bridge.ts # Shared bridge factory + ├── python-ipc.ts # Python subprocess manager + └── types.ts # Shared TypeScript types +``` + +## How It Works + +### Python Analytics (stdin/stdout) +The Python service has **no HTTP endpoints**. It reads JSON from stdin, processes it, and writes JSON to stdout: + +```python +for line in sys.stdin: + request = json.loads(line) + result = handle_request(request['method'], request['params']) + sys.stdout.write(json.dumps({'id': request['id'], 'result': result}) + '\n') +``` + +The `data-requester.ts` worker spawns this script and communicates via stdin/stdout, then exposes the functionality as iii functions. + +### Rust Fake Stripe (HTTP) +A standard Axum HTTP server providing a Stripe-like API. The `stripe-bridge.ts` worker makes HTTP calls to it and exposes the endpoints as iii functions. + +### Node.js User Service +Pure Node.js with in-memory storage, directly registered as iii functions. + +## Why This Matters + +Traditional microservice orchestration requires: +- Every service to expose HTTP/gRPC endpoints +- Service discovery (Consul, etcd, K8s DNS) +- Load balancers, ingress controllers +- API gateways for routing + +With iii-engine: +- Services can use **any transport** (HTTP, IPC, stdin/stdout, sockets) +- iii handles routing via **function names** +- No service discovery needed - iii is the registry +- No API gateway - iii exposes triggers directly diff --git a/examples/polyglot-coordination/lib/bridge.ts b/examples/polyglot-coordination/lib/bridge.ts new file mode 100644 index 0000000..23e11c6 --- /dev/null +++ b/examples/polyglot-coordination/lib/bridge.ts @@ -0,0 +1,14 @@ +import { Bridge } from '@iii-dev/sdk' + +const ENGINE_URL = process.env.III_ENGINE_URL ?? 'ws://127.0.0.1:49134' + +export function createBridge(serviceName: string): Bridge { + return new Bridge(ENGINE_URL, { + otel: { + enabled: true, + serviceName, + metricsEnabled: true, + metricsExportIntervalMs: 5000, + }, + }) +} diff --git a/examples/polyglot-coordination/lib/python-ipc.ts b/examples/polyglot-coordination/lib/python-ipc.ts new file mode 100644 index 0000000..080a3e0 --- /dev/null +++ b/examples/polyglot-coordination/lib/python-ipc.ts @@ -0,0 +1,135 @@ +import { spawn, type ChildProcess } from 'child_process' +import { createInterface, type Interface } from 'readline' +import { EventEmitter } from 'events' +import { existsSync } from 'fs' + +interface PendingRequest { + resolve: (result: unknown) => void + reject: (error: Error) => void + timeout: ReturnType +} + +const PYTHON_PATH = process.env.PYTHON_PATH ?? 'python3' + +export class PythonIPC extends EventEmitter { + private process: ChildProcess | null = null + private readline: Interface | null = null + private requestId = 0 + private pending = new Map() + private ready = false + private readyPromise: Promise + private readyResolve!: () => void + + constructor( + private scriptPath: string, + private timeoutMs = 30000, + private maxPendingRequests = 1000 + ) { + super() + this.readyPromise = new Promise((resolve) => { + this.readyResolve = resolve + }) + } + + async start(): Promise { + if (!existsSync(this.scriptPath)) { + throw new Error(`Python script not found: ${this.scriptPath}`) + } + + this.process = spawn(PYTHON_PATH, [this.scriptPath], { + stdio: ['pipe', 'pipe', 'pipe'], + }) + + this.readline = createInterface({ + input: this.process.stdout!, + crlfDelay: Infinity, + }) + + this.readline.on('line', (line) => this.handleLine(line)) + + this.process.stderr?.on('data', (data) => { + const msg = data.toString().trim() + if (msg) console.error(`[Python stderr] ${msg}`) + }) + + this.process.on('close', (code) => { + this.ready = false + this.emit('close', code) + for (const [id, req] of this.pending) { + clearTimeout(req.timeout) + req.reject(new Error(`Python process exited with code ${code}`)) + this.pending.delete(id) + } + }) + + this.process.on('error', (err) => { + this.emit('error', err) + }) + + await this.call('ping', {}) + this.ready = true + this.readyResolve() + } + + async waitReady(): Promise { + return this.readyPromise + } + + async call(method: string, params: unknown): Promise { + if (!this.process || !this.process.stdin) { + throw new Error('Python process not started') + } + + if (this.pending.size >= this.maxPendingRequests) { + throw new Error(`Too many pending requests (max: ${this.maxPendingRequests})`) + } + + const id = ++this.requestId + if (this.requestId > Number.MAX_SAFE_INTEGER - 1) { + this.requestId = 0 + } + const request = JSON.stringify({ id, method, params }) + + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + this.pending.delete(id) + reject(new Error(`Request ${method} timed out after ${this.timeoutMs}ms`)) + }, this.timeoutMs) + + this.pending.set(id, { resolve: resolve as (r: unknown) => void, reject, timeout }) + this.process!.stdin!.write(request + '\n') + }) + } + + private handleLine(line: string): void { + try { + const response = JSON.parse(line) + const { id, result, error } = response + + const req = this.pending.get(id) + if (!req) return + + clearTimeout(req.timeout) + this.pending.delete(id) + + if (error) { + req.reject(new Error(error.message || error)) + } else { + req.resolve(result) + } + } catch { + console.error(`[Python IPC] Failed to parse: ${line}`) + } + } + + stop(): void { + if (this.process) { + this.process.kill() + this.process = null + } + if (this.readline) { + this.readline.close() + this.readline = null + } + } +} diff --git a/examples/polyglot-coordination/lib/types.ts b/examples/polyglot-coordination/lib/types.ts new file mode 100644 index 0000000..5639c57 --- /dev/null +++ b/examples/polyglot-coordination/lib/types.ts @@ -0,0 +1,91 @@ +export interface User { + id: string + email: string + name: string + plan?: string + stripeCustomerId?: string + subscriptionId?: string + riskScore?: number + createdAt: string +} + +export interface CreateUserInput { + email: string + name: string + plan?: string +} + +export interface GetByIdInput { + id: string +} + +export interface StripeCustomer { + id: string + email: string + name: string + created: number +} + +export interface StripeSubscription { + id: string + customer: string + plan: string + status: string + created: number +} + +export interface StripeCharge { + id: string + customer: string + amount: number + currency: string + status: string +} + +export interface CreateCustomerInput { + email: string + name: string +} + +export interface CreateSubscriptionInput { + customerId: string + plan: string +} + +export interface ChargeInput { + customerId: string + amount: number + currency?: string +} + +export interface AnalyticsScoreInput { + userId: string + email: string + name: string +} + +export interface AnalyticsResult { + userId: string + riskScore: number + factors: string[] + timestamp: string +} + +export interface OnboardingMetrics { + totalUsers: number + averageRiskScore: number + planDistribution: Record +} + +export interface OnboardUserInput { + email: string + name: string + plan: string +} + +export interface OnboardingResult { + user: User + stripeCustomer: StripeCustomer + subscription: StripeSubscription + analytics: AnalyticsResult +} diff --git a/examples/polyglot-coordination/package.json b/examples/polyglot-coordination/package.json new file mode 100644 index 0000000..d38d213 --- /dev/null +++ b/examples/polyglot-coordination/package.json @@ -0,0 +1,24 @@ +{ + "name": "@iii-dev/polyglot-coordination", + "version": "1.0.0", + "description": "Polyglot coordination example with Python, Node.js, and Rust services", + "private": true, + "type": "module", + "scripts": { + "dev": "concurrently -n stripe,data,users,workflow -c red,yellow,blue,green \"bun workers/stripe-bridge.ts\" \"bun workers/data-requester.ts\" \"bun workers/user-service.ts\" \"bun workflow/onboarding.ts\"", + "dev:stripe": "bun --watch workers/stripe-bridge.ts", + "dev:data": "bun --watch workers/data-requester.ts", + "dev:users": "bun --watch workers/user-service.ts", + "dev:workflow": "bun --watch workflow/onboarding.ts", + "stripe": "cd services/rust-stripe && cargo run --release", + "build:stripe": "cd services/rust-stripe && cargo build --release" + }, + "dependencies": { + "@iii-dev/sdk": "0.3.0" + }, + "devDependencies": { + "@types/node": "^22.10.2", + "concurrently": "^9.1.2", + "typescript": "^5.9.3" + } +} diff --git a/examples/polyglot-coordination/services/python-analytics/analytics.py b/examples/polyglot-coordination/services/python-analytics/analytics.py new file mode 100644 index 0000000..adcc7d5 --- /dev/null +++ b/examples/polyglot-coordination/services/python-analytics/analytics.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 +""" +Python Analytics Service - stdin/stdout JSON-RPC +NO HTTP ENDPOINTS - communicates via IPC only + +This demonstrates that iii-engine can coordinate services that don't use HTTP. +The Node.js data-requester spawns this script and communicates via stdin/stdout. +""" + +import json +import sys +import time +import hashlib +from typing import Any + +def compute_risk_score(user_id: str, email: str, name: str) -> dict: + """ + Compute a risk score for user onboarding. + Simulates ML model inference with fake delays. + """ + time.sleep(0.05) + + factors = [] + score = 50.0 + + domain = email.split('@')[-1] if '@' in email else '' + + if domain in ('gmail.com', 'yahoo.com', 'hotmail.com'): + score += 10 + factors.append('common_email_provider') + elif domain.endswith('.edu'): + score -= 15 + factors.append('educational_domain') + elif domain.endswith('.gov'): + score -= 20 + factors.append('government_domain') + else: + score += 5 + factors.append('custom_domain') + + if len(name.split()) >= 2: + score -= 5 + factors.append('full_name_provided') + else: + score += 10 + factors.append('single_name_only') + + hash_val = int(hashlib.sha256(user_id.encode()).hexdigest()[:8], 16) + variance = (hash_val % 20) - 10 + score += variance + + score = max(0, min(100, score)) + + return { + 'userId': user_id, + 'riskScore': round(score, 2), + 'factors': factors, + 'timestamp': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) + } + + +def compute_metrics(users: list) -> dict: + """ + Aggregate onboarding metrics from user list. + """ + if not users: + return { + 'totalUsers': 0, + 'averageRiskScore': 0, + 'planDistribution': {} + } + + total_risk = sum(u.get('riskScore', 50) for u in users) + avg_risk = total_risk / len(users) + + plan_dist: dict[str, int] = {} + for u in users: + plan = u.get('plan', 'free') + plan_dist[plan] = plan_dist.get(plan, 0) + 1 + + return { + 'totalUsers': len(users), + 'averageRiskScore': round(avg_risk, 2), + 'planDistribution': plan_dist + } + + +def handle_request(method: str, params: Any) -> Any: + """ + Route JSON-RPC methods to handlers. + """ + if method == 'ping': + return {'status': 'ok', 'service': 'python-analytics'} + + if method == 'score': + user_id = params.get('userId', '') + email = params.get('email', '') + name = params.get('name', '') + return compute_risk_score(user_id, email, name) + + if method == 'metrics': + users = params.get('users', []) + return compute_metrics(users) + + raise ValueError(f'Unknown method: {method}') + + +def main(): + """ + Main loop: read JSON from stdin, write JSON to stdout. + Each line is a complete JSON-RPC request. + """ + for line in sys.stdin: + line = line.strip() + if not line: + continue + + try: + request = json.loads(line) + req_id = request.get('id', 0) + method = request.get('method', '') + params = request.get('params', {}) + + result = handle_request(method, params) + response = {'id': req_id, 'result': result} + + except json.JSONDecodeError as e: + response = {'id': 0, 'error': {'message': f'Invalid JSON: {e}'}} + except Exception as e: + response = {'id': request.get('id', 0), 'error': {'message': str(e)}} + + sys.stdout.write(json.dumps(response) + '\n') + sys.stdout.flush() + + +if __name__ == '__main__': + main() diff --git a/examples/polyglot-coordination/services/rust-stripe/Cargo.toml b/examples/polyglot-coordination/services/rust-stripe/Cargo.toml new file mode 100644 index 0000000..f91638b --- /dev/null +++ b/examples/polyglot-coordination/services/rust-stripe/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "fake-stripe" +version = "0.1.0" +edition = "2021" + +[dependencies] +axum = "0.7" +tokio = { version = "1", features = ["full"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +uuid = { version = "1", features = ["v4"] } +chrono = "0.4" diff --git a/examples/polyglot-coordination/services/rust-stripe/src/main.rs b/examples/polyglot-coordination/services/rust-stripe/src/main.rs new file mode 100644 index 0000000..c08eace --- /dev/null +++ b/examples/polyglot-coordination/services/rust-stripe/src/main.rs @@ -0,0 +1,179 @@ +use axum::{ + extract::{Path, State}, + http::StatusCode, + routing::{get, post}, + Json, Router, +}; +use serde::{Deserialize, Serialize}; +use std::{ + collections::HashMap, + net::SocketAddr, + sync::Arc, +}; +use tokio::sync::RwLock; + +#[derive(Clone, Serialize)] +struct Customer { + id: String, + email: String, + name: String, + created: i64, +} + +#[derive(Clone, Serialize)] +struct Subscription { + id: String, + customer: String, + plan: String, + status: String, + created: i64, +} + +#[derive(Clone, Serialize)] +struct Charge { + id: String, + customer: String, + amount: i64, + currency: String, + status: String, +} + +#[derive(Deserialize)] +struct CreateCustomerRequest { + email: String, + name: String, +} + +#[derive(Deserialize)] +struct CreateSubscriptionRequest { + customer: String, + plan: String, +} + +#[derive(Deserialize)] +struct CreateChargeRequest { + customer: String, + amount: i64, + #[serde(default = "default_currency")] + currency: String, +} + +fn default_currency() -> String { + "usd".to_string() +} + +type AppState = Arc>; + +#[derive(Default)] +struct Store { + customers: HashMap, + subscriptions: HashMap, + charges: HashMap, +} + +async fn create_customer( + State(state): State, + Json(req): Json, +) -> (StatusCode, Json) { + let customer = Customer { + id: format!("cus_{}", uuid::Uuid::new_v4().simple()), + email: req.email, + name: req.name, + created: chrono::Utc::now().timestamp(), + }; + + let mut store = state.write().await; + store.customers.insert(customer.id.clone(), customer.clone()); + + (StatusCode::CREATED, Json(customer)) +} + +async fn get_customer( + State(state): State, + Path(id): Path, +) -> Result, StatusCode> { + let store = state.read().await; + store + .customers + .get(&id) + .cloned() + .map(Json) + .ok_or(StatusCode::NOT_FOUND) +} + +async fn create_subscription( + State(state): State, + Json(req): Json, +) -> Result<(StatusCode, Json), StatusCode> { + let mut store = state.write().await; + if !store.customers.contains_key(&req.customer) { + return Err(StatusCode::NOT_FOUND); + } + + let subscription = Subscription { + id: format!("sub_{}", uuid::Uuid::new_v4().simple()), + customer: req.customer, + plan: req.plan, + status: "active".to_string(), + created: chrono::Utc::now().timestamp(), + }; + + store.subscriptions.insert(subscription.id.clone(), subscription.clone()); + + Ok((StatusCode::CREATED, Json(subscription))) +} + +async fn create_charge( + State(state): State, + Json(req): Json, +) -> Result<(StatusCode, Json), StatusCode> { + let mut store = state.write().await; + if !store.customers.contains_key(&req.customer) { + return Err(StatusCode::NOT_FOUND); + } + + let charge = Charge { + id: format!("ch_{}", uuid::Uuid::new_v4().simple()), + customer: req.customer, + amount: req.amount, + currency: req.currency, + status: "succeeded".to_string(), + }; + + store.charges.insert(charge.id.clone(), charge.clone()); + + Ok((StatusCode::CREATED, Json(charge))) +} + +async fn health() -> &'static str { + "OK" +} + +#[tokio::main] +async fn main() { + let state: AppState = Arc::new(RwLock::new(Store::default())); + + let app = Router::new() + .route("/health", get(health)) + .route("/v1/customers", post(create_customer)) + .route("/v1/customers/:id", get(get_customer)) + .route("/v1/subscriptions", post(create_subscription)) + .route("/v1/charges", post(create_charge)) + .with_state(state); + + let addr = SocketAddr::from(([127, 0, 0, 1], 4040)); + println!("[Rust Stripe] Fake Stripe API on http://{}", addr); + println!(" POST /v1/customers - Create customer"); + println!(" GET /v1/customers/:id - Get customer"); + println!(" POST /v1/subscriptions - Create subscription"); + println!(" POST /v1/charges - Create charge"); + + let listener = tokio::net::TcpListener::bind(addr) + .await + .expect("Failed to bind to port 4040 - is it already in use?"); + + if let Err(e) = axum::serve(listener, app).await { + eprintln!("Server error: {}", e); + std::process::exit(1); + } +} diff --git a/examples/polyglot-coordination/tsconfig.json b/examples/polyglot-coordination/tsconfig.json new file mode 100644 index 0000000..d66c787 --- /dev/null +++ b/examples/polyglot-coordination/tsconfig.json @@ -0,0 +1,12 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "ESNext", + "moduleResolution": "bundler", + "esModuleInterop": true, + "strict": true, + "skipLibCheck": true, + "outDir": "dist" + }, + "include": ["**/*.ts"] +} diff --git a/examples/polyglot-coordination/workers/data-requester.ts b/examples/polyglot-coordination/workers/data-requester.ts new file mode 100644 index 0000000..20f4b60 --- /dev/null +++ b/examples/polyglot-coordination/workers/data-requester.ts @@ -0,0 +1,66 @@ +import { createBridge } from '../lib/bridge' +import { PythonIPC } from '../lib/python-ipc' +import { fileURLToPath } from 'url' +import { dirname, join } from 'path' +import type { AnalyticsScoreInput, AnalyticsResult, OnboardingMetrics, User } from '../lib/types' + +const __dirname = dirname(fileURLToPath(import.meta.url)) +const PYTHON_SCRIPT = join(__dirname, '../services/python-analytics/analytics.py') + +const bridge = createBridge('data-requester') +const python = new PythonIPC(PYTHON_SCRIPT) + +async function start() { + console.log('[Data Requester] Starting Python analytics subprocess...') + await python.start() + await python.waitReady() + console.log('[Data Requester] Python analytics ready (stdin/stdout IPC)') + + bridge.registerFunction( + { function_path: 'analytics.score' }, + async (input: AnalyticsScoreInput): Promise => { + return python.call('score', input) + } + ) + + bridge.registerFunction( + { function_path: 'analytics.riskProfile' }, + async (input: AnalyticsScoreInput): Promise => { + const result = await python.call('score', input) + return { + ...result, + factors: [ + ...result.factors, + result.riskScore > 70 ? 'high_risk' : result.riskScore > 40 ? 'medium_risk' : 'low_risk', + ], + } + } + ) + + bridge.registerFunction( + { function_path: 'analytics.onboardingMetrics' }, + async (input: { users: User[] }): Promise => { + return python.call('metrics', input) + } + ) + + console.log('[Data Requester] Python functions exposed via iii:') + console.log(' analytics.score - Risk scoring (via stdin/stdout)') + console.log(' analytics.riskProfile - Extended risk profile') + console.log(' analytics.onboardingMetrics - Aggregate metrics') +} + +process.on('SIGINT', () => { + python.stop() + process.exit(0) +}) + +process.on('SIGTERM', () => { + python.stop() + process.exit(0) +}) + +start().catch((err) => { + console.error('[Data Requester] Failed to start:', err) + process.exit(1) +}) diff --git a/examples/polyglot-coordination/workers/stripe-bridge.ts b/examples/polyglot-coordination/workers/stripe-bridge.ts new file mode 100644 index 0000000..bf4ea44 --- /dev/null +++ b/examples/polyglot-coordination/workers/stripe-bridge.ts @@ -0,0 +1,94 @@ +import { createBridge } from '../lib/bridge' +import type { + StripeCustomer, + StripeSubscription, + StripeCharge, + CreateCustomerInput, + CreateSubscriptionInput, + ChargeInput, +} from '../lib/types' + +const STRIPE_URL = process.env.STRIPE_URL ?? 'http://127.0.0.1:4040' + +const bridge = createBridge('stripe-bridge') + +const STRIPE_ID_PATTERN = /^(cus|sub|ch)_[a-f0-9]{32}$/ + +function validateStripeId(id: string, prefix: string): void { + if (!id || typeof id !== 'string') { + throw new Error(`Invalid ${prefix} ID: must be a non-empty string`) + } + if (id.length > 50) { + throw new Error(`Invalid ${prefix} ID: too long`) + } + if (!id.startsWith(`${prefix}_`)) { + throw new Error(`Invalid ${prefix} ID: must start with "${prefix}_"`) + } + if (!STRIPE_ID_PATTERN.test(id)) { + throw new Error(`Invalid ${prefix} ID format`) + } +} + +async function stripeRequest( + method: string, + path: string, + body?: unknown +): Promise { + const response = await fetch(`${STRIPE_URL}${path}`, { + method, + headers: { 'Content-Type': 'application/json' }, + body: body ? JSON.stringify(body) : undefined, + }) + + if (!response.ok) { + const errorBody = await response.text().catch(() => '') + throw new Error(`Stripe API error: ${response.status} ${response.statusText}${errorBody ? ` - ${errorBody}` : ''}`) + } + + return response.json() +} + +bridge.registerFunction( + { function_path: 'stripe.createCustomer' }, + async (input: CreateCustomerInput): Promise => { + return stripeRequest('POST', '/v1/customers', input) + } +) + +bridge.registerFunction( + { function_path: 'stripe.getCustomer' }, + async (input: { id: string }): Promise => { + validateStripeId(input.id, 'cus') + return stripeRequest('GET', `/v1/customers/${encodeURIComponent(input.id)}`) + } +) + +bridge.registerFunction( + { function_path: 'stripe.createSubscription' }, + async (input: CreateSubscriptionInput): Promise => { + return stripeRequest('POST', '/v1/subscriptions', { + customer: input.customerId, + plan: input.plan, + }) + } +) + +bridge.registerFunction( + { function_path: 'stripe.charge' }, + async (input: ChargeInput): Promise => { + return stripeRequest('POST', '/v1/charges', { + customer: input.customerId, + amount: input.amount, + currency: input.currency ?? 'usd', + }) + } +) + +process.on('SIGINT', () => process.exit(0)) +process.on('SIGTERM', () => process.exit(0)) + +console.log('[Stripe Bridge] Wrapping Rust HTTP server as iii functions') +console.log(' stripe.createCustomer - POST /v1/customers') +console.log(' stripe.getCustomer - GET /v1/customers/:id') +console.log(' stripe.createSubscription - POST /v1/subscriptions') +console.log(' stripe.charge - POST /v1/charges') diff --git a/examples/polyglot-coordination/workers/user-service.ts b/examples/polyglot-coordination/workers/user-service.ts new file mode 100644 index 0000000..751151c --- /dev/null +++ b/examples/polyglot-coordination/workers/user-service.ts @@ -0,0 +1,136 @@ +import { createBridge } from '../lib/bridge' +import type { + User, + CreateUserInput, + GetByIdInput, + CreateSubscriptionInput, + StripeCustomer, + StripeSubscription, +} from '../lib/types' + +const bridge = createBridge('user-service') + +const users: Map = new Map() + +const EMAIL_REGEX = /^[^\s@]+@[^\s@]+\.[^\s@]+$/ +const MAX_NAME_LENGTH = 200 +const MAX_EMAIL_LENGTH = 254 + +function generateId(): string { + return `usr_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}` +} + +function validateCreateInput(input: CreateUserInput): string | null { + if (!input.email || typeof input.email !== 'string') { + return 'email is required' + } + if (input.email.length > MAX_EMAIL_LENGTH) { + return `email exceeds maximum length of ${MAX_EMAIL_LENGTH}` + } + if (!EMAIL_REGEX.test(input.email)) { + return 'invalid email format' + } + if (!input.name || typeof input.name !== 'string') { + return 'name is required' + } + if (input.name.trim().length === 0) { + return 'name cannot be empty' + } + if (input.name.length > MAX_NAME_LENGTH) { + return `name exceeds maximum length of ${MAX_NAME_LENGTH}` + } + return null +} + +bridge.registerFunction( + { function_path: 'users.create' }, + async (input: CreateUserInput): Promise => { + const validationError = validateCreateInput(input) + if (validationError) { + return { error: validationError } + } + + const user: User = { + id: generateId(), + email: input.email.toLowerCase().trim(), + name: input.name.trim(), + plan: input.plan ?? 'free', + createdAt: new Date().toISOString(), + } + users.set(user.id, user) + return user + } +) + +bridge.registerFunction( + { function_path: 'users.get' }, + async (input: GetByIdInput): Promise => { + return users.get(input.id) ?? null + } +) + +bridge.registerFunction( + { function_path: 'users.update' }, + async (input: Partial & { id: string }): Promise => { + const existing = users.get(input.id) + if (!existing) return null + + const updated: User = { + ...existing, + ...input, + id: existing.id, + createdAt: existing.createdAt, + } + users.set(updated.id, updated) + return updated + } +) + +bridge.registerFunction( + { function_path: 'users.list' }, + async (): Promise => { + return Array.from(users.values()) + } +) + +bridge.registerFunction( + { function_path: 'billing.createSubscription' }, + async (input: { userId: string; plan: string }): Promise<{ + user: User + stripeCustomer: StripeCustomer + subscription: StripeSubscription + } | null> => { + const user = users.get(input.userId) + if (!user) return null + + const stripeCustomer = await bridge.invokeFunction('stripe.createCustomer', { + email: user.email, + name: user.name, + }) as StripeCustomer + + const subscription = await bridge.invokeFunction('stripe.createSubscription', { + customerId: stripeCustomer.id, + plan: input.plan, + } satisfies CreateSubscriptionInput) as StripeSubscription + + const updatedUser: User = { + ...user, + stripeCustomerId: stripeCustomer.id, + subscriptionId: subscription.id, + plan: input.plan, + } + users.set(user.id, updatedUser) + + return { user: updatedUser, stripeCustomer, subscription } + } +) + +process.on('SIGINT', () => process.exit(0)) +process.on('SIGTERM', () => process.exit(0)) + +console.log('[User Service] User management functions registered') +console.log(' users.create - Create new user') +console.log(' users.get - Get user by ID') +console.log(' users.update - Update user') +console.log(' users.list - List all users') +console.log(' billing.createSubscription - Create Stripe subscription (calls stripe.*)') diff --git a/examples/polyglot-coordination/workflow/onboarding.ts b/examples/polyglot-coordination/workflow/onboarding.ts new file mode 100644 index 0000000..44d73f5 --- /dev/null +++ b/examples/polyglot-coordination/workflow/onboarding.ts @@ -0,0 +1,162 @@ +import { getContext, currentTraceId } from '@iii-dev/sdk' +import type { Context } from '@iii-dev/sdk' +import { createBridge } from '../lib/bridge' +import type { + User, + StripeCustomer, + StripeSubscription, + AnalyticsResult, + OnboardingMetrics, + OnboardUserInput, + OnboardingResult, +} from '../lib/types' + +const bridge = createBridge('onboarding-workflow') + +async function invoke(ctx: Context, fn: string, input: unknown): Promise { + ctx.logger.info(`Calling ${fn}`, { input }) + const result = (await bridge.invokeFunction(fn, input)) as T + ctx.logger.info(`${fn} returned`, { result }) + return result +} + +bridge.registerFunction( + { function_path: 'workflow.onboardUser' }, + async (req: { body?: OnboardUserInput } & OnboardUserInput) => { + const input: OnboardUserInput = req.body || req + const ctx = getContext() + const traceId = currentTraceId() + + ctx.logger.info('Starting user onboarding workflow', { input, traceId }) + + if (!input.email?.trim()) { + return { status_code: 400, body: { error: 'email is required' } } + } + if (!input.name?.trim()) { + return { status_code: 400, body: { error: 'name is required' } } + } + if (!input.plan?.trim()) { + return { status_code: 400, body: { error: 'plan is required' } } + } + + try { + ctx.logger.info('Step 1: Creating user in Node.js User Service') + const user = await invoke(ctx, 'users.create', { + email: input.email, + name: input.name, + plan: input.plan, + }) + + ctx.logger.info('Step 2: Creating Stripe customer via Rust HTTP server') + const stripeCustomer = await invoke(ctx, 'stripe.createCustomer', { + email: user.email, + name: user.name, + }) + + ctx.logger.info('Step 3: Creating subscription in Rust Stripe') + const subscription = await invoke(ctx, 'stripe.createSubscription', { + customerId: stripeCustomer.id, + plan: input.plan, + }) + + ctx.logger.info('Step 4: Running analytics via Python stdin/stdout IPC') + const analytics = await invoke(ctx, 'analytics.score', { + userId: user.id, + email: user.email, + name: user.name, + }) + + ctx.logger.info('Step 5: Updating user with Stripe and analytics data') + const updatedUser = await invoke(ctx, 'users.update', { + id: user.id, + stripeCustomerId: stripeCustomer.id, + subscriptionId: subscription.id, + riskScore: analytics.riskScore, + }) + + if (!updatedUser) { + ctx.logger.error('Failed to update user', { userId: user.id }) + return { status_code: 500, body: { error: 'Failed to update user', traceId } } + } + + const result: OnboardingResult = { + user: updatedUser, + stripeCustomer, + subscription, + analytics, + } + + ctx.logger.info('Onboarding workflow complete', { userId: user.id, traceId }) + + return { + status_code: 201, + body: { + message: 'User onboarded successfully', + traceId, + ...result, + }, + } + } catch (error) { + const message = error instanceof Error ? error.message : 'Unknown error' + ctx.logger.error('Onboarding workflow failed', { error: message, traceId }) + return { status_code: 500, body: { error: message, traceId } } + } + } +) + +bridge.registerFunction( + { function_path: 'workflow.getOnboardingMetrics' }, + async () => { + const ctx = getContext() + const traceId = currentTraceId() + + ctx.logger.info('Fetching onboarding metrics', { traceId }) + + try { + const users = await invoke(ctx, 'users.list', {}) + + const metrics = await invoke(ctx, 'analytics.onboardingMetrics', { + users, + }) + + return { + status_code: 200, + body: { + traceId, + metrics, + userCount: users.length, + }, + } + } catch (error) { + const message = error instanceof Error ? error.message : 'Unknown error' + ctx.logger.error('Failed to get metrics', { error: message, traceId }) + return { status_code: 500, body: { error: message, traceId } } + } + } +) + +bridge.registerTrigger({ + trigger_type: 'api', + function_path: 'workflow.onboardUser', + config: { api_path: 'onboard', http_method: 'POST' }, +}) + +bridge.registerTrigger({ + trigger_type: 'api', + function_path: 'workflow.getOnboardingMetrics', + config: { api_path: 'onboard/metrics', http_method: 'GET' }, +}) + +process.on('SIGINT', () => process.exit(0)) +process.on('SIGTERM', () => process.exit(0)) + +console.log('[Onboarding Workflow] Polyglot coordination ready') +console.log(' POST /onboard - Full onboarding flow (Node.js → Rust → Python)') +console.log(' GET /onboard/metrics - Aggregate metrics from Python analytics') +console.log('') +console.log('Architecture:') +console.log(' • Node.js User Service (users.* functions)') +console.log(' • Rust Fake Stripe HTTP server (stripe.* via bridge)') +console.log(' • Python Analytics via stdin/stdout IPC (analytics.* via bridge)') +console.log('') +console.log('All function calls look identical - the caller never knows the transport!')