Skip to content

Commit

Permalink
Add RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
valya committed Sep 5, 2024
1 parent 18c908d commit a8f351e
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 63 deletions.
86 changes: 41 additions & 45 deletions src/agent-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,21 @@ export namespace N8nMessage {

export interface JobSettings {
type: 'n8n:jobsettings';
jobId: Job['id'];
jobId: string;
settings: unknown;
}

export interface RPCResponse {
type: 'n8n:rpcresponse';
callId: string;
jobId: Job['id'];
jobId: string;
status: 'success' | 'error';
data: unknown;
}

export interface JobDataResponse {
type: 'n8n:jobdataresponse';
jobId: Job['id'];
jobId: string;
requestId: string;
data: unknown;
}
Expand All @@ -58,25 +58,25 @@ export namespace N8nMessage {
export namespace ToWorker {
export interface JobReady {
type: 'n8n:jobready';
requestId: JobRequest['requestId'];
jobId: Job['id'];
requestId: string;
jobId: string;
}

export interface JobDone {
type: 'n8n:jobdone';
jobId: Job['id'];
jobId: string;
data: INodeExecutionData[];
}

export interface JobError {
type: 'n8n:joberror';
jobId: Job['id'];
jobId: string;
error: unknown;
}

export interface JobDataRequest {
type: 'n8n:jobdatarequest';
jobId: Job['id'];
jobId: string;
requestId: string;
requestType: DataRequestType;
param?: string;
Expand All @@ -85,8 +85,8 @@ export namespace N8nMessage {
export interface RPC {
type: 'n8n:rpc';
callId: string;
jobId: Job['id'];
name: string;
jobId: string;
name: (typeof RPC_ALLOW_LIST)[number];
params: unknown[];
}

Expand All @@ -98,25 +98,26 @@ export namespace WorkerMessage {
export namespace ToN8n {
export interface JobSettings {
type: 'worker:jobsettings';
jobId: Job['id'];
jobId: string;
settings: unknown;
}

export interface JobCancel {
type: 'worker:jobcancel';
jobId: Job['id'];
jobId: string;
reason: string;
}

export interface JobDataResponse {
type: 'worker:jobdataresponse';
jobId: Job['id'];
jobId: string;
requestId: string;
data: unknown;
}

export interface RPCResponse {
type: 'worker:rpcresponse';
jobId: string;
callId: string;
status: 'success' | 'error';
data: unknown;
Expand Down Expand Up @@ -172,7 +173,7 @@ export namespace AgentMessage {

export interface JobDataRequest {
type: 'agent:jobdatarequest';
jobId: Job['id'];
jobId: string;
requestId: string;
requestType: DataRequestType;
param?: string;
Expand All @@ -181,8 +182,8 @@ export namespace AgentMessage {
export interface RPC {
type: 'agent:rpc';
callId: string;
jobId: Job['id'];
name: string;
jobId: string;
name: (typeof RPC_ALLOW_LIST)[number];
params: unknown[];
}

Expand All @@ -198,32 +199,27 @@ export namespace AgentMessage {
}
}

export interface Agent {
id: string;
name?: string;
jobTypes: string[];
lastSeen: Date;
}

export interface Job {
id: string;
agentId: Agent['id'];
workerId: string;
jobType: string;
}

export interface JobOffer {
offerId: string;
agentId: Agent['id'];
jobType: string;
validFor: number;
validUntil: bigint;
}

export interface JobRequest {
requestId: string;
workerId: string;
jobType: string;

acceptInProgress?: boolean;
}
export const RPC_ALLOW_LIST = [
'helpers.httpRequestWithAuthentication',
'helpers.requestWithAuthenticationPaginated',
// "helpers.normalizeItems"
// "helpers.constructExecutionMetaData"
// "helpers.assertBinaryData"
'helpers.getBinaryDataBuffer',
// "helpers.copyInputItems"
// "helpers.returnJsonArray"
'helpers.getSSHClient',
'helpers.createReadStream',
// "helpers.getStoragePath"
'helpers.writeContentToFile',
'helpers.prepareBinaryData',
'helpers.setBinaryDataBuffer',
'helpers.copyBinaryFile',
'helpers.binaryToBuffer',
// "helpers.binaryToString"
// "helpers.getBinaryPath"
'helpers.getBinaryStream',
'helpers.getBinaryMetadata',
'helpers.createDeferredPromise',
'helpers.httpRequest',
] as const;
104 changes: 86 additions & 18 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,24 @@ import { runInNewContext, type Context } from 'node:vm';
import { type MessageEvent, WebSocket } from 'ws';
import { nanoid } from 'nanoid';
import {
INode,
INodeType,
ITaskDataConnections,
type INode,
type INodeType,
type ITaskDataConnections,
WorkflowDataProxy,
WorkflowParameters,
type WorkflowParameters,
} from 'n8n-workflow';
import {
IDataObject,
IExecuteData,
INodeExecutionData,
INodeParameters,
IRunExecutionData,
IWorkflowDataProxyAdditionalKeys,
type IDataObject,
type IExecuteData,
type INodeExecutionData,
type INodeParameters,
type IRunExecutionData,
type IWorkflowDataProxyAdditionalKeys,
Workflow,
WorkflowExecuteMode,
type WorkflowExecuteMode,
} from 'n8n-workflow';

import type { AgentMessage, N8nMessage } from './agent-types';
import { RPC_ALLOW_LIST, type AgentMessage, type N8nMessage } from './agent-types';

interface AgentJob<T = unknown> {
jobId: string;
Expand All @@ -40,6 +40,12 @@ interface DataRequest {
reject: (error: unknown) => void;
}

interface RPCCall {
callId: string;
resolve: (data: unknown) => void;
reject: (error: unknown) => void;
}

const VALID_TIME_MS = 1000;
const VALID_EXTRA_MS = 100;

Expand All @@ -58,6 +64,8 @@ class Agent {

dataRequests: Record<DataRequest['requestId'], DataRequest> = {};

rpcCalls: Record<RPCCall['callId'], RPCCall> = {};

constructor(
public jobType: string,
private wsUrl: string,
Expand Down Expand Up @@ -114,7 +122,6 @@ class Agent {
process.hrtime.bigint() + BigInt((VALID_TIME_MS + VALID_EXTRA_MS) * 1_000_000), // Adding a little extra time to account for latency
};
this.openOffers[offer.offerId] = offer;
// console.log('Offering job:', offer.offerId);
this.send({
type: 'agent:joboffer',
jobType: this.jobType,
Expand Down Expand Up @@ -154,6 +161,8 @@ class Agent {
case 'n8n:jobdataresponse':
this.processDataResponse(message.requestId, message.data);
break;
case 'n8n:rpcresponse':
this.handleRpcResponse(message.callId, message.status, message.data);
}
}

Expand Down Expand Up @@ -290,6 +299,67 @@ class Agent {

return p as T;
}

async makeRpcCall(jobId: string, name: AgentMessage.ToN8n.RPC['name'], params: unknown[]) {
const callId = nanoid();

const dataPromise = new Promise((resolve, reject) => {
this.rpcCalls[callId] = {
callId,
resolve,
reject,
};
});

this.send({
type: 'agent:rpc',
callId,
jobId,
name,
params,
});

try {
return await dataPromise;
} finally {
delete this.rpcCalls[callId];
}
}

handleRpcResponse(
callId: string,
status: N8nMessage.ToAgent.RPCResponse['status'],
data: unknown,
) {
const call = this.rpcCalls[callId];
if (!call) {
return;
}
if (status === 'success') {
call.resolve(data);
} else {
call.reject(typeof data === 'string' ? new Error(data) : data);
}
}

buildRpcCallObject(jobId: string) {
const rpcObject: any = {};
for (const r of RPC_ALLOW_LIST) {
const splitPath = r.split('.');
let obj = rpcObject;

splitPath.forEach((s, index) => {
if (index !== splitPath.length - 1) {
obj[s] = {};
obj = obj[s];
return;
}
// eslint-disable-next-line
obj[s] = (...args: unknown[]) => this.makeRpcCall(jobId, r, args);
});
}
return rpcObject;
}
}

interface JSExecSettings {
Expand Down Expand Up @@ -321,18 +391,15 @@ const getAdditionalKeys = (): IWorkflowDataProxyAdditionalKeys => {
return {};
};

class TestAgent extends Agent {
class JsAgent extends Agent {
constructor(jobType: string, wsUrl: string, maxConcurrency: number, name?: string) {
super(jobType, wsUrl, maxConcurrency, name ?? 'Test Agent');
}

async executeJob(job: AgentJob<JSExecSettings>): Promise<AgentMessage.ToN8n.JobDone['data']> {
console.log('Executing: ', job);

const allData = await this.requestData<AllData>(job.jobId, 'all');

const settings = job.settings!;
console.log(allData, settings);

const workflowParams = allData.workflow;
const workflow = new Workflow({
Expand Down Expand Up @@ -371,6 +438,7 @@ class TestAgent extends Agent {
module: {},

...dataProxy.getDataProxy(),
...this.buildRpcCallObject(job.jobId),
};

const result = (await runInNewContext(
Expand All @@ -382,4 +450,4 @@ class TestAgent extends Agent {
}
}

new TestAgent('javascript', 'ws://localhost:5678/rest/agents/_ws', 5);
new JsAgent('javascript', 'ws://localhost:5678/rest/agents/_ws', 5);

0 comments on commit a8f351e

Please sign in to comment.