diff --git a/src/agent-types.ts b/src/agent-types.ts index 116e37d..8a69215 100644 --- a/src/agent-types.ts +++ b/src/agent-types.ts @@ -26,21 +26,21 @@ export namespace N8nMessage { export interface JobSettings { type: 'n8n:jobsettings'; - jobId: Job['id']; + jobId: string; settings: unknown; } export interface RPCResponse { type: 'n8n:rpcresponse'; callId: string; - jobId: Job['id']; + jobId: string; status: 'success' | 'error'; data: unknown; } export interface JobDataResponse { type: 'n8n:jobdataresponse'; - jobId: Job['id']; + jobId: string; requestId: string; data: unknown; } @@ -58,25 +58,25 @@ export namespace N8nMessage { export namespace ToWorker { export interface JobReady { type: 'n8n:jobready'; - requestId: JobRequest['requestId']; - jobId: Job['id']; + requestId: string; + jobId: string; } export interface JobDone { type: 'n8n:jobdone'; - jobId: Job['id']; + jobId: string; data: INodeExecutionData[]; } export interface JobError { type: 'n8n:joberror'; - jobId: Job['id']; + jobId: string; error: unknown; } export interface JobDataRequest { type: 'n8n:jobdatarequest'; - jobId: Job['id']; + jobId: string; requestId: string; requestType: DataRequestType; param?: string; @@ -85,8 +85,8 @@ export namespace N8nMessage { export interface RPC { type: 'n8n:rpc'; callId: string; - jobId: Job['id']; - name: string; + jobId: string; + name: (typeof RPC_ALLOW_LIST)[number]; params: unknown[]; } @@ -98,25 +98,26 @@ export namespace WorkerMessage { export namespace ToN8n { export interface JobSettings { type: 'worker:jobsettings'; - jobId: Job['id']; + jobId: string; settings: unknown; } export interface JobCancel { type: 'worker:jobcancel'; - jobId: Job['id']; + jobId: string; reason: string; } export interface JobDataResponse { type: 'worker:jobdataresponse'; - jobId: Job['id']; + jobId: string; requestId: string; data: unknown; } export interface RPCResponse { type: 'worker:rpcresponse'; + jobId: string; callId: string; status: 'success' | 'error'; data: unknown; @@ -172,7 +173,7 @@ export namespace AgentMessage { export interface JobDataRequest { type: 'agent:jobdatarequest'; - jobId: Job['id']; + jobId: string; requestId: string; requestType: DataRequestType; param?: string; @@ -181,8 +182,8 @@ export namespace AgentMessage { export interface RPC { type: 'agent:rpc'; callId: string; - jobId: Job['id']; - name: string; + jobId: string; + name: (typeof RPC_ALLOW_LIST)[number]; params: unknown[]; } @@ -198,32 +199,27 @@ export namespace AgentMessage { } } -export interface Agent { - id: string; - name?: string; - jobTypes: string[]; - lastSeen: Date; -} - -export interface Job { - id: string; - agentId: Agent['id']; - workerId: string; - jobType: string; -} - -export interface JobOffer { - offerId: string; - agentId: Agent['id']; - jobType: string; - validFor: number; - validUntil: bigint; -} - -export interface JobRequest { - requestId: string; - workerId: string; - jobType: string; - - acceptInProgress?: boolean; -} +export const RPC_ALLOW_LIST = [ + 'helpers.httpRequestWithAuthentication', + 'helpers.requestWithAuthenticationPaginated', + // "helpers.normalizeItems" + // "helpers.constructExecutionMetaData" + // "helpers.assertBinaryData" + 'helpers.getBinaryDataBuffer', + // "helpers.copyInputItems" + // "helpers.returnJsonArray" + 'helpers.getSSHClient', + 'helpers.createReadStream', + // "helpers.getStoragePath" + 'helpers.writeContentToFile', + 'helpers.prepareBinaryData', + 'helpers.setBinaryDataBuffer', + 'helpers.copyBinaryFile', + 'helpers.binaryToBuffer', + // "helpers.binaryToString" + // "helpers.getBinaryPath" + 'helpers.getBinaryStream', + 'helpers.getBinaryMetadata', + 'helpers.createDeferredPromise', + 'helpers.httpRequest', +] as const; diff --git a/src/index.ts b/src/index.ts index fb71ff6..5ca0746 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,24 +3,24 @@ import { runInNewContext, type Context } from 'node:vm'; import { type MessageEvent, WebSocket } from 'ws'; import { nanoid } from 'nanoid'; import { - INode, - INodeType, - ITaskDataConnections, + type INode, + type INodeType, + type ITaskDataConnections, WorkflowDataProxy, - WorkflowParameters, + type WorkflowParameters, } from 'n8n-workflow'; import { - IDataObject, - IExecuteData, - INodeExecutionData, - INodeParameters, - IRunExecutionData, - IWorkflowDataProxyAdditionalKeys, + type IDataObject, + type IExecuteData, + type INodeExecutionData, + type INodeParameters, + type IRunExecutionData, + type IWorkflowDataProxyAdditionalKeys, Workflow, - WorkflowExecuteMode, + type WorkflowExecuteMode, } from 'n8n-workflow'; -import type { AgentMessage, N8nMessage } from './agent-types'; +import { RPC_ALLOW_LIST, type AgentMessage, type N8nMessage } from './agent-types'; interface AgentJob { jobId: string; @@ -40,6 +40,12 @@ interface DataRequest { reject: (error: unknown) => void; } +interface RPCCall { + callId: string; + resolve: (data: unknown) => void; + reject: (error: unknown) => void; +} + const VALID_TIME_MS = 1000; const VALID_EXTRA_MS = 100; @@ -58,6 +64,8 @@ class Agent { dataRequests: Record = {}; + rpcCalls: Record = {}; + constructor( public jobType: string, private wsUrl: string, @@ -114,7 +122,6 @@ class Agent { process.hrtime.bigint() + BigInt((VALID_TIME_MS + VALID_EXTRA_MS) * 1_000_000), // Adding a little extra time to account for latency }; this.openOffers[offer.offerId] = offer; - // console.log('Offering job:', offer.offerId); this.send({ type: 'agent:joboffer', jobType: this.jobType, @@ -154,6 +161,8 @@ class Agent { case 'n8n:jobdataresponse': this.processDataResponse(message.requestId, message.data); break; + case 'n8n:rpcresponse': + this.handleRpcResponse(message.callId, message.status, message.data); } } @@ -290,6 +299,67 @@ class Agent { return p as T; } + + async makeRpcCall(jobId: string, name: AgentMessage.ToN8n.RPC['name'], params: unknown[]) { + const callId = nanoid(); + + const dataPromise = new Promise((resolve, reject) => { + this.rpcCalls[callId] = { + callId, + resolve, + reject, + }; + }); + + this.send({ + type: 'agent:rpc', + callId, + jobId, + name, + params, + }); + + try { + return await dataPromise; + } finally { + delete this.rpcCalls[callId]; + } + } + + handleRpcResponse( + callId: string, + status: N8nMessage.ToAgent.RPCResponse['status'], + data: unknown, + ) { + const call = this.rpcCalls[callId]; + if (!call) { + return; + } + if (status === 'success') { + call.resolve(data); + } else { + call.reject(typeof data === 'string' ? new Error(data) : data); + } + } + + buildRpcCallObject(jobId: string) { + const rpcObject: any = {}; + for (const r of RPC_ALLOW_LIST) { + const splitPath = r.split('.'); + let obj = rpcObject; + + splitPath.forEach((s, index) => { + if (index !== splitPath.length - 1) { + obj[s] = {}; + obj = obj[s]; + return; + } + // eslint-disable-next-line + obj[s] = (...args: unknown[]) => this.makeRpcCall(jobId, r, args); + }); + } + return rpcObject; + } } interface JSExecSettings { @@ -321,18 +391,15 @@ const getAdditionalKeys = (): IWorkflowDataProxyAdditionalKeys => { return {}; }; -class TestAgent extends Agent { +class JsAgent extends Agent { constructor(jobType: string, wsUrl: string, maxConcurrency: number, name?: string) { super(jobType, wsUrl, maxConcurrency, name ?? 'Test Agent'); } async executeJob(job: AgentJob): Promise { - console.log('Executing: ', job); - const allData = await this.requestData(job.jobId, 'all'); const settings = job.settings!; - console.log(allData, settings); const workflowParams = allData.workflow; const workflow = new Workflow({ @@ -371,6 +438,7 @@ class TestAgent extends Agent { module: {}, ...dataProxy.getDataProxy(), + ...this.buildRpcCallObject(job.jobId), }; const result = (await runInNewContext( @@ -382,4 +450,4 @@ class TestAgent extends Agent { } } -new TestAgent('javascript', 'ws://localhost:5678/rest/agents/_ws', 5); +new JsAgent('javascript', 'ws://localhost:5678/rest/agents/_ws', 5);