From 67c21aec41ec0ddc3903d6f28cfaae490e41fc95 Mon Sep 17 00:00:00 2001 From: Vladislav Botvin Date: Wed, 28 Aug 2019 18:19:02 +0300 Subject: [PATCH] #47 persistent context --- __tests__/persistent-ctx.ts | 14 ++++++++++++++ src/interfaces.ts | 1 + src/worker-pool.ts | 5 ++++- src/worker.js | 5 ++++- 4 files changed, 23 insertions(+), 2 deletions(-) create mode 100644 __tests__/persistent-ctx.ts diff --git a/__tests__/persistent-ctx.ts b/__tests__/persistent-ctx.ts new file mode 100644 index 0000000..1090282 --- /dev/null +++ b/__tests__/persistent-ctx.ts @@ -0,0 +1,14 @@ +import { job, stop, start } from '../src/job' + +beforeAll(async () => await start()) +afterAll(async () => await stop()) + +describe('Persistent ctx testing', () => { + it('should return persistent ctx after save', async () => { + await job(() => {}, {persistentCtx: {test: true}}) + const persistentCtx = await job(() => { + return persistentCtx + }) + expect(persistentCtx).toEqual({test: true}) + }) +}) diff --git a/src/interfaces.ts b/src/interfaces.ts index 019cd0b..c6d340a 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -1,6 +1,7 @@ export interface Config { ctx?: T data?: U + persistentCtx?: any } export interface Task { diff --git a/src/worker-pool.ts b/src/worker-pool.ts index 346e488..7ea2a44 100644 --- a/src/worker-pool.ts +++ b/src/worker-pool.ts @@ -94,8 +94,9 @@ class WorkerPool { // @ts-ignore const dataSerialized = v8.serialize(config.data) const dataStr = JSON.stringify(dataSerialized) + const persistentCtxStr = JSON.stringify(config.persistentCtx || {}) const workerStr = ` - async function __executor__() { + async function __executor__(persistentCtx) { const v8 = require('v8') ${variables} const dataParsed = JSON.parse('${dataStr}') @@ -103,6 +104,8 @@ class WorkerPool { const dataDeserialized = v8.deserialize(dataBuffer) return await (${handler.toString()})(dataDeserialized) } + + __executor__.persistentCtx = JSON.parse('${persistentCtxStr}') ` // @ts-ignore diff --git a/src/worker.js b/src/worker.js index cb1c665..4007b49 100644 --- a/src/worker.js +++ b/src/worker.js @@ -1,5 +1,7 @@ const { parentPort } = require('worker_threads') +let persistentCtx = {} + parentPort.on('message', async worker => { const response = { error: null, @@ -9,7 +11,8 @@ parentPort.on('message', async worker => { try { eval(worker) // __executor__ is defined in worker - response.data = await __executor__() + Object.assign(persistentCtx, __executor__.persistentCtx) + response.data = await __executor__(persistentCtx) parentPort.postMessage(response) } catch (err) { response.data = null