diff --git a/src/agent-types.ts b/src/agent-types.ts deleted file mode 100644 index 8a69215..0000000 --- a/src/agent-types.ts +++ /dev/null @@ -1,225 +0,0 @@ -import type { INodeExecutionData } from 'n8n-workflow'; - -export type DataRequestType = 'input' | 'node' | 'all'; - -export namespace N8nMessage { - export namespace ToAgent { - export interface InfoRequest { - type: 'n8n:inforequest'; - } - - export interface AgentRegistered { - type: 'n8n:agentregistered'; - } - - export interface JobOfferAccept { - type: 'n8n:jobofferaccept'; - jobId: string; - offerId: string; - } - - export interface JobCancel { - type: 'n8n:jobcancel'; - jobId: string; - reason: string; - } - - export interface JobSettings { - type: 'n8n:jobsettings'; - jobId: string; - settings: unknown; - } - - export interface RPCResponse { - type: 'n8n:rpcresponse'; - callId: string; - jobId: string; - status: 'success' | 'error'; - data: unknown; - } - - export interface JobDataResponse { - type: 'n8n:jobdataresponse'; - jobId: string; - requestId: string; - data: unknown; - } - - export type All = - | InfoRequest - | JobOfferAccept - | JobCancel - | JobSettings - | AgentRegistered - | RPCResponse - | JobDataResponse; - } - - export namespace ToWorker { - export interface JobReady { - type: 'n8n:jobready'; - requestId: string; - jobId: string; - } - - export interface JobDone { - type: 'n8n:jobdone'; - jobId: string; - data: INodeExecutionData[]; - } - - export interface JobError { - type: 'n8n:joberror'; - jobId: string; - error: unknown; - } - - export interface JobDataRequest { - type: 'n8n:jobdatarequest'; - jobId: string; - requestId: string; - requestType: DataRequestType; - param?: string; - } - - export interface RPC { - type: 'n8n:rpc'; - callId: string; - jobId: string; - name: (typeof RPC_ALLOW_LIST)[number]; - params: unknown[]; - } - - export type All = JobReady | JobDone | JobError | JobDataRequest | RPC; - } -} - -export namespace WorkerMessage { - export namespace ToN8n { - export interface JobSettings { - type: 'worker:jobsettings'; - jobId: string; - settings: unknown; - } - - export interface JobCancel { - type: 'worker:jobcancel'; - jobId: string; - reason: string; - } - - export interface JobDataResponse { - type: 'worker:jobdataresponse'; - jobId: string; - requestId: string; - data: unknown; - } - - export interface RPCResponse { - type: 'worker:rpcresponse'; - jobId: string; - callId: string; - status: 'success' | 'error'; - data: unknown; - } - - export interface JobRequest { - type: 'worker:jobrequest'; - requestId: string; - jobType: string; - } - - export type All = JobSettings | JobCancel | RPCResponse | JobDataResponse | JobRequest; - } -} - -export namespace AgentMessage { - export namespace ToN8n { - export interface Info { - type: 'agent:info'; - name: string; - types: string[]; - } - - export interface JobAccepted { - type: 'agent:jobaccepted'; - jobId: string; - } - - export interface JobRejected { - type: 'agent:jobrejected'; - jobId: string; - reason: string; - } - - export interface JobDone { - type: 'agent:jobdone'; - jobId: string; - data: INodeExecutionData[]; - } - - export interface JobError { - type: 'agent:joberror'; - jobId: string; - error: unknown; - } - - export interface JobOffer { - type: 'agent:joboffer'; - offerId: string; - jobType: string; - validFor: number; - } - - export interface JobDataRequest { - type: 'agent:jobdatarequest'; - jobId: string; - requestId: string; - requestType: DataRequestType; - param?: string; - } - - export interface RPC { - type: 'agent:rpc'; - callId: string; - jobId: string; - name: (typeof RPC_ALLOW_LIST)[number]; - params: unknown[]; - } - - export type All = - | Info - | JobDone - | JobError - | JobAccepted - | JobRejected - | JobOffer - | RPC - | JobDataRequest; - } -} - -export const RPC_ALLOW_LIST = [ - 'helpers.httpRequestWithAuthentication', - 'helpers.requestWithAuthenticationPaginated', - // "helpers.normalizeItems" - // "helpers.constructExecutionMetaData" - // "helpers.assertBinaryData" - 'helpers.getBinaryDataBuffer', - // "helpers.copyInputItems" - // "helpers.returnJsonArray" - 'helpers.getSSHClient', - 'helpers.createReadStream', - // "helpers.getStoragePath" - 'helpers.writeContentToFile', - 'helpers.prepareBinaryData', - 'helpers.setBinaryDataBuffer', - 'helpers.copyBinaryFile', - 'helpers.binaryToBuffer', - // "helpers.binaryToString" - // "helpers.getBinaryPath" - 'helpers.getBinaryStream', - 'helpers.getBinaryMetadata', - 'helpers.createDeferredPromise', - 'helpers.httpRequest', -] as const; diff --git a/src/index.ts b/src/index.ts index 5ca0746..298ecd8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -20,16 +20,16 @@ import { type WorkflowExecuteMode, } from 'n8n-workflow'; -import { RPC_ALLOW_LIST, type AgentMessage, type N8nMessage } from './agent-types'; +import { RPC_ALLOW_LIST, type RunnerMessage, type BrokerMessage } from './runner-types'; -interface AgentJob { - jobId: string; +interface Task { + taskId: string; settings?: T; active: boolean; cancelled: boolean; } -interface JobOffer { +interface TaskOffer { offerId: string; validUntil: bigint; } @@ -49,25 +49,25 @@ interface RPCCall { const VALID_TIME_MS = 1000; const VALID_EXTRA_MS = 100; -class Agent { +class TaskRunner { id: string; ws: WebSocket; canSendOffers = false; - runningJobs: Record = {}; + runningTasks: Record = {}; offerInterval: NodeJS.Timeout | undefined; - openOffers: Record = {}; + openOffers: Record = {}; dataRequests: Record = {}; rpcCalls: Record = {}; constructor( - public jobType: string, + public taskType: string, private wsUrl: string, private maxConcurrency: number, public name?: string, @@ -75,15 +75,15 @@ class Agent { this.id = nanoid(); this.ws = new WebSocket(this.wsUrl + '?id=' + this.id); this.ws.addEventListener('message', this._wsMessage); - this.ws.addEventListener('close', this.stopJobOffers); + this.ws.addEventListener('close', this.stopTaskOffers); } private _wsMessage = (message: MessageEvent) => { - const data = JSON.parse(message.data as string) as N8nMessage.ToAgent.All; + const data = JSON.parse(message.data as string) as BrokerMessage.ToRunner.All; void this.onMessage(data); }; - private stopJobOffers = () => { + private stopTaskOffers = () => { this.canSendOffers = false; if (this.offerInterval) { clearInterval(this.offerInterval); @@ -91,7 +91,7 @@ class Agent { } }; - private startJobOffers() { + private startTaskOffers() { this.canSendOffers = true; if (this.offerInterval) { clearInterval(this.offerInterval); @@ -112,19 +112,19 @@ class Agent { const offersToSend = this.maxConcurrency - - (Object.values(this.openOffers).length + Object.values(this.runningJobs).length); + (Object.values(this.openOffers).length + Object.values(this.runningTasks).length); if (offersToSend > 0) { for (let i = 0; i < offersToSend; i++) { - const offer: JobOffer = { + const offer: TaskOffer = { offerId: nanoid(), validUntil: process.hrtime.bigint() + BigInt((VALID_TIME_MS + VALID_EXTRA_MS) * 1_000_000), // Adding a little extra time to account for latency }; this.openOffers[offer.offerId] = offer; this.send({ - type: 'agent:joboffer', - jobType: this.jobType, + type: 'runner:taskoffer', + taskType: this.taskType, offerId: offer.offerId, validFor: VALID_TIME_MS, }); @@ -132,36 +132,36 @@ class Agent { } } - send(message: AgentMessage.ToN8n.All) { + send(message: RunnerMessage.ToBroker.All) { this.ws.send(JSON.stringify(message)); } - onMessage(message: N8nMessage.ToAgent.All) { + onMessage(message: BrokerMessage.ToRunner.All) { console.log({ message }); switch (message.type) { - case 'n8n:inforequest': + case 'broker:inforequest': this.send({ - type: 'agent:info', - name: this.name ?? 'Node.js Agent SDK', - types: [this.jobType], + type: 'runner:info', + name: this.name ?? 'Node.js Task Runner SDK', + types: [this.taskType], }); break; - case 'n8n:agentregistered': - this.startJobOffers(); + case 'broker:runnerregistered': + this.startTaskOffers(); break; - case 'n8n:jobofferaccept': - this.offerAccepted(message.offerId, message.jobId); + case 'broker:taskofferaccept': + this.offerAccepted(message.offerId, message.taskId); break; - case 'n8n:jobcancel': - this.jobCancelled(message.jobId); + case 'broker:taskcancel': + this.taskCancelled(message.taskId); break; - case 'n8n:jobsettings': - void this.receivedSettings(message.jobId, message.settings); + case 'broker:tasksettings': + void this.receivedSettings(message.taskId, message.settings); break; - case 'n8n:jobdataresponse': + case 'broker:taskdataresponse': this.processDataResponse(message.requestId, message.data); break; - case 'n8n:rpcresponse': + case 'broker:rpcresponse': this.handleRpcResponse(message.callId, message.status, message.data); } } @@ -175,26 +175,26 @@ class Agent { request.resolve(data); } - hasOpenJobs() { - return Object.values(this.runningJobs).length < this.maxConcurrency; + hasOpenTasks() { + return Object.values(this.runningTasks).length < this.maxConcurrency; } - offerAccepted(offerId: string, jobId: string) { - if (!this.hasOpenJobs()) { + offerAccepted(offerId: string, taskId: string) { + if (!this.hasOpenTasks()) { this.send({ - type: 'agent:jobrejected', - jobId, - reason: 'No open job slots', + type: 'runner:taskrejected', + taskId, + reason: 'No open task slots', }); return; } const offer = this.openOffers[offerId]; if (!offer) { - if (!this.hasOpenJobs()) { + if (!this.hasOpenTasks()) { this.send({ - type: 'agent:jobrejected', - jobId, - reason: 'Offer expired and no open job slots', + type: 'runner:taskrejected', + taskId, + reason: 'Offer expired and no open task slots', }); return; } @@ -202,81 +202,81 @@ class Agent { delete this.openOffers[offerId]; } - this.runningJobs[jobId] = { - jobId, + this.runningTasks[taskId] = { + taskId, active: false, cancelled: false, }; this.send({ - type: 'agent:jobaccepted', - jobId, + type: 'runner:taskaccepted', + taskId, }); } - jobCancelled(jobId: string) { - const job = this.runningJobs[jobId]; - if (!job) { + taskCancelled(taskId: string) { + const task = this.runningTasks[taskId]; + if (!task) { return; } - job.cancelled = true; - if (job.active) { + task.cancelled = true; + if (task.active) { // TODO } else { - delete this.runningJobs[jobId]; + delete this.runningTasks[taskId]; } this.sendOffers(); } - jobErrored(jobId: string, error: unknown) { + taskErrored(taskId: string, error: unknown) { this.send({ - type: 'agent:joberror', - jobId, + type: 'runner:taskerror', + taskId, error, }); - delete this.runningJobs[jobId]; + delete this.runningTasks[taskId]; } - jobDone(jobId: string, data: AgentMessage.ToN8n.JobDone['data']) { + taskDone(taskId: string, data: RunnerMessage.ToBroker.TaskDone['data']) { this.send({ - type: 'agent:jobdone', - jobId, + type: 'runner:taskdone', + taskId, data, }); - delete this.runningJobs[jobId]; + delete this.runningTasks[taskId]; } - async receivedSettings(jobId: string, settings: unknown) { - const job = this.runningJobs[jobId]; - if (!job) { + async receivedSettings(taskId: string, settings: unknown) { + const task = this.runningTasks[taskId]; + if (!task) { return; } - if (job.cancelled) { - delete this.runningJobs[jobId]; + if (task.cancelled) { + delete this.runningTasks[taskId]; return; } - job.settings = settings; - job.active = true; + task.settings = settings; + task.active = true; try { - const data = await this.executeJob(job); - this.jobDone(jobId, data); + const data = await this.executeTask(task); + this.taskDone(taskId, data); } catch (e) { if ('message' in (e as Error)) { - this.jobErrored(jobId, (e as Error).message); + this.taskErrored(taskId, (e as Error).message); } else { - this.jobErrored(jobId, e); + this.taskErrored(taskId, e); } } } // eslint-disable-next-line @typescript-eslint/no-unused-vars - async executeJob(job: AgentJob): Promise { + async executeTask(task: Task): Promise { throw new Error('Unimplemented'); } async requestData( - jobId: AgentJob['jobId'], - type: AgentMessage.ToN8n.JobDataRequest['requestType'], + taskId: Task['taskId'], + type: RunnerMessage.ToBroker.TaskDataRequest['requestType'], param?: string, ): Promise { const requestId = nanoid(); @@ -290,8 +290,8 @@ class Agent { }); this.send({ - type: 'agent:jobdatarequest', - jobId, + type: 'runner:taskdatarequest', + taskId, requestId, requestType: type, param, @@ -300,7 +300,7 @@ class Agent { return p as T; } - async makeRpcCall(jobId: string, name: AgentMessage.ToN8n.RPC['name'], params: unknown[]) { + async makeRpcCall(taskId: string, name: RunnerMessage.ToBroker.RPC['name'], params: unknown[]) { const callId = nanoid(); const dataPromise = new Promise((resolve, reject) => { @@ -312,9 +312,9 @@ class Agent { }); this.send({ - type: 'agent:rpc', + type: 'runner:rpc', callId, - jobId, + taskId, name, params, }); @@ -328,7 +328,7 @@ class Agent { handleRpcResponse( callId: string, - status: N8nMessage.ToAgent.RPCResponse['status'], + status: BrokerMessage.ToRunner.RPCResponse['status'], data: unknown, ) { const call = this.rpcCalls[callId]; @@ -342,7 +342,7 @@ class Agent { } } - buildRpcCallObject(jobId: string) { + buildRpcCallObject(taskId: string) { const rpcObject: any = {}; for (const r of RPC_ALLOW_LIST) { const splitPath = r.split('.'); @@ -355,7 +355,7 @@ class Agent { return; } // eslint-disable-next-line - obj[s] = (...args: unknown[]) => this.makeRpcCall(jobId, r, args); + obj[s] = (...args: unknown[]) => this.makeRpcCall(taskId, r, args); }); } return rpcObject; @@ -391,15 +391,15 @@ const getAdditionalKeys = (): IWorkflowDataProxyAdditionalKeys => { return {}; }; -class JsAgent extends Agent { - constructor(jobType: string, wsUrl: string, maxConcurrency: number, name?: string) { - super(jobType, wsUrl, maxConcurrency, name ?? 'Test Agent'); +class JsTaskRunner extends TaskRunner { + constructor(taskType: string, wsUrl: string, maxConcurrency: number, name?: string) { + super(taskType, wsUrl, maxConcurrency, name ?? 'Test Runner'); } - async executeJob(job: AgentJob): Promise { - const allData = await this.requestData(job.jobId, 'all'); + async executeTask(task: Task): Promise { + const allData = await this.requestData(task.taskId, 'all'); - const settings = job.settings!; + const settings = task.settings!; const workflowParams = allData.workflow; const workflow = new Workflow({ @@ -438,16 +438,16 @@ class JsAgent extends Agent { module: {}, ...dataProxy.getDataProxy(), - ...this.buildRpcCallObject(job.jobId), + ...this.buildRpcCallObject(task.taskId), }; const result = (await runInNewContext( - `module.exports = async function() {${job.settings!.code}\n}()`, + `module.exports = async function() {${task.settings!.code}\n}()`, context, - )) as AgentMessage.ToN8n.JobDone['data']; + )) as RunnerMessage.ToBroker.TaskDone['data']; return result; } } -new JsAgent('javascript', 'ws://localhost:5678/rest/agents/_ws', 5); +new JsTaskRunner('javascript', 'ws://localhost:5678/rest/runners/_ws', 5); diff --git a/src/runner-types.ts b/src/runner-types.ts new file mode 100644 index 0000000..39b80e0 --- /dev/null +++ b/src/runner-types.ts @@ -0,0 +1,225 @@ +import type { INodeExecutionData } from 'n8n-workflow'; + +export type DataRequestType = 'input' | 'node' | 'all'; + +export namespace BrokerMessage { + export namespace ToRunner { + export interface InfoRequest { + type: 'broker:inforequest'; + } + + export interface RunnerRegistered { + type: 'broker:runnerregistered'; + } + + export interface TaskOfferAccept { + type: 'broker:taskofferaccept'; + taskId: string; + offerId: string; + } + + export interface TaskCancel { + type: 'broker:taskcancel'; + taskId: string; + reason: string; + } + + export interface TaskSettings { + type: 'broker:tasksettings'; + taskId: string; + settings: unknown; + } + + export interface RPCResponse { + type: 'broker:rpcresponse'; + callId: string; + taskId: string; + status: 'success' | 'error'; + data: unknown; + } + + export interface TaskDataResponse { + type: 'broker:taskdataresponse'; + taskId: string; + requestId: string; + data: unknown; + } + + export type All = + | InfoRequest + | TaskOfferAccept + | TaskCancel + | TaskSettings + | RunnerRegistered + | RPCResponse + | TaskDataResponse; + } + + export namespace ToRequester { + export interface TaskReady { + type: 'broker:taskready'; + requestId: string; + taskId: string; + } + + export interface TaskDone { + type: 'broker:taskdone'; + taskId: string; + data: INodeExecutionData[]; + } + + export interface TaskError { + type: 'broker:taskerror'; + taskId: string; + error: unknown; + } + + export interface TaskDataRequest { + type: 'broker:taskdatarequest'; + taskId: string; + requestId: string; + requestType: DataRequestType; + param?: string; + } + + export interface RPC { + type: 'broker:rpc'; + callId: string; + taskId: string; + name: (typeof RPC_ALLOW_LIST)[number]; + params: unknown[]; + } + + export type All = TaskReady | TaskDone | TaskError | TaskDataRequest | RPC; + } +} + +export namespace RequesterMessage { + export namespace ToBroker { + export interface TaskSettings { + type: 'requester:tasksettings'; + taskId: string; + settings: unknown; + } + + export interface TaskCancel { + type: 'requester:taskcancel'; + taskId: string; + reason: string; + } + + export interface TaskDataResponse { + type: 'requester:taskdataresponse'; + taskId: string; + requestId: string; + data: unknown; + } + + export interface RPCResponse { + type: 'requester:rpcresponse'; + taskId: string; + callId: string; + status: 'success' | 'error'; + data: unknown; + } + + export interface TaskRequest { + type: 'requester:taskrequest'; + requestId: string; + taskType: string; + } + + export type All = TaskSettings | TaskCancel | RPCResponse | TaskDataResponse | TaskRequest; + } +} + +export namespace RunnerMessage { + export namespace ToBroker { + export interface Info { + type: 'runner:info'; + name: string; + types: string[]; + } + + export interface TaskAccepted { + type: 'runner:taskaccepted'; + taskId: string; + } + + export interface TaskRejected { + type: 'runner:taskrejected'; + taskId: string; + reason: string; + } + + export interface TaskDone { + type: 'runner:taskdone'; + taskId: string; + data: INodeExecutionData[]; + } + + export interface TaskError { + type: 'runner:taskerror'; + taskId: string; + error: unknown; + } + + export interface TaskOffer { + type: 'runner:taskoffer'; + offerId: string; + taskType: string; + validFor: number; + } + + export interface TaskDataRequest { + type: 'runner:taskdatarequest'; + taskId: string; + requestId: string; + requestType: DataRequestType; + param?: string; + } + + export interface RPC { + type: 'runner:rpc'; + callId: string; + taskId: string; + name: (typeof RPC_ALLOW_LIST)[number]; + params: unknown[]; + } + + export type All = + | Info + | TaskDone + | TaskError + | TaskAccepted + | TaskRejected + | TaskOffer + | RPC + | TaskDataRequest; + } +} + +export const RPC_ALLOW_LIST = [ + 'helpers.httpRequestWithAuthentication', + 'helpers.requestWithAuthenticationPaginated', + // "helpers.normalizeItems" + // "helpers.constructExecutionMetaData" + // "helpers.assertBinaryData" + 'helpers.getBinaryDataBuffer', + // "helpers.copyInputItems" + // "helpers.returnJsonArray" + 'helpers.getSSHClient', + 'helpers.createReadStream', + // "helpers.getStoragePath" + 'helpers.writeContentToFile', + 'helpers.prepareBinaryData', + 'helpers.setBinaryDataBuffer', + 'helpers.copyBinaryFile', + 'helpers.binaryToBuffer', + // "helpers.binaryToString" + // "helpers.getBinaryPath" + 'helpers.getBinaryStream', + 'helpers.getBinaryMetadata', + 'helpers.createDeferredPromise', + 'helpers.httpRequest', +] as const;