diff --git a/.gitignore b/.gitignore index b4ef677..0adc64f 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,4 @@ target/ .claude/ .idea/ .DS_Store -docs \ No newline at end of file +docsmcp/Cargo.lock diff --git a/README.md b/README.md index 98aa919..5ca15e3 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,164 @@ -# Motia Workers +# III Workers Worker modules for the [III engine](https://github.com/iii-hq/iii). ## Modules +### mcp + +MCP protocol worker — exposes iii-engine functions as MCP tools via stdio and Streamable HTTP. + +**Protocol version:** `2025-11-25` + +**Features:** +- Dual transport: stdio (Claude Desktop, Cursor) + HTTP (`POST /mcp`) +- 6 built-in tools: worker register/stop, trigger register/unregister/void/enqueue +- 4 MCP resources: `iii://functions`, `iii://workers`, `iii://triggers`, `iii://context` +- 4 MCP prompts: register-function, build-api, setup-cron, event-pipeline +- Metadata filtering: only functions with `mcp.expose: true` are exposed (unless `--expose-all`) +- Spawn Node.js/Python workers on the fly via `iii_worker_register` tool + +#### Build + +```bash +cd mcp +cargo build --release +``` + +#### Usage + +```bash +iii-mcp # MCP stdio (Claude Desktop, Cursor) +iii-mcp --no-stdio # MCP HTTP only (POST /mcp) +iii-mcp --expose-all # show all functions, ignore metadata filter +``` + +Tag functions for MCP exposure: +```js +iii.registerFunction({ + id: 'orders::process', + metadata: { "mcp.expose": true } +}, handler) +``` + +#### Testing with MCP Inspector + +Use [MCP Inspector](https://github.com/modelcontextprotocol/inspector) to debug and validate the MCP worker interactively. + +**Setup:** + +Create `mcp-inspector-config.json`: +```json +{ + "mcpServers": { + "iii-mcp": { + "command": "./mcp/target/release/iii-mcp", + "args": [] + } + } +} +``` + +**Web UI (interactive):** +```bash +npx @modelcontextprotocol/inspector \ + --config mcp-inspector-config.json \ + --server iii-mcp +``` +Opens a browser at `http://localhost:6274` where you can list tools, call functions, read resources, and test prompts. + +**CLI (scriptable):** +```bash +# List tools +npx @modelcontextprotocol/inspector --cli \ + --config mcp-inspector-config.json \ + --server iii-mcp \ + --method tools/list + +# Call a tool +npx @modelcontextprotocol/inspector --cli \ + --config mcp-inspector-config.json \ + --server iii-mcp \ + --method tools/call \ + --tool-name demo__echo \ + --tool-arg 'message=hello' + +# List resources +npx @modelcontextprotocol/inspector --cli \ + --config mcp-inspector-config.json \ + --server iii-mcp \ + --method resources/list +``` + +**End-to-end test:** + +```bash +# Terminal 1: start engine +iii --use-default-config + +# Terminal 2: start MCP worker +iii-mcp --debug + +# Terminal 3: start a test worker with metadata +node -e " +import { registerWorker } from 'iii-sdk' +const iii = registerWorker('ws://localhost:49134') +iii.registerFunction({ + id: 'demo::echo', + description: 'Echo input', + metadata: { \"mcp.expose\": true }, + request_format: { type: 'object', properties: { message: { type: 'string' } }, required: ['message'] } +}, async (input) => ({ echoed: input.message })) +setInterval(() => {}, 10000) +" --input-type=module + +# Terminal 4: validate (wait ~6s for function discovery) +npx @modelcontextprotocol/inspector --cli \ + --config mcp-inspector-config.json \ + --server iii-mcp \ + --method tools/list +# Should show demo__echo alongside the 6 built-in tools +``` + +> Functions registered without `mcp.expose: true` metadata will not appear in `tools/list` unless `--expose-all` is set. The engine polls for new functions every 5 seconds, so allow a brief delay after registration. + +--- + +### a2a + +A2A protocol worker — exposes iii-engine functions as A2A agent skills via HTTP. + +**Features:** +- Full A2A type system: AgentCard, Task (8 states), Message, Part, Artifact +- Methods: `message/send`, `tasks/get`, `tasks/cancel`, `tasks/list` +- Agent card at `GET /.well-known/agent-card.json` +- Task state stored via engine KV (`a2a:tasks` scope) +- Metadata filtering: only functions with `a2a.expose: true` are exposed (unless `--expose-all`) + +#### Build + +```bash +cd a2a +cargo build --release +``` + +#### Usage + +```bash +iii-a2a # A2A HTTP (POST /a2a + GET /.well-known/agent-card.json) +iii-a2a --expose-all # show all functions as skills +``` + +Tag functions for A2A exposure: +```js +iii.registerFunction({ + id: 'orders::process', + metadata: { "a2a.expose": true } +}, handler) +``` + +--- + ### image-resize A Rust-based image resize worker that connects to the III engine via WebSocket and processes images through stream channels. diff --git a/a2a/Cargo.toml b/a2a/Cargo.toml new file mode 100644 index 0000000..623ba2e --- /dev/null +++ b/a2a/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "iii-a2a" +version = "0.3.0" +edition = "2024" +description = "A2A protocol worker for iii-engine" +license = "Apache-2.0" +authors = ["Rohit Ghumare "] +repository = "https://github.com/iii-hq/workers" +homepage = "https://github.com/iii-hq/workers" +rust-version = "1.85" +keywords = ["a2a", "agent-to-agent", "ai", "iii-engine"] +categories = ["command-line-utilities"] + +[[bin]] +name = "iii-a2a" +path = "src/main.rs" + +[dependencies] +iii-sdk = "0.10.0" +tokio = { version = "1", features = ["macros", "rt-multi-thread", "sync", "time", "signal"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +clap = { version = "4", features = ["derive"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +anyhow = "1" +uuid = { version = "1", features = ["v4"] } diff --git a/a2a/src/handler.rs b/a2a/src/handler.rs new file mode 100644 index 0000000..58352f2 --- /dev/null +++ b/a2a/src/handler.rs @@ -0,0 +1,544 @@ +use iii_sdk::{ + III, RegisterFunctionMessage, RegisterTriggerInput, TriggerAction, TriggerRequest, Value, +}; +use serde_json::json; + +use crate::types::*; + +fn has_metadata_flag(f: &iii_sdk::FunctionInfo, key: &str) -> bool { + f.metadata + .as_ref() + .and_then(|m| m.get(key)) + .and_then(|v| v.as_bool()) + .unwrap_or(false) +} + +async fn is_function_exposed(iii: &III, function_id: &str, expose_all: bool) -> bool { + if expose_all { + return true; + } + match iii.list_functions().await { + Ok(fns) => fns + .iter() + .any(|f| f.function_id == function_id && has_metadata_flag(f, "a2a.expose")), + Err(_) => false, + } +} + +pub fn register(iii: &III, expose_all: bool, base_url: String) { + let iii_card = iii.clone(); + let card_expose_all = expose_all; + let card_base_url = base_url.clone(); + iii.register_function_with( + RegisterFunctionMessage { + id: "a2a::agent_card".to_string(), + description: Some("A2A Agent Card".to_string()), + request_format: None, + response_format: None, + metadata: None, + invocation: None, + }, + move |_input: Value| { + let iii_inner = iii_card.clone(); + let base = card_base_url.clone(); + async move { + let card = build_agent_card(&iii_inner, card_expose_all, &base).await; + Ok(json!({ + "status_code": 200, + "headers": { "content-type": "application/json" }, + "body": card + })) + } + }, + ); + + let iii_rpc = iii.clone(); + let rpc_expose_all = expose_all; + iii.register_function_with( + RegisterFunctionMessage { + id: "a2a::jsonrpc".to_string(), + description: Some("A2A JSON-RPC endpoint".to_string()), + request_format: Some(json!({ + "type": "object", + "properties": { "body": { "type": "object" } } + })), + response_format: None, + metadata: None, + invocation: None, + }, + move |input: Value| { + let iii_inner = iii_rpc.clone(); + async move { + let body = if let Some(b) = input.get("body") { + b.clone() + } else { + input + }; + + let request: A2ARequest = match serde_json::from_value(body) { + Ok(r) => r, + Err(e) => { + return Ok(json!({ + "status_code": 200, + "headers": { "content-type": "application/json" }, + "body": A2AResponse::error(None, -32600, format!("Invalid request: {}", e)) + })); + } + }; + + let response = handle_a2a_request(&iii_inner, request, rpc_expose_all).await; + + Ok(json!({ + "status_code": 200, + "headers": { "content-type": "application/json" }, + "body": response + })) + } + }, + ); + + if let Err(e) = iii.register_trigger(RegisterTriggerInput { + trigger_type: "http".to_string(), + function_id: "a2a::agent_card".to_string(), + config: json!({ "api_path": "/.well-known/agent-card.json", "http_method": "GET" }), + }) { + tracing::error!(error = %e, "Failed to register a2a::agent_card trigger"); + } + + if let Err(e) = iii.register_trigger(RegisterTriggerInput { + trigger_type: "http".to_string(), + function_id: "a2a::jsonrpc".to_string(), + config: json!({ "api_path": "/a2a", "http_method": "POST" }), + }) { + tracing::error!(error = %e, "Failed to register a2a::jsonrpc trigger"); + } + + tracing::info!("A2A registered: GET /.well-known/agent-card.json, POST /a2a"); +} + +async fn build_agent_card(iii: &III, expose_all: bool, base_url: &str) -> AgentCard { + let skills = match iii.list_functions().await { + Ok(fns) => fns + .iter() + .filter(|f| expose_all || has_metadata_flag(f, "a2a.expose")) + .map(|f| AgentSkill { + id: f.function_id.clone(), + name: f + .description + .clone() + .unwrap_or_else(|| f.function_id.replace("::", " ")), + description: f + .description + .clone() + .unwrap_or_else(|| f.function_id.clone()), + tags: f.function_id.split("::").map(|s| s.to_string()).collect(), + examples: None, + }) + .collect(), + Err(_) => vec![], + }; + + AgentCard { + name: "iii-engine".to_string(), + description: "iii-engine agent — invoke any registered function via A2A".to_string(), + version: env!("CARGO_PKG_VERSION").to_string(), + supported_interfaces: vec![AgentInterface { + url: base_url.to_string(), + protocol_binding: "JSONRPC".to_string(), + protocol_version: "0.3".to_string(), + }], + provider: Some(AgentProvider { + organization: "iii".to_string(), + url: "https://github.com/iii-hq/iii".to_string(), + }), + documentation_url: Some("https://github.com/iii-hq/iii-connect".to_string()), + capabilities: AgentCapabilities { + streaming: false, + push_notifications: false, + state_transition_history: true, + }, + default_input_modes: vec!["text/plain".to_string(), "application/json".to_string()], + default_output_modes: vec!["text/plain".to_string(), "application/json".to_string()], + skills, + } +} + +async fn handle_a2a_request(iii: &III, request: A2ARequest, expose_all: bool) -> A2AResponse { + let id = request.id.clone(); + match request.method.as_str() { + "message/send" | "SendMessage" => handle_send(iii, id, request.params, expose_all).await, + "tasks/get" | "GetTask" => handle_get(iii, id, request.params).await, + "tasks/cancel" | "CancelTask" => handle_cancel(iii, id, request.params).await, + "tasks/list" | "ListTasks" => handle_list(iii, id).await, + "message/stream" | "SendStreamingMessage" | "tasks/resubscribe" | "SubscribeToTask" => { + A2AResponse::error(id, -32004, "Streaming not supported") + } + m if m.contains("pushNotification") || m.contains("PushNotification") => { + A2AResponse::error(id, -32003, "Push notifications not supported") + } + _ => A2AResponse::error(id, -32601, format!("Unknown method: {}", request.method)), + } +} + +const TASK_SCOPE: &str = "a2a:tasks"; + +async fn store_task(iii: &III, task: &Task) { + if let Err(e) = iii + .trigger(TriggerRequest { + function_id: "state::set".to_string(), + payload: json!({ "scope": TASK_SCOPE, "key": task.id, "data": task }), + action: Some(TriggerAction::Void), + timeout_ms: None, + }) + .await + { + tracing::error!(task_id = %task.id, error = %e, "Failed to store task"); + } +} + +async fn load_task(iii: &III, task_id: &str) -> Option { + iii.trigger(TriggerRequest { + function_id: "state::get".to_string(), + payload: json!({ "scope": TASK_SCOPE, "key": task_id }), + action: None, + timeout_ms: Some(5000), + }) + .await + .ok() + .and_then(|v| serde_json::from_value(v).ok()) +} + +fn msg_id() -> String { + uuid::Uuid::new_v4().to_string() +} + +fn text_part(s: impl Into) -> Part { + Part { + text: Some(s.into()), + data: None, + url: None, + raw: None, + media_type: None, + } +} + +fn iso_now() -> String { + let d = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default(); + let secs = d.as_secs(); + let millis = d.subsec_millis(); + let days = secs / 86400; + let time_secs = secs % 86400; + let h = time_secs / 3600; + let m = (time_secs % 3600) / 60; + let s = time_secs % 60; + + let mut y = 1970i64; + let mut remaining = days as i64; + loop { + let year_days = if y % 4 == 0 && (y % 100 != 0 || y % 400 == 0) { + 366 + } else { + 365 + }; + if remaining < year_days { + break; + } + remaining -= year_days; + y += 1; + } + let leap = y % 4 == 0 && (y % 100 != 0 || y % 400 == 0); + let month_days = [ + 31, + if leap { 29 } else { 28 }, + 31, + 30, + 31, + 30, + 31, + 31, + 30, + 31, + 30, + 31, + ]; + let mut mo = 0; + for (i, &md) in month_days.iter().enumerate() { + if remaining < md as i64 { + mo = i + 1; + break; + } + remaining -= md as i64; + } + let day = remaining + 1; + + format!( + "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}Z", + y, mo, day, h, m, s, millis + ) +} + +async fn handle_send( + iii: &III, + id: Option, + params: Option, + expose_all: bool, +) -> A2AResponse { + let params: SendMessageParams = match params { + Some(p) => match serde_json::from_value(p) { + Ok(p) => p, + Err(e) => return A2AResponse::error(id, -32602, format!("Invalid params: {}", e)), + }, + None => return A2AResponse::error(id, -32602, "Missing params"), + }; + + let task_id = params + .message + .task_id + .clone() + .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); + let context_id = params.message.context_id.clone(); + + let mut task = if let Some(existing) = load_task(iii, &task_id).await { + if matches!( + existing.status.state, + TaskState::Completed | TaskState::Canceled | TaskState::Failed | TaskState::Rejected + ) { + return A2AResponse::success(id, json!({ "task": existing })); + } + let mut t = existing; + if let Some(ref mut history) = t.history { + history.push(params.message.clone()); + } + t.status = TaskStatus { + state: TaskState::Working, + message: Some(Message { + message_id: msg_id(), + role: MessageRole::Agent, + parts: vec![text_part("Processing...")], + task_id: None, + context_id: None, + metadata: None, + }), + timestamp: Some(iso_now()), + }; + t + } else { + Task { + id: task_id.clone(), + context_id, + status: TaskStatus { + state: TaskState::Working, + message: Some(Message { + message_id: msg_id(), + role: MessageRole::Agent, + parts: vec![text_part("Processing...")], + task_id: None, + context_id: None, + metadata: None, + }), + timestamp: Some(iso_now()), + }, + artifacts: None, + history: Some(vec![params.message.clone()]), + metadata: params.metadata, + } + }; + store_task(iii, &task).await; + + let (function_id, payload) = resolve_function(¶ms.message); + if function_id.is_empty() { + task.status = TaskStatus { + state: TaskState::Failed, + message: Some(Message { + message_id: msg_id(), + role: MessageRole::Agent, + parts: vec![text_part( + "No function_id found. Send a data part with {\"function_id\": \"...\", \"payload\": {...}} or use :: notation in text.", + )], + task_id: None, + context_id: None, + metadata: None, + }), + timestamp: Some(iso_now()), + }; + store_task(iii, &task).await; + return A2AResponse::success(id, json!({ "task": task })); + } + let fn_name = function_id.clone(); + + if !is_function_exposed(iii, &function_id, expose_all).await { + task.status = TaskStatus { + state: TaskState::Failed, + message: Some(Message { + message_id: msg_id(), + role: MessageRole::Agent, + parts: vec![text_part(format!( + "Function '{}' is not exposed via a2a.expose metadata", + function_id + ))], + task_id: None, + context_id: None, + metadata: None, + }), + timestamp: Some(iso_now()), + }; + store_task(iii, &task).await; + return A2AResponse::success(id, json!({ "task": task })); + } + + match iii + .trigger(TriggerRequest { + function_id, + payload, + action: None, + timeout_ms: Some(30000), + }) + .await + { + Ok(result) => { + let fresh = load_task(iii, &task_id).await; + if let Some(ref t) = fresh { + if matches!(t.status.state, TaskState::Canceled) { + return A2AResponse::success(id, json!({ "task": t })); + } + } + let result_text = + serde_json::to_string_pretty(&result).unwrap_or_else(|_| result.to_string()); + task.status = TaskStatus { + state: TaskState::Completed, + message: None, + timestamp: Some(iso_now()), + }; + task.artifacts = Some(vec![Artifact { + artifact_id: uuid::Uuid::new_v4().to_string(), + parts: vec![Part { + text: Some(result_text), + data: None, + url: None, + raw: None, + media_type: Some("application/json".to_string()), + }], + name: Some(fn_name), + metadata: None, + }]); + } + Err(err) => { + task.status = TaskStatus { + state: TaskState::Failed, + message: Some(Message { + message_id: msg_id(), + role: MessageRole::Agent, + parts: vec![text_part(format!("Error: {}", err))], + task_id: None, + context_id: None, + metadata: None, + }), + timestamp: Some(iso_now()), + }; + } + } + + store_task(iii, &task).await; + A2AResponse::success(id, json!({ "task": task })) +} + +async fn handle_get(iii: &III, id: Option, params: Option) -> A2AResponse { + let params: GetTaskParams = match params { + Some(p) => match serde_json::from_value(p) { + Ok(p) => p, + Err(e) => return A2AResponse::error(id, -32602, format!("Invalid params: {}", e)), + }, + None => return A2AResponse::error(id, -32602, "Missing params"), + }; + match load_task(iii, ¶ms.id).await { + Some(task) => A2AResponse::success(id, json!({ "task": task })), + None => A2AResponse::error(id, -32001, format!("Task not found: {}", params.id)), + } +} + +async fn handle_list(iii: &III, id: Option) -> A2AResponse { + match iii + .trigger(TriggerRequest { + function_id: "state::list".to_string(), + payload: json!({ "scope": TASK_SCOPE }), + action: None, + timeout_ms: Some(5000), + }) + .await + { + Ok(value) => { + let tasks: Vec = value + .as_array() + .map(|arr| { + arr.iter() + .filter_map(|v| serde_json::from_value(v.clone()).ok()) + .collect() + }) + .unwrap_or_default(); + A2AResponse::success(id, json!({ "tasks": tasks })) + } + Err(_) => A2AResponse::success(id, json!({ "tasks": [] })), + } +} + +async fn handle_cancel(iii: &III, id: Option, params: Option) -> A2AResponse { + let params: CancelTaskParams = match params { + Some(p) => match serde_json::from_value(p) { + Ok(p) => p, + Err(e) => return A2AResponse::error(id, -32602, format!("Invalid params: {}", e)), + }, + None => return A2AResponse::error(id, -32602, "Missing params"), + }; + match load_task(iii, ¶ms.id).await { + Some(mut task) => { + if matches!( + task.status.state, + TaskState::Completed + | TaskState::Canceled + | TaskState::Failed + | TaskState::Rejected + ) { + return A2AResponse::error(id, -32002, "Task not cancelable (terminal state)"); + } + task.status = TaskStatus { + state: TaskState::Canceled, + message: None, + timestamp: Some(iso_now()), + }; + store_task(iii, &task).await; + A2AResponse::success(id, json!({ "task": task })) + } + None => A2AResponse::error(id, -32001, format!("Task not found: {}", params.id)), + } +} + +fn resolve_function(message: &Message) -> (String, Value) { + let text = message + .parts + .iter() + .find_map(|p| p.text.as_ref()) + .cloned() + .unwrap_or_default(); + let data_payload = message.parts.iter().find_map(|p| p.data.as_ref()); + + if let Some(payload) = data_payload { + if let Some(fid) = payload.get("function_id").and_then(|v| v.as_str()) { + let args = payload.get("payload").cloned().unwrap_or(json!({})); + return (fid.to_string(), args); + } + } + + let text = text.trim(); + if text.contains("::") { + let parts: Vec<&str> = text.splitn(2, char::is_whitespace).collect(); + if parts.len() == 2 { + let payload = serde_json::from_str(parts[1]).unwrap_or(json!({ "input": parts[1] })); + return (parts[0].to_string(), payload); + } + return (text.to_string(), json!({})); + } + + (String::new(), json!({})) +} diff --git a/a2a/src/main.rs b/a2a/src/main.rs new file mode 100644 index 0000000..d3a3c0a --- /dev/null +++ b/a2a/src/main.rs @@ -0,0 +1,58 @@ +mod handler; +mod types; + +use clap::Parser; +use iii_sdk::{InitOptions, register_worker}; +use tracing_subscriber::{EnvFilter, fmt, prelude::*}; + +#[derive(Parser, Debug)] +#[command(name = "iii-a2a")] +#[command(version)] +#[command(about = "A2A protocol worker for iii-engine")] +struct Args { + #[arg(long, short = 'e', default_value = "ws://localhost:49134")] + engine_url: String, + + #[arg(long, short = 'd')] + debug: bool, + + #[arg( + long, + help = "Expose all functions as skills (ignore a2a.expose metadata)" + )] + expose_all: bool, + + #[arg( + long, + default_value = "http://localhost:3111", + help = "Public base URL advertised in the agent card" + )] + base_url: String, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let args = Args::parse(); + + let filter = if args.debug { + EnvFilter::new("iii_a2a=debug,iii_sdk=debug") + } else { + EnvFilter::new("iii_a2a=info,iii_sdk=warn") + }; + + tracing_subscriber::registry() + .with(fmt::layer().with_writer(std::io::stderr)) + .with(filter) + .init(); + + tracing::info!(version = env!("CARGO_PKG_VERSION"), "Starting iii-a2a"); + + let iii = register_worker(&args.engine_url, InitOptions::default()); + + handler::register(&iii, args.expose_all, args.base_url); + + tracing::info!("A2A endpoints registered on engine port. Ctrl+C to stop."); + tokio::signal::ctrl_c().await?; + + Ok(()) +} diff --git a/a2a/src/types.rs b/a2a/src/types.rs new file mode 100644 index 0000000..74d050a --- /dev/null +++ b/a2a/src/types.rs @@ -0,0 +1,214 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct A2ARequest { + pub jsonrpc: String, + #[serde(default)] + pub id: Option, + pub method: String, + #[serde(default)] + pub params: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct A2AResponse { + pub jsonrpc: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub result: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct A2AError { + pub code: i32, + pub message: String, +} + +impl A2AResponse { + pub fn success(id: Option, result: Value) -> Self { + Self { + jsonrpc: "2.0".to_string(), + id, + result: Some(result), + error: None, + } + } + + pub fn error(id: Option, code: i32, message: impl Into) -> Self { + Self { + jsonrpc: "2.0".to_string(), + id, + result: None, + error: Some(A2AError { + code, + message: message.into(), + }), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct AgentCard { + pub name: String, + pub description: String, + pub version: String, + pub supported_interfaces: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub provider: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub documentation_url: Option, + pub capabilities: AgentCapabilities, + pub default_input_modes: Vec, + pub default_output_modes: Vec, + pub skills: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct AgentInterface { + pub url: String, + pub protocol_binding: String, + pub protocol_version: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AgentProvider { + pub organization: String, + pub url: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct AgentCapabilities { + #[serde(default)] + pub streaming: bool, + #[serde(default)] + pub push_notifications: bool, + #[serde(default)] + pub state_transition_history: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct AgentSkill { + pub id: String, + pub name: String, + pub description: String, + #[serde(default)] + pub tags: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub examples: Option>, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum TaskState { + #[serde(rename = "submitted")] + Submitted, + #[serde(rename = "working")] + Working, + #[serde(rename = "input-required")] + InputRequired, + #[serde(rename = "auth-required")] + AuthRequired, + #[serde(rename = "completed")] + Completed, + #[serde(rename = "canceled")] + Canceled, + #[serde(rename = "failed")] + Failed, + #[serde(rename = "rejected")] + Rejected, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Task { + pub id: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub context_id: Option, + pub status: TaskStatus, + #[serde(skip_serializing_if = "Option::is_none")] + pub artifacts: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub history: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub metadata: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TaskStatus { + pub state: TaskState, + #[serde(skip_serializing_if = "Option::is_none")] + pub message: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub timestamp: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Message { + pub message_id: String, + pub role: MessageRole, + pub parts: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub task_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub context_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub metadata: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "lowercase")] +pub enum MessageRole { + User, + Agent, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Part { + #[serde(skip_serializing_if = "Option::is_none")] + pub text: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub data: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub url: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub raw: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub media_type: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Artifact { + pub artifact_id: String, + pub parts: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub name: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub metadata: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SendMessageParams { + pub message: Message, + #[serde(skip_serializing_if = "Option::is_none")] + pub metadata: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GetTaskParams { + pub id: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CancelTaskParams { + pub id: String, +} diff --git a/image-resize/src/manifest.rs b/image-resize/src/manifest.rs index 8866885..5e1df87 100644 --- a/image-resize/src/manifest.rs +++ b/image-resize/src/manifest.rs @@ -41,7 +41,7 @@ mod tests { let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); assert!(parsed.is_object(), "Manifest must be valid JSON object"); assert_eq!(parsed["name"], "image-resize"); - assert_eq!(parsed["version"], "0.1.0"); + assert_eq!(parsed["version"], env!("CARGO_PKG_VERSION")); } #[test] diff --git a/mcp/Cargo.toml b/mcp/Cargo.toml new file mode 100644 index 0000000..934536f --- /dev/null +++ b/mcp/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "iii-mcp" +version = "0.3.0" +edition = "2024" +description = "MCP protocol worker for iii-engine" +license = "Apache-2.0" +authors = ["Rohit Ghumare "] +repository = "https://github.com/iii-hq/workers" +homepage = "https://github.com/iii-hq/workers" +rust-version = "1.85" +keywords = ["mcp", "model-context-protocol", "ai", "iii-engine"] +categories = ["command-line-utilities"] + +[[bin]] +name = "iii-mcp" +path = "src/main.rs" + +[dependencies] +iii-sdk = { git = "https://github.com/iii-hq/iii", branch = "main" } +tokio = { version = "1", features = ["macros", "rt-multi-thread", "io-std", "io-util", "sync", "time", "process", "fs", "signal"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +clap = { version = "4", features = ["derive"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +anyhow = "1" +uuid = { version = "1", features = ["v4"] } diff --git a/mcp/src/handler.rs b/mcp/src/handler.rs new file mode 100644 index 0000000..57925ba --- /dev/null +++ b/mcp/src/handler.rs @@ -0,0 +1,622 @@ +use std::collections::HashMap; +use std::sync::atomic::{AtomicBool, Ordering}; + +use iii_sdk::{ + FunctionInfo, FunctionsAvailableGuard, III, RegisterFunctionMessage, RegisterTriggerInput, + Trigger, TriggerAction, TriggerRequest, Value, +}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use tokio::sync::{Mutex, mpsc}; + +use crate::prompts; +use crate::worker_manager::{WorkerCreateParams, WorkerManager, WorkerStopParams}; + +const MCP_PROTOCOL_VERSION: &str = "2025-11-25"; +const INTERNAL_ERROR: i32 = -32603; +const INVALID_PARAMS: i32 = -32602; +const METHOD_NOT_FOUND: i32 = -32601; + +fn has_metadata_flag(f: &FunctionInfo, key: &str) -> bool { + f.metadata + .as_ref() + .and_then(|m| m.get(key)) + .and_then(|v| v.as_bool()) + .unwrap_or(false) +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct JsonRpcResponse { + pub jsonrpc: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub result: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct JsonRpcError { + pub code: i32, + pub message: String, +} + +impl JsonRpcResponse { + pub fn success(id: Option, result: Value) -> Self { + Self { + jsonrpc: "2.0".to_string(), + id, + result: Some(result), + error: None, + } + } + + pub fn error(id: Option, code: i32, message: impl Into) -> Self { + Self { + jsonrpc: "2.0".to_string(), + id, + result: None, + error: Some(JsonRpcError { + code, + message: message.into(), + }), + } + } +} + +pub struct McpHandler { + initialized: AtomicBool, + iii: III, + expose_all: bool, + worker_manager: WorkerManager, + triggers: Mutex>, + notification_rx: tokio::sync::Mutex>, + _functions_guard: FunctionsAvailableGuard, +} + +impl McpHandler { + pub fn new(iii: III, engine_url: String, expose_all: bool) -> Self { + let (tx, notification_rx) = mpsc::channel(16); + + let guard = iii.on_functions_available(move |_| { + let n = json!({ "jsonrpc": "2.0", "method": "notifications/tools/list_changed" }); + if let Ok(json) = serde_json::to_string(&n) { + if tx.try_send(json).is_err() { + tracing::warn!("Notification channel full, tools/list_changed dropped"); + } + } + }); + + Self { + initialized: AtomicBool::new(false), + iii, + expose_all, + worker_manager: WorkerManager::new(engine_url), + triggers: Mutex::new(HashMap::new()), + notification_rx: tokio::sync::Mutex::new(notification_rx), + _functions_guard: guard, + } + } + + pub async fn take_notification(&self) -> Option { + self.notification_rx.lock().await.try_recv().ok() + } + + pub async fn handle(&self, body: Value) -> Option { + let method = body.get("method").and_then(|v| v.as_str()).unwrap_or(""); + let id = body.get("id").cloned(); + + if method.starts_with("notifications/") { + if method == "notifications/initialized" { + self.initialized.store(true, Ordering::SeqCst); + } + return None; + } + + if method == "initialize" { + self.initialized.store(true, Ordering::SeqCst); + } else if !self.initialized.load(Ordering::SeqCst) && method != "ping" { + return Some(json!(JsonRpcResponse::error( + id, + INTERNAL_ERROR, + "Not initialized" + ))); + } + + Some(self.dispatch(&body).await) + } + + async fn dispatch(&self, body: &Value) -> Value { + let method = body.get("method").and_then(|v| v.as_str()).unwrap_or(""); + let id = body.get("id").cloned(); + let params = body.get("params").cloned(); + + let result = match method { + "initialize" => Ok(initialize_result()), + "ping" => Ok(json!({})), + "tools/list" => self.tools_list().await.map_err(|e| (INTERNAL_ERROR, e)), + "tools/call" => self.tools_call(params).await.map_err(|e| (INVALID_PARAMS, e)), + "resources/list" => Ok(self.resources_list()), + "resources/read" => self.resources_read(params).await.map_err(|e| (INVALID_PARAMS, e)), + "resources/templates/list" => Ok(json!({ "resourceTemplates": [] })), + "prompts/list" => Ok(prompts::list()), + "prompts/get" => Ok(prompts::get(params)), + _ => Err((METHOD_NOT_FOUND, format!("Unknown method: {}", method))), + }; + + json!(match result { + Ok(value) => JsonRpcResponse::success(id, value), + Err((code, msg)) => JsonRpcResponse::error(id, code, msg), + }) + } + + async fn tools_list(&self) -> Result { + let mut tools = builtin_tools(); + if let Ok(functions) = self.iii.list_functions().await { + tools.extend( + functions + .iter() + .filter(|f| self.expose_all || has_metadata_flag(f, "mcp.expose")) + .map(function_to_tool), + ); + } + Ok(json!({ "tools": tools })) + } + + async fn tools_call(&self, params: Option) -> Result { + let params: CallParams = parse(params)?; + + match params.name.as_str() { + "iii_worker_register" => { + let p: WorkerCreateParams = parse(Some(params.arguments))?; + return match self.worker_manager.create_worker(p).await { + Ok(r) => Ok(tool_json(&serde_json::to_value(&r).unwrap_or_default())), + Err(e) => Ok(tool_error(&e)), + }; + } + "iii_worker_stop" => { + let p: WorkerStopParams = parse(Some(params.arguments))?; + return match self.worker_manager.stop_worker(p).await { + Ok(r) => Ok(tool_json(&serde_json::to_value(&r).unwrap_or_default())), + Err(e) => Ok(tool_error(&e)), + }; + } + "iii_trigger_register" => return Ok(self.trigger_register(params.arguments).await), + "iii_trigger_unregister" => return Ok(self.trigger_unregister(params.arguments).await), + "iii_trigger_void" => { + let fid = str_field(¶ms.arguments, "function_id"); + if fid.is_empty() { + return Ok(tool_error("Missing required field: function_id")); + } + let payload = params + .arguments + .get("payload") + .cloned() + .unwrap_or(json!({})); + return match self + .iii + .trigger(TriggerRequest { + function_id: fid.clone(), + payload, + action: Some(TriggerAction::Void), + timeout_ms: None, + }) + .await + { + Ok(_) => Ok(tool_result(&format!("Triggered (void): {}", fid))), + Err(e) => Ok(tool_error(&format!("Error: {}", e))), + }; + } + "iii_trigger_enqueue" => { + let fid = str_field(¶ms.arguments, "function_id"); + if fid.is_empty() { + return Ok(tool_error("Missing required field: function_id")); + } + let payload = params + .arguments + .get("payload") + .cloned() + .unwrap_or(json!({})); + let queue = str_field_or(¶ms.arguments, "queue", "default"); + return match self + .iii + .trigger(TriggerRequest { + function_id: fid, + payload, + action: Some(TriggerAction::Enqueue { queue }), + timeout_ms: None, + }) + .await + { + Ok(r) => Ok(tool_json(&r)), + Err(e) => Ok(tool_error(&format!("Error: {}", e))), + }; + } + _ => {} + } + + let function_id = params.name.replace("__", "::"); + if !self.expose_all { + if let Ok(fns) = self.iii.list_functions().await { + let exposed = fns.iter().any(|f| { + f.function_id == function_id && has_metadata_flag(f, "mcp.expose") + }); + if !exposed { + return Ok(tool_error(&format!( + "Function '{}' is not exposed via mcp.expose metadata", + function_id + ))); + } + } + } + match self + .iii + .trigger(TriggerRequest { + function_id: function_id.clone(), + payload: params.arguments, + action: None, + timeout_ms: None, + }) + .await + { + Ok(result) => Ok(tool_json(&result)), + Err(err) => { + tracing::error!(function_id = %function_id, error = %err, "Trigger failed"); + Ok(tool_error(&format!("Error: {}", err))) + } + } + } + + fn resources_list(&self) -> Value { + json!({ "resources": [ + { "uri": "iii://functions", "name": "Functions", "mimeType": "application/json" }, + { "uri": "iii://workers", "name": "Workers", "mimeType": "application/json" }, + { "uri": "iii://triggers", "name": "Triggers", "mimeType": "application/json" }, + { "uri": "iii://context", "name": "Context", "mimeType": "application/json" }, + ]}) + } + + async fn resources_read(&self, params: Option) -> Result { + #[derive(Deserialize)] + struct P { + uri: String, + } + let p: P = parse(params)?; + let (text, mime) = match p.uri.as_str() { + "iii://functions" => { + let v = self + .iii + .list_functions() + .await + .map_err(|e| format!("{}", e))?; + ( + serde_json::to_string_pretty(&v).unwrap_or_else(|_| "[]".into()), + "application/json", + ) + } + "iii://workers" => { + let v = self + .iii + .list_workers() + .await + .map_err(|e| format!("{}", e))?; + ( + serde_json::to_string_pretty(&v).unwrap_or_else(|_| "[]".into()), + "application/json", + ) + } + "iii://triggers" => { + let v = self + .iii + .list_triggers(true) + .await + .map_err(|e| format!("{}", e))?; + ( + serde_json::to_string_pretty(&v).unwrap_or_else(|_| "[]".into()), + "application/json", + ) + } + "iii://context" => ( + serde_json::to_string_pretty(&json!({ + "sdk_version": "0.10.0", + "function_id_delimiter": "::", + "metadata_filtering": { "mcp.expose": true, "a2a.expose": true } + })) + .unwrap(), + "application/json", + ), + _ => return Err(format!("Resource not found: {}", p.uri)), + }; + Ok(json!({ "contents": [{ "uri": p.uri, "mimeType": mime, "text": text }] })) + } + + async fn trigger_register(&self, args: Value) -> Value { + #[derive(Deserialize)] + struct P { + trigger_type: String, + function_id: String, + config: Value, + } + let p: P = match parse(Some(args)) { + Ok(p) => p, + Err(e) => return tool_error(&e), + }; + match self.iii.register_trigger(RegisterTriggerInput { + trigger_type: p.trigger_type.clone(), + function_id: p.function_id.clone(), + config: p.config, + metadata: None, + }) { + Ok(trigger) => { + let id = uuid::Uuid::new_v4().to_string(); + self.triggers.lock().await.insert(id.clone(), trigger); + tool_json( + &json!({ "id": id, "trigger_type": p.trigger_type, "function_id": p.function_id }), + ) + } + Err(e) => tool_error(&format!("Error: {}", e)), + } + } + + async fn trigger_unregister(&self, args: Value) -> Value { + #[derive(Deserialize)] + struct P { + id: String, + } + let p: P = match parse(Some(args)) { + Ok(p) => p, + Err(e) => return tool_error(&e), + }; + match self.triggers.lock().await.remove(&p.id) { + Some(trigger) => { + trigger.unregister(); + tool_json(&json!({ "id": p.id, "message": "Unregistered" })) + } + None => tool_error(&format!("Trigger not found: {}", p.id)), + } + } +} + +pub fn register_http(iii: &III, expose_all: bool) { + let iii_fn = iii.clone(); + iii.register_function_with( + RegisterFunctionMessage { + id: "mcp::handler".to_string(), + description: Some("MCP JSON-RPC handler".to_string()), + request_format: Some(json!({ "type": "object", "properties": { "body": { "type": "object" } } })), + response_format: None, metadata: None, invocation: None, + }, + move |input: Value| { + let iii_inner = iii_fn.clone(); + async move { + let body = input.get("body").cloned().unwrap_or(input); + let response = dispatch_http(&iii_inner, &body, expose_all).await; + Ok(json!({ "status_code": 200, "headers": { "content-type": "application/json" }, "body": response })) + } + }, + ); + + if let Err(e) = iii.register_trigger(RegisterTriggerInput { + trigger_type: "http".to_string(), + function_id: "mcp::handler".to_string(), + config: json!({ "api_path": "mcp", "http_method": "POST" }), + metadata: None, + }) { + tracing::error!(error = %e, "Failed to register MCP HTTP trigger"); + } else { + tracing::info!("MCP Streamable HTTP registered: POST /mcp"); + } +} + +fn initialize_result() -> Value { + json!({ + "protocolVersion": MCP_PROTOCOL_VERSION, + "capabilities": { "tools": { "listChanged": true }, "resources": { "subscribe": false, "listChanged": true }, "prompts": { "listChanged": false } }, + "serverInfo": { "name": "iii-mcp", "version": env!("CARGO_PKG_VERSION") }, + "instructions": "iii-engine MCP server (SDK v0.10). Functions with metadata mcp.expose: true are exposed as tools." + }) +} + +async fn dispatch_http(iii: &III, body: &Value, expose_all: bool) -> Value { + let method = body.get("method").and_then(|v| v.as_str()).unwrap_or(""); + let id = body.get("id").cloned(); + let params = body.get("params").cloned(); + + if method.starts_with("notifications/") { + return json!(null); + } + + let result = match method { + "initialize" => Ok(initialize_result()), + "ping" => Ok(json!({})), + "tools/list" => { + let mut tools = builtin_tools(); + if let Ok(fns) = iii.list_functions().await { + tools.extend( + fns.iter() + .filter(|f| expose_all || has_metadata_flag(f, "mcp.expose")) + .map(function_to_tool), + ); + } + Ok(json!({ "tools": tools })) + } + "tools/call" => { + let p: CallParams = match params { + Some(p) => match serde_json::from_value(p) { + Ok(p) => p, + Err(e) => { + return json!(JsonRpcResponse::error(id, INVALID_PARAMS, format!("{}", e))); + } + }, + None => return json!(JsonRpcResponse::error(id, INVALID_PARAMS, "Missing params")), + }; + match p.name.as_str() { + "iii_worker_register" | "iii_worker_stop" | "iii_trigger_register" | "iii_trigger_unregister" => { + Ok(tool_error("This tool requires stdio transport (worker/trigger management is not available over HTTP)")) + } + "iii_trigger_void" => { + let fid = str_field(&p.arguments, "function_id"); + if fid.is_empty() { + Ok(tool_error("Missing required field: function_id")) + } else { + let payload = p.arguments.get("payload").cloned().unwrap_or(json!({})); + match iii.trigger(TriggerRequest { + function_id: fid.clone(), payload, + action: Some(TriggerAction::Void), timeout_ms: None, + }).await { + Ok(_) => Ok(tool_result(&format!("Triggered (void): {}", fid))), + Err(e) => Ok(tool_error(&format!("Error: {}", e))), + } + } + } + "iii_trigger_enqueue" => { + let fid = str_field(&p.arguments, "function_id"); + if fid.is_empty() { + Ok(tool_error("Missing required field: function_id")) + } else { + let payload = p.arguments.get("payload").cloned().unwrap_or(json!({})); + let queue = str_field_or(&p.arguments, "queue", "default"); + match iii.trigger(TriggerRequest { + function_id: fid, payload, + action: Some(TriggerAction::Enqueue { queue }), timeout_ms: None, + }).await { + Ok(r) => Ok(tool_json(&r)), + Err(e) => Ok(tool_error(&format!("Error: {}", e))), + } + } + } + _ => { + let function_id = p.name.replace("__", "::"); + if !expose_all { + if let Ok(fns) = iii.list_functions().await { + let exposed = fns.iter().any(|f| { + f.function_id == function_id && has_metadata_flag(f, "mcp.expose") + }); + if !exposed { + return json!(JsonRpcResponse::success( + id, + tool_error(&format!( + "Function '{}' is not exposed via mcp.expose metadata", + function_id + )) + )); + } + } + } + match iii.trigger(TriggerRequest { + function_id, payload: p.arguments, + action: None, timeout_ms: None, + }).await { + Ok(r) => Ok(tool_json(&r)), + Err(e) => Ok(tool_error(&format!("Error: {}", e))), + } + } + } + } + "resources/list" => Ok(json!({ "resources": [ + { "uri": "iii://functions", "name": "Functions", "mimeType": "application/json" }, + { "uri": "iii://workers", "name": "Workers", "mimeType": "application/json" }, + { "uri": "iii://triggers", "name": "Triggers", "mimeType": "application/json" }, + ]})), + "prompts/list" => Ok(prompts::list()), + "prompts/get" => Ok(prompts::get(params)), + _ => Err((METHOD_NOT_FOUND, format!("Unknown method: {}", method))), + }; + + json!(match result { + Ok(v) => JsonRpcResponse::success(id, v), + Err((code, msg)) => JsonRpcResponse::error(id, code, msg), + }) +} + +fn parse(params: Option) -> Result { + match params { + Some(p) => serde_json::from_value(p).map_err(|e| format!("Invalid params: {}", e)), + None => Err("Missing params".to_string()), + } +} + +fn function_to_tool(f: &FunctionInfo) -> McpTool { + McpTool { + name: f.function_id.replace("::", "__"), + description: f.description.clone(), + input_schema: f + .request_format + .clone() + .unwrap_or_else(|| json!({ "type": "object", "properties": {} })), + } +} + +fn builtin_tools() -> Vec { + vec![ + McpTool { + name: "iii_worker_register".into(), + description: Some("Register a new worker (Node.js or Python)".into()), + input_schema: json!({ "type": "object", "properties": { "language": { "type": "string", "enum": ["node", "python"] }, "code": { "type": "string" }, "function_name": { "type": "string" }, "description": { "type": "string" } }, "required": ["language", "code", "function_name"] }), + }, + McpTool { + name: "iii_worker_stop".into(), + description: Some("Stop a worker".into()), + input_schema: json!({ "type": "object", "properties": { "id": { "type": "string" } }, "required": ["id"] }), + }, + McpTool { + name: "iii_trigger_register".into(), + description: Some("Attach an http/cron/queue trigger to a function".into()), + input_schema: json!({ "type": "object", "properties": { "trigger_type": { "type": "string" }, "function_id": { "type": "string" }, "config": { "type": "object" } }, "required": ["trigger_type", "function_id", "config"] }), + }, + McpTool { + name: "iii_trigger_unregister".into(), + description: Some("Unregister a trigger".into()), + input_schema: json!({ "type": "object", "properties": { "id": { "type": "string" } }, "required": ["id"] }), + }, + McpTool { + name: "iii_trigger_void".into(), + description: Some("Fire-and-forget function invocation".into()), + input_schema: json!({ "type": "object", "properties": { "function_id": { "type": "string" }, "payload": { "type": "object" } }, "required": ["function_id", "payload"] }), + }, + McpTool { + name: "iii_trigger_enqueue".into(), + description: Some("Enqueue to named queue".into()), + input_schema: json!({ "type": "object", "properties": { "function_id": { "type": "string" }, "payload": { "type": "object" }, "queue": { "type": "string" } }, "required": ["function_id", "payload"] }), + }, + ] +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct McpTool { + name: String, + #[serde(skip_serializing_if = "Option::is_none")] + description: Option, + input_schema: Value, +} + +#[derive(Deserialize)] +struct CallParams { + name: String, + #[serde(default)] + arguments: Value, +} + +fn tool_result(text: &str) -> Value { + json!({ "content": [{ "type": "text", "text": text }], "isError": false }) +} +fn tool_error(msg: &str) -> Value { + json!({ "content": [{ "type": "text", "text": msg }], "isError": true }) +} +fn tool_json(v: &Value) -> Value { + tool_result(&serde_json::to_string_pretty(v).unwrap_or_else(|_| v.to_string())) +} +fn str_field(v: &Value, key: &str) -> String { + v.get(key) + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_string() +} +fn str_field_or(v: &Value, key: &str, default: &str) -> String { + v.get(key) + .and_then(|v| v.as_str()) + .unwrap_or(default) + .to_string() +} diff --git a/mcp/src/main.rs b/mcp/src/main.rs new file mode 100644 index 0000000..aa517c5 --- /dev/null +++ b/mcp/src/main.rs @@ -0,0 +1,67 @@ +mod handler; +mod prompts; +mod transport; +mod worker_manager; + +use std::sync::Arc; + +use clap::Parser; +use iii_sdk::{InitOptions, register_worker}; +use tracing_subscriber::{EnvFilter, fmt, prelude::*}; + +#[derive(Parser, Debug)] +#[command(name = "iii-mcp")] +#[command(version)] +#[command(about = "MCP protocol worker for iii-engine")] +struct Args { + #[arg(long, short = 'e', default_value = "ws://localhost:49134")] + engine_url: String, + + #[arg(long, short = 'd')] + debug: bool, + + #[arg(long, help = "Skip stdio, run as HTTP-only (POST /mcp on engine port)")] + no_stdio: bool, + + #[arg( + long, + help = "Expose all functions as tools (ignore mcp.expose metadata)" + )] + expose_all: bool, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let args = Args::parse(); + + let filter = if args.debug { + EnvFilter::new("iii_mcp=debug,iii_sdk=debug") + } else { + EnvFilter::new("iii_mcp=info,iii_sdk=warn") + }; + + tracing_subscriber::registry() + .with(fmt::layer().with_writer(std::io::stderr)) + .with(filter) + .init(); + + tracing::info!(version = env!("CARGO_PKG_VERSION"), "Starting iii-mcp"); + + let iii = register_worker(&args.engine_url, InitOptions::default()); + + handler::register_http(&iii, args.expose_all); + + if args.no_stdio { + tracing::info!("MCP HTTP-only mode. POST /mcp on engine port. Ctrl+C to stop."); + tokio::signal::ctrl_c().await?; + } else { + let h = Arc::new(handler::McpHandler::new( + iii, + args.engine_url, + args.expose_all, + )); + transport::run_stdio(h).await?; + } + + Ok(()) +} diff --git a/mcp/src/prompts.rs b/mcp/src/prompts.rs new file mode 100644 index 0000000..032b6df --- /dev/null +++ b/mcp/src/prompts.rs @@ -0,0 +1,82 @@ +use serde_json::{Value, json}; + +pub fn list() -> Value { + json!({ + "prompts": [ + { "name": "register-function", "description": "Guide to register a function", "arguments": [ + { "name": "language", "description": "node or python", "required": true }, + { "name": "function_id", "description": "e.g. myservice::process", "required": true } + ]}, + { "name": "build-api", "description": "Expose a function as HTTP endpoint", "arguments": [ + { "name": "method", "description": "GET, POST, PUT, DELETE", "required": true }, + { "name": "path", "description": "e.g. /users", "required": true } + ]}, + { "name": "setup-cron", "description": "Set up a scheduled cron job", "arguments": [ + { "name": "schedule", "description": "Cron expression", "required": true } + ]}, + { "name": "event-pipeline", "description": "Build an event-driven pipeline", "arguments": [] } + ] + }) +} + +fn require_arg<'a>(args: &'a Value, key: &str) -> Result<&'a str, Value> { + args.get(key) + .and_then(|v| v.as_str()) + .filter(|s| !s.is_empty()) + .ok_or_else(|| { + json!({ "messages": [{ "role": "user", "content": { "type": "text", "text": format!("Missing required argument: {key}") } }] }) + }) +} + +pub fn get(params: Option) -> Value { + let name = params + .as_ref() + .and_then(|p| p.get("name")) + .and_then(|v| v.as_str()) + .unwrap_or(""); + let args = params + .as_ref() + .and_then(|p| p.get("arguments")) + .cloned() + .unwrap_or(json!({})); + + let text = match name { + "register-function" => { + let lang = match require_arg(&args, "language") { + Ok(v) => v, + Err(e) => return e, + }; + let fid = match require_arg(&args, "function_id") { + Ok(v) => v, + Err(e) => return e, + }; + if lang == "python" { + format!("Register Python function `{fid}`:\n1. `iii_worker_register` with language='python'\n2. Code: `async def handler(input): ...`\n3. Wire trigger via `iii_trigger_register`\n\n```python\nfrom iii_sdk import register_worker, Logger\niii = register_worker('ws://localhost:49134')\niii.register_function('{fid}', handler)\n```") + } else { + format!("Register Node.js function `{fid}`:\n1. `iii_worker_register` with language='node'\n2. Code: `async (input) => {{ ... }}`\n3. Wire trigger via `iii_trigger_register`\n\n```js\nimport {{ registerWorker, Logger }} from 'iii-sdk'\nconst iii = registerWorker('ws://localhost:49134')\niii.registerFunction({{ id: '{fid}' }}, handler)\n```") + } + } + "build-api" => { + let method = match require_arg(&args, "method") { + Ok(v) => v, + Err(e) => return e, + }; + let path = match require_arg(&args, "path") { + Ok(v) => v, + Err(e) => return e, + }; + format!("Expose HTTP {method} {path}:\n1. Register function\n2. Register trigger:\n```json\n{{ \"trigger_type\": \"http\", \"function_id\": \"api::handler\", \"config\": {{ \"api_path\": \"{path}\", \"http_method\": \"{method}\" }} }}\n```\n3. Input: {{ body, query_params, path_params, headers }}\n4. Return: {{ status_code, headers, body }}") + } + "setup-cron" => { + let schedule = match require_arg(&args, "schedule") { + Ok(v) => v, + Err(e) => return e, + }; + format!("Cron `{schedule}`:\n1. Register function\n2. Register trigger:\n```json\n{{ \"trigger_type\": \"cron\", \"function_id\": \"jobs::task\", \"config\": {{ \"expression\": \"{schedule}\" }} }}\n```\n3. `iii_trigger_unregister` to stop") + } + "event-pipeline" => "Event pipeline:\n```\nHTTP \u{2192} Fn A \u{2192} emit('order.created') \u{2192} Fn B \u{2192} emit('done') \u{2192} Fn C\n```\n1. Register functions per stage\n2. Wire HTTP trigger to entry\n3. Queue triggers: `{ \"trigger_type\": \"queue\", \"config\": { \"topic\": \"order.created\" } }`\n4. `iii_trigger_enqueue` for async, `iii_trigger_void` for fire-and-forget".to_string(), + _ => format!("Unknown prompt: {name}"), + }; + + json!({ "messages": [{ "role": "user", "content": { "type": "text", "text": text } }] }) +} diff --git a/mcp/src/transport.rs b/mcp/src/transport.rs new file mode 100644 index 0000000..e92bf91 --- /dev/null +++ b/mcp/src/transport.rs @@ -0,0 +1,73 @@ +use std::sync::Arc; + +use serde_json::Value; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; + +use crate::handler::{JsonRpcResponse, McpHandler}; + +pub async fn run_stdio(handler: Arc) -> anyhow::Result<()> { + let stdin = tokio::io::stdin(); + let stdout = tokio::io::stdout(); + let mut reader = BufReader::new(stdin); + let mut writer = BufWriter::new(stdout); + + tracing::info!("stdio transport started"); + + loop { + while let Some(notification) = handler.take_notification().await { + writer.write_all(notification.as_bytes()).await?; + writer.write_all(b"\n").await?; + writer.flush().await?; + } + + let mut line = String::new(); + match reader.read_line(&mut line).await { + Ok(0) => { + tracing::info!("stdin closed"); + break; + } + Ok(_) => { + let line = line.trim(); + if line.is_empty() { + continue; + } + + let body: Value = match serde_json::from_str(line) { + Ok(v) => v, + Err(err) => { + tracing::warn!(error = %err, "Parse error"); + let r = + JsonRpcResponse::error(None, -32700, format!("Parse error: {}", err)); + if let Ok(v) = serde_json::to_value(&r) { + write_json(&mut writer, &v).await?; + } + continue; + } + }; + + let Some(response) = handler.handle(body).await else { + continue; + }; + + write_json(&mut writer, &response).await?; + } + Err(err) => { + tracing::error!(error = %err, "stdin read error"); + break; + } + } + } + + Ok(()) +} + +async fn write_json( + writer: &mut BufWriter, + v: &Value, +) -> anyhow::Result<()> { + let json = serde_json::to_string(v)?; + writer.write_all(json.as_bytes()).await?; + writer.write_all(b"\n").await?; + writer.flush().await?; + Ok(()) +} diff --git a/mcp/src/worker_manager.rs b/mcp/src/worker_manager.rs new file mode 100644 index 0000000..b78c913 --- /dev/null +++ b/mcp/src/worker_manager.rs @@ -0,0 +1,272 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::path::PathBuf; +use std::process::Stdio; +use std::sync::Arc; +use tokio::process::{Child, Command}; +use tokio::sync::Mutex; +use uuid::Uuid; + +#[derive(Debug, Clone, Deserialize)] +pub struct WorkerCreateParams { + pub language: String, + pub code: String, + pub function_name: String, + pub description: Option, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct WorkerStopParams { + pub id: String, +} + +#[derive(Debug, Clone, Serialize)] +pub struct SpawnedWorker { + pub id: String, + pub language: String, + pub function_name: String, + pub temp_dir: String, + pub pid: u32, +} + +#[derive(Debug, Clone, Serialize)] +pub struct WorkerCreateResult { + pub id: String, + pub function_name: String, + pub message: String, +} + +#[derive(Debug, Clone, Serialize)] +pub struct WorkerStopResult { + pub id: String, + pub message: String, +} + +fn js_string_literal(s: &str) -> String { + serde_json::to_string(s).unwrap_or_else(|_| "\"\"".to_string()) +} + +pub struct WorkerManager { + engine_url: String, + workers: Arc>>, +} + +impl WorkerManager { + pub fn new(engine_url: String) -> Self { + Self { + engine_url, + workers: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub async fn create_worker( + &self, + params: WorkerCreateParams, + ) -> Result { + let uuid = Uuid::new_v4().to_string(); + let worker_id = format!("worker-{}", &uuid[..8]); + + let temp_dir = std::env::temp_dir().join(format!("iii-{}", &worker_id)); + tokio::fs::create_dir_all(&temp_dir) + .await + .map_err(|e| format!("Failed to create temp dir: {}", e))?; + + let (file_name, code) = match params.language.as_str() { + "node" | "javascript" | "js" => { + let code = self.generate_node_worker(¶ms); + ("index.mjs", code) + } + "python" | "py" => { + let code = self.generate_python_worker(¶ms); + ("main.py", code) + } + _ => return Err(format!("Unsupported language: {}", params.language)), + }; + + let file_path = temp_dir.join(file_name); + tokio::fs::write(&file_path, &code) + .await + .map_err(|e| format!("Failed to write worker file: {}", e))?; + + let mut child = match self + .spawn_worker(¶ms.language, &temp_dir, file_name) + .await + { + Ok(c) => c, + Err(e) => { + let _ = tokio::fs::remove_dir_all(&temp_dir).await; + return Err(e); + } + }; + + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + match child.try_wait() { + Ok(Some(status)) => { + let _ = tokio::fs::remove_dir_all(&temp_dir).await; + return Err(format!( + "Worker exited immediately with status: {}", + status + )); + } + Err(e) => { + let _ = tokio::fs::remove_dir_all(&temp_dir).await; + return Err(format!("Failed to check worker status: {}", e)); + } + Ok(None) => {} + } + + let pid = child.id().unwrap_or(0); + + let spawned = SpawnedWorker { + id: worker_id.clone(), + language: params.language.clone(), + function_name: params.function_name.clone(), + temp_dir: temp_dir.to_string_lossy().to_string(), + pid, + }; + + self.workers + .lock() + .await + .insert(worker_id.clone(), (spawned, child)); + + tracing::info!( + worker_id = %worker_id, + function_name = %params.function_name, + pid = %pid, + "Spawned worker" + ); + + Ok(WorkerCreateResult { + id: worker_id, + function_name: params.function_name, + message: "Worker created and connecting to iii-engine".to_string(), + }) + } + + pub async fn stop_worker(&self, params: WorkerStopParams) -> Result { + let mut workers = self.workers.lock().await; + + if let Some((info, mut child)) = workers.remove(¶ms.id) { + if let Err(e) = child.kill().await { + tracing::warn!(worker_id = %params.id, error = %e, "Failed to kill worker process"); + } + + if let Err(e) = tokio::fs::remove_dir_all(&info.temp_dir).await { + tracing::warn!(worker_id = %params.id, error = %e, "Failed to remove temp dir"); + } + + tracing::info!(worker_id = %params.id, "Stopped worker"); + + Ok(WorkerStopResult { + id: params.id, + message: "Worker stopped and cleaned up".to_string(), + }) + } else { + Err(format!("Worker not found: {}", params.id)) + } + } + + fn generate_node_worker(&self, params: &WorkerCreateParams) -> String { + let engine_url = js_string_literal(&self.engine_url); + let function_name = js_string_literal(¶ms.function_name); + let description = js_string_literal( + params + .description + .as_deref() + .unwrap_or("Auto-generated function"), + ); + + format!( + r#"import {{ registerWorker, Logger }} from 'iii-sdk' + +const iii = registerWorker({engine_url}) +const logger = new Logger() + +const handler = {code} + +iii.registerFunction({{ id: {function_name}, description: {description}, metadata: {{ "mcp.expose": true }} }}, handler) + +logger.info('Function registered: ' + {function_name}) + +process.on('SIGTERM', () => {{ + logger.info('Worker shutting down') + process.exit(0) +}}) +process.on('SIGINT', () => {{ + logger.info('Worker interrupted') + process.exit(0) +}}) +"#, + engine_url = engine_url, + code = params.code, + function_name = function_name, + description = description, + ) + } + + fn generate_python_worker(&self, params: &WorkerCreateParams) -> String { + let engine_url = js_string_literal(&self.engine_url); + let function_name = js_string_literal(¶ms.function_name); + let description = js_string_literal( + params + .description + .as_deref() + .unwrap_or("Auto-generated function"), + ); + + format!( + r#"import asyncio +import signal +from iii_sdk import register_worker, Logger + +iii = register_worker({engine_url}) +logger = Logger() + +{code} + +iii.register_function({function_name}, handler, description={description}, metadata={{"mcp.expose": True}}) + +def shutdown(sig, frame): + logger.info('Worker shutting down') + exit(0) + +signal.signal(signal.SIGTERM, shutdown) +signal.signal(signal.SIGINT, shutdown) + +async def main(): + logger.info('Function registered: ' + {function_name}) + while True: + await asyncio.sleep(1) + +asyncio.run(main()) +"#, + engine_url = engine_url, + code = params.code, + function_name = function_name, + description = description, + ) + } + + async fn spawn_worker( + &self, + language: &str, + temp_dir: &PathBuf, + file_name: &str, + ) -> Result { + let cmd = match language { + "node" | "javascript" | "js" => "node", + "python" | "py" => "python3", + _ => return Err(format!("Unsupported language: {}", language)), + }; + + Command::new(cmd) + .arg(temp_dir.join(file_name)) + .current_dir(temp_dir) + .stdout(Stdio::null()) + .stderr(Stdio::piped()) + .kill_on_drop(true) + .spawn() + .map_err(|e| format!("Failed to spawn {} process: {}", cmd, e)) + } +} diff --git a/registry/index.json b/registry/index.json index e152a66..206d0f7 100644 --- a/registry/index.json +++ b/registry/index.json @@ -20,6 +20,30 @@ } }, "version": "0.1.2" + }, + "mcp": { + "description": "MCP protocol worker — expose iii-engine functions as MCP tools (stdio + Streamable HTTP)", + "repo": "iii-hq/workers", + "tag_prefix": "mcp", + "supported_targets": ["aarch64-apple-darwin", "x86_64-unknown-linux-gnu"], + "has_checksum": true, + "default_config": { + "engine_url": "ws://localhost:49134", + "expose_all": false + }, + "version": "0.3.0" + }, + "a2a": { + "description": "A2A protocol worker — expose iii-engine functions as A2A skills (agent card + JSON-RPC)", + "repo": "iii-hq/workers", + "tag_prefix": "a2a", + "supported_targets": ["aarch64-apple-darwin", "x86_64-unknown-linux-gnu"], + "has_checksum": true, + "default_config": { + "engine_url": "ws://localhost:49134", + "expose_all": false + }, + "version": "0.3.0" } } }