Skip to content

Commit

Permalink
Get basic up and running
Browse files Browse the repository at this point in the history
  • Loading branch information
valya committed Sep 5, 2024
1 parent 7994502 commit 18c908d
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 41 deletions.
10 changes: 7 additions & 3 deletions src/agent-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,13 @@ export namespace WorkerMessage {
data: unknown;
}

export type All = JobSettings | JobCancel | RPCResponse | JobDataResponse;
export interface JobRequest {
type: 'worker:jobrequest';
requestId: string;
jobType: string;
}

export type All = JobSettings | JobCancel | RPCResponse | JobDataResponse | JobRequest;
}
}

Expand Down Expand Up @@ -218,8 +224,6 @@ export interface JobRequest {
requestId: string;
workerId: string;
jobType: string;
validFor: number;
validUntil: bigint;

acceptInProgress?: boolean;
}
105 changes: 67 additions & 38 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@ import { runInNewContext, type Context } from 'node:vm';

import { type MessageEvent, WebSocket } from 'ws';
import { nanoid } from 'nanoid';
import { WorkflowDataProxy } from 'n8n-workflow';
import type {
import {
INode,
INodeType,
ITaskDataConnections,
WorkflowDataProxy,
WorkflowParameters,
} from 'n8n-workflow';
import {
IDataObject,
IExecuteData,
INodeExecutionData,
Expand Down Expand Up @@ -59,7 +65,7 @@ class Agent {
public name?: string,
) {
this.id = nanoid();
this.ws = new WebSocket(wsUrl + '?id=' + this.id);
this.ws = new WebSocket(this.wsUrl + '?id=' + this.id);
this.ws.addEventListener('message', this._wsMessage);
this.ws.addEventListener('close', this.stopJobOffers);
}
Expand Down Expand Up @@ -108,7 +114,7 @@ class Agent {
process.hrtime.bigint() + BigInt((VALID_TIME_MS + VALID_EXTRA_MS) * 1_000_000), // Adding a little extra time to account for latency
};
this.openOffers[offer.offerId] = offer;
console.log('Offering job:', offer.offerId);
// console.log('Offering job:', offer.offerId);
this.send({
type: 'agent:joboffer',
jobType: this.jobType,
Expand Down Expand Up @@ -145,6 +151,9 @@ class Agent {
case 'n8n:jobsettings':
void this.receivedSettings(message.jobId, message.settings);
break;
case 'n8n:jobdataresponse':
this.processDataResponse(message.requestId, message.data);
break;
}
}

Expand Down Expand Up @@ -194,16 +203,6 @@ class Agent {
type: 'agent:jobaccepted',
jobId,
});

// TODO: customisable timeout
setTimeout(() => {
const job = this.runningJobs[jobId];
if (!job || job.active || job.cancelled) {
// No need to timeout
return;
}
this.jobErrored(jobId, 'Timed out waiting for settings');
}, 2000);
}

jobCancelled(jobId: string) {
Expand Down Expand Up @@ -253,7 +252,11 @@ class Agent {
const data = await this.executeJob(job);
this.jobDone(jobId, data);
} catch (e) {
this.jobErrored(jobId, e);
if ('message' in (e as Error)) {
this.jobErrored(jobId, (e as Error).message);
} else {
this.jobErrored(jobId, e);
}
}
}

Expand Down Expand Up @@ -293,53 +296,79 @@ interface JSExecSettings {
code: string;

// For workflow data proxy
workflow: Workflow;
runExecutionData: IRunExecutionData | null;
mode: WorkflowExecuteMode;
}

export interface AllData {
workflow: Omit<WorkflowParameters, 'nodeTypes'>;
inputData: ITaskDataConnections;
node: INode;

runExecutionData: IRunExecutionData;
runIndex: number;
itemIndex: number;
activeNodeName: string;
connectionInputData: INodeExecutionData[];
siblingParameters: INodeParameters;
mode: WorkflowExecuteMode;
additionalKeys: IWorkflowDataProxyAdditionalKeys;
executeData?: IExecuteData;
defaultReturnRunIndex?: number;
selfData?: IDataObject;
contextNodeName?: string;
defaultReturnRunIndex: number;
selfData: IDataObject;
contextNodeName: string;
}

const getAdditionalKeys = (): IWorkflowDataProxyAdditionalKeys => {
return {};
};

class TestAgent extends Agent {
constructor(jobType: string, wsUrl: string, maxConcurrency: number, name?: string) {
super(jobType, wsUrl, maxConcurrency, name ?? 'Test Agent');
}

async executeJob(job: AgentJob<JSExecSettings>): Promise<AgentMessage.ToN8n.JobDone['data']> {
console.log('Executing: ', job);
const allData = await this.requestData(job.jobId, 'all');

const allData = await this.requestData<AllData>(job.jobId, 'all');

const settings = job.settings!;
console.log(allData, settings);

const workflowParams = allData.workflow;
const workflow = new Workflow({
...workflowParams,
nodeTypes: {
getByNameAndVersion() {
return undefined as unknown as INodeType;
},
getByName() {
return undefined as unknown as INodeType;
},
getKnownTypes() {
return {};
},
},
});

const dataProxy = new WorkflowDataProxy(
settings.workflow,
settings.runExecutionData,
settings.runIndex,
settings.itemIndex,
settings.activeNodeName,
settings.connectionInputData,
settings.siblingParameters,
workflow,
allData.runExecutionData,
allData.runIndex,
allData.itemIndex,
allData.activeNodeName,
allData.connectionInputData,
allData.siblingParameters,
settings.mode,
settings.additionalKeys,
settings.executeData,
settings.defaultReturnRunIndex,
settings.selfData,
settings.contextNodeName,
getAdditionalKeys(),
allData.executeData,
allData.defaultReturnRunIndex,
allData.selfData,
allData.contextNodeName,
);

if (Math.round(Math.random())) {
throw new Error('Haha, whoops');
}

const context: Context = {
require,
module: {},

...dataProxy.getDataProxy(),
};
Expand Down

0 comments on commit 18c908d

Please sign in to comment.