From 90eaeed2d0f26c62345668335d6d94e46d4bf710 Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Wed, 2 Apr 2025 21:19:43 +0700 Subject: [PATCH 01/11] feat: generic typesafe worker threadpool --- .nvmrc | 2 +- package-lock.json | 268 ++++++++++++++---- package.json | 13 +- src/helpers/__tests__/my-worker-errors.ts | 6 + .../__tests__/my-worker-export-default.ts | 15 + .../__tests__/my-worker-export-inline.ts | 12 + src/helpers/__tests__/worker.test.ts | 82 ++++++ src/helpers/serialize-error.ts | 163 +++++++++++ src/helpers/worker-threads.ts | 230 +++++++++++++++ tsconfig.json | 7 +- 10 files changed, 736 insertions(+), 62 deletions(-) create mode 100644 src/helpers/__tests__/my-worker-errors.ts create mode 100644 src/helpers/__tests__/my-worker-export-default.ts create mode 100644 src/helpers/__tests__/my-worker-export-inline.ts create mode 100644 src/helpers/__tests__/worker.test.ts create mode 100644 src/helpers/serialize-error.ts create mode 100644 src/helpers/worker-threads.ts diff --git a/.nvmrc b/.nvmrc index 3c03207..2bd5a0a 100644 --- a/.nvmrc +++ b/.nvmrc @@ -1 +1 @@ -18 +22 diff --git a/package-lock.json b/package-lock.json index 23c0e57..511c02d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,6 +13,7 @@ "@fastify/swagger": "^8.3.1", "@fastify/type-provider-typebox": "^3.2.0", "@sinclair/typebox": "^0.28.20", + "@types/node": "^22.13.17", "fastify": "^4.3.0", "fastify-metrics": "^10.2.0", "node-pg-migrate": "^6.2.2", @@ -26,7 +27,7 @@ "@commitlint/cli": "^17.5.0", "@commitlint/config-conventional": "^17.4.4", "@stacks/eslint-config": "^1.2.0", - "@types/jest": "^29.5.0", + "@types/jest": "^29.5.14", "@typescript-eslint/eslint-plugin": "^5.56.0", "@typescript-eslint/parser": "^5.56.0", "babel-jest": "^29.5.0", @@ -35,15 +36,15 @@ "eslint-plugin-prettier": "^4.2.1", "eslint-plugin-tsdoc": "^0.2.17", "husky": "^8.0.3", - "jest": "^29.5.0", + "jest": "^29.7.0", "prettier": "^2.8.6", "rimraf": "^4.4.1", - "ts-jest": "^29.0.5", - "ts-node": "^10.9.1", - "typescript": "^5.0.2" + "ts-jest": "^29.3.1", + "ts-node": "^10.9.2", + "typescript": "^5.8.2" }, "engines": { - "node": ">=18" + "node": ">=22" } }, "node_modules/@aashutoshrathi/word-wrap": { @@ -2803,10 +2804,11 @@ } }, "node_modules/@types/jest": { - "version": "29.5.5", - "resolved": "https://registry.npmjs.org/@types/jest/-/jest-29.5.5.tgz", - "integrity": "sha512-ebylz2hnsWR9mYvmBFbXJXr+33UPc4+ZdxyDXh5w0FlPBTfCVN3wPL+kuOiQt3xvrK419v7XWeAs+AeOksafXg==", + "version": "29.5.14", + "resolved": "https://registry.npmjs.org/@types/jest/-/jest-29.5.14.tgz", + "integrity": "sha512-ZN+4sdnLUbo8EVvVc2ao0GFW6oVrQRPn4K2lglySj7APvSrgzxHiNNK99us4WDMi57xxA2yggblIAMNhXOotLQ==", "dev": true, + "license": "MIT", "dependencies": { "expect": "^29.0.0", "pretty-format": "^29.0.0" @@ -2831,9 +2833,13 @@ "dev": true }, "node_modules/@types/node": { - "version": "20.8.2", - "resolved": "https://registry.npmjs.org/@types/node/-/node-20.8.2.tgz", - "integrity": "sha512-Vvycsc9FQdwhxE3y3DzeIxuEJbWGDsnrxvMADzTDF/lcdR9/K+AQIeAghTQsHtotg/q0j3WEOYS/jQgSdWue3w==" + "version": "22.13.17", + "resolved": "https://registry.npmjs.org/@types/node/-/node-22.13.17.tgz", + "integrity": "sha512-nAJuQXoyPj04uLgu+obZcSmsfOenUg6DxPKogeUy6yNCFwWaj5sBF8/G/pNo8EtBJjAfSVgfIlugR/BCOleO+g==", + "license": "MIT", + "dependencies": { + "undici-types": "~6.20.0" + } }, "node_modules/@types/normalize-package-data": { "version": "2.4.2", @@ -3545,6 +3551,13 @@ "node": ">=0.10.0" } }, + "node_modules/async": { + "version": "3.2.6", + "resolved": "https://registry.npmjs.org/async/-/async-3.2.6.tgz", + "integrity": "sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA==", + "dev": true, + "license": "MIT" + }, "node_modules/atomic-sleep": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/atomic-sleep/-/atomic-sleep-1.0.0.tgz", @@ -4514,6 +4527,22 @@ "node": ">=8" } }, + "node_modules/ejs": { + "version": "3.1.10", + "resolved": "https://registry.npmjs.org/ejs/-/ejs-3.1.10.tgz", + "integrity": "sha512-UeJmFfOrAQS8OJWPZ4qtgHyWExa088/MtK5UEyoJGFH67cDEXkZSviOiKRCZ4Xij0zxI3JECgYs3oKx+AizQBA==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "jake": "^10.8.5" + }, + "bin": { + "ejs": "bin/cli.js" + }, + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/electron-to-chromium": { "version": "1.4.539", "resolved": "https://registry.npmjs.org/electron-to-chromium/-/electron-to-chromium-1.4.539.tgz", @@ -5460,6 +5489,39 @@ "node": "^10.12.0 || >=12.0.0" } }, + "node_modules/filelist": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/filelist/-/filelist-1.0.4.tgz", + "integrity": "sha512-w1cEuf3S+DrLCQL7ET6kz+gmlJdbq9J7yXCSjK/OZCPA+qEN1WyF4ZAf0YYJa4/shHJra2t/d/r8SV4Ji+x+8Q==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "minimatch": "^5.0.1" + } + }, + "node_modules/filelist/node_modules/brace-expansion": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", + "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "dev": true, + "license": "MIT", + "dependencies": { + "balanced-match": "^1.0.0" + } + }, + "node_modules/filelist/node_modules/minimatch": { + "version": "5.1.6", + "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-5.1.6.tgz", + "integrity": "sha512-lKwV/1brpG6mBUFHtb7NUmtABCb2WZZmm2wNiOA5hAb8VdCS4B3dtMWyvcoViccwAW/COERjXLt0zP1zXUN26g==", + "dev": true, + "license": "ISC", + "dependencies": { + "brace-expansion": "^2.0.1" + }, + "engines": { + "node": ">=10" + } + }, "node_modules/fill-range": { "version": "7.0.1", "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.0.1.tgz", @@ -6538,11 +6600,107 @@ "node": ">=8" } }, + "node_modules/jake": { + "version": "10.9.2", + "resolved": "https://registry.npmjs.org/jake/-/jake-10.9.2.tgz", + "integrity": "sha512-2P4SQ0HrLQ+fw6llpLnOaGAvN2Zu6778SJMrCUwns4fOoG9ayrTiZk3VV8sCPkVZF8ab0zksVpS8FDY5pRCNBA==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "async": "^3.2.3", + "chalk": "^4.0.2", + "filelist": "^1.0.4", + "minimatch": "^3.1.2" + }, + "bin": { + "jake": "bin/cli.js" + }, + "engines": { + "node": ">=10" + } + }, + "node_modules/jake/node_modules/ansi-styles": { + "version": "4.3.0", + "resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-4.3.0.tgz", + "integrity": "sha512-zbB9rCJAT1rbjiVDb2hqKFHNYLxgtk8NURxZ3IZwD3F6NtxbXZQCnnSi1Lkx+IDohdPlFp222wVALIheZJQSEg==", + "dev": true, + "license": "MIT", + "dependencies": { + "color-convert": "^2.0.1" + }, + "engines": { + "node": ">=8" + }, + "funding": { + "url": "https://github.com/chalk/ansi-styles?sponsor=1" + } + }, + "node_modules/jake/node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, + "node_modules/jake/node_modules/color-convert": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz", + "integrity": "sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==", + "dev": true, + "license": "MIT", + "dependencies": { + "color-name": "~1.1.4" + }, + "engines": { + "node": ">=7.0.0" + } + }, + "node_modules/jake/node_modules/color-name": { + "version": "1.1.4", + "resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz", + "integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==", + "dev": true, + "license": "MIT" + }, + "node_modules/jake/node_modules/has-flag": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/has-flag/-/has-flag-4.0.0.tgz", + "integrity": "sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">=8" + } + }, + "node_modules/jake/node_modules/supports-color": { + "version": "7.2.0", + "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-7.2.0.tgz", + "integrity": "sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw==", + "dev": true, + "license": "MIT", + "dependencies": { + "has-flag": "^4.0.0" + }, + "engines": { + "node": ">=8" + } + }, "node_modules/jest": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/jest/-/jest-29.7.0.tgz", "integrity": "sha512-NIy3oAFp9shda19hy4HK0HRTWKtPJmGdnvywu01nOqNC2vZg+Z+fvJDxpMQA88eb2I9EcafcdjYgsDthnYTvGw==", "dev": true, + "license": "MIT", "dependencies": { "@jest/core": "^29.7.0", "@jest/types": "^29.6.3", @@ -10556,28 +10714,32 @@ } }, "node_modules/ts-jest": { - "version": "29.1.1", - "resolved": "https://registry.npmjs.org/ts-jest/-/ts-jest-29.1.1.tgz", - "integrity": "sha512-D6xjnnbP17cC85nliwGiL+tpoKN0StpgE0TeOjXQTU6MVCfsB4v7aW05CgQ/1OywGb0x/oy9hHFnN+sczTiRaA==", + "version": "29.3.1", + "resolved": "https://registry.npmjs.org/ts-jest/-/ts-jest-29.3.1.tgz", + "integrity": "sha512-FT2PIRtZABwl6+ZCry8IY7JZ3xMuppsEV9qFVHOVe8jDzggwUZ9TsM4chyJxL9yi6LvkqcZYU3LmapEE454zBQ==", "dev": true, + "license": "MIT", "dependencies": { - "bs-logger": "0.x", - "fast-json-stable-stringify": "2.x", + "bs-logger": "^0.2.6", + "ejs": "^3.1.10", + "fast-json-stable-stringify": "^2.1.0", "jest-util": "^29.0.0", "json5": "^2.2.3", - "lodash.memoize": "4.x", - "make-error": "1.x", - "semver": "^7.5.3", - "yargs-parser": "^21.0.1" + "lodash.memoize": "^4.1.2", + "make-error": "^1.3.6", + "semver": "^7.7.1", + "type-fest": "^4.38.0", + "yargs-parser": "^21.1.1" }, "bin": { "ts-jest": "cli.js" }, "engines": { - "node": "^14.15.0 || ^16.10.0 || >=18.0.0" + "node": "^14.15.0 || ^16.10.0 || ^18.0.0 || >=20.0.0" }, "peerDependencies": { "@babel/core": ">=7.0.0-beta.0 <8", + "@jest/transform": "^29.0.0", "@jest/types": "^29.0.0", "babel-jest": "^29.0.0", "jest": "^29.0.0", @@ -10587,6 +10749,9 @@ "@babel/core": { "optional": true }, + "@jest/transform": { + "optional": true + }, "@jest/types": { "optional": true }, @@ -10598,26 +10763,12 @@ } } }, - "node_modules/ts-jest/node_modules/lru-cache": { - "version": "6.0.0", - "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", - "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", - "dev": true, - "dependencies": { - "yallist": "^4.0.0" - }, - "engines": { - "node": ">=10" - } - }, "node_modules/ts-jest/node_modules/semver": { - "version": "7.5.4", - "resolved": "https://registry.npmjs.org/semver/-/semver-7.5.4.tgz", - "integrity": "sha512-1bCSESV6Pv+i21Hvpxp3Dx+pSD8lIPt8uVjRrxAUt/nbswYc+tK6Y2btiULjd4+fnq15PX+nqQDC7Oft7WkwcA==", + "version": "7.7.1", + "resolved": "https://registry.npmjs.org/semver/-/semver-7.7.1.tgz", + "integrity": "sha512-hlq8tAfn0m/61p4BVRcPzIGr6LKiMwo4VM6dGi6pt4qcRkmNzTcWq6eCEjEh+qXjkMDvPlOFFSGwQjoEa6gyMA==", "dev": true, - "dependencies": { - "lru-cache": "^6.0.0" - }, + "license": "ISC", "bin": { "semver": "bin/semver.js" }, @@ -10625,11 +10776,18 @@ "node": ">=10" } }, - "node_modules/ts-jest/node_modules/yallist": { - "version": "4.0.0", - "resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz", - "integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==", - "dev": true + "node_modules/ts-jest/node_modules/type-fest": { + "version": "4.39.0", + "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-4.39.0.tgz", + "integrity": "sha512-w2IGJU1tIgcrepg9ZJ82d8UmItNQtOFJG0HCUE3SzMokKkTsruVDALl2fAdiEzJlfduoU+VyXJWIIUZ+6jV+nw==", + "dev": true, + "license": "(MIT OR CC0-1.0)", + "engines": { + "node": ">=16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } }, "node_modules/ts-jest/node_modules/yargs-parser": { "version": "21.1.1", @@ -10641,10 +10799,11 @@ } }, "node_modules/ts-node": { - "version": "10.9.1", - "resolved": "https://registry.npmjs.org/ts-node/-/ts-node-10.9.1.tgz", - "integrity": "sha512-NtVysVPkxxrwFGUUxGYhfux8k78pQB3JqYBXlLRZgdGUqTO5wU/UyHop5p70iEbGhB7q5KmiZiU0Y3KlJrScEw==", + "version": "10.9.2", + "resolved": "https://registry.npmjs.org/ts-node/-/ts-node-10.9.2.tgz", + "integrity": "sha512-f0FFpIdcHgn8zcPSbf1dRevwt047YMnaiJM3u2w2RewrB+fob/zePZcrOyQoLMMO7aBIddLcQIEK5dYjkLnGrQ==", "dev": true, + "license": "MIT", "dependencies": { "@cspotcode/source-map-support": "^0.8.0", "@tsconfig/node10": "^1.0.7", @@ -10836,10 +10995,11 @@ } }, "node_modules/typescript": { - "version": "5.2.2", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.2.2.tgz", - "integrity": "sha512-mI4WrpHsbCIcwT9cF4FZvr80QUeKvsUsUvKDoR+X/7XHQH98xYD8YHZg7ANtz2GtZt/CBq2QJ0thkGJMHfqc1w==", + "version": "5.8.2", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.8.2.tgz", + "integrity": "sha512-aJn6wq13/afZp/jT9QZmwEjDqqvSGp1VT5GVg+f/t6/oVyrgXM6BY1h9BRh/O5p3PlUPAe+WuiEZOmb/49RqoQ==", "dev": true, + "license": "Apache-2.0", "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -10863,6 +11023,12 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/undici-types": { + "version": "6.20.0", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.20.0.tgz", + "integrity": "sha512-Ny6QZ2Nju20vw1SRHe3d9jVu6gJ+4e3+MMpqu7pqE5HT6WsTSlce++GQmK5UXS8mzV8DSYHrQH+Xrf2jVcuKNg==", + "license": "MIT" + }, "node_modules/universalify": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/universalify/-/universalify-2.0.0.tgz", diff --git a/package.json b/package.json index c3b8763..4d7574b 100644 --- a/package.json +++ b/package.json @@ -36,13 +36,13 @@ "homepage": "https://github.com/hirosystems/api-toolkit#readme", "prettier": "@stacks/prettier-config", "engines": { - "node": ">=18" + "node": ">=22" }, "devDependencies": { "@commitlint/cli": "^17.5.0", "@commitlint/config-conventional": "^17.4.4", "@stacks/eslint-config": "^1.2.0", - "@types/jest": "^29.5.0", + "@types/jest": "^29.5.14", "@typescript-eslint/eslint-plugin": "^5.56.0", "@typescript-eslint/parser": "^5.56.0", "babel-jest": "^29.5.0", @@ -51,18 +51,19 @@ "eslint-plugin-prettier": "^4.2.1", "eslint-plugin-tsdoc": "^0.2.17", "husky": "^8.0.3", - "jest": "^29.5.0", + "jest": "^29.7.0", "prettier": "^2.8.6", "rimraf": "^4.4.1", - "ts-jest": "^29.0.5", - "ts-node": "^10.9.1", - "typescript": "^5.0.2" + "ts-jest": "^29.3.1", + "ts-node": "^10.9.2", + "typescript": "^5.8.2" }, "dependencies": { "@fastify/cors": "^8.0.0", "@fastify/swagger": "^8.3.1", "@fastify/type-provider-typebox": "^3.2.0", "@sinclair/typebox": "^0.28.20", + "@types/node": "^22.13.17", "fastify": "^4.3.0", "fastify-metrics": "^10.2.0", "node-pg-migrate": "^6.2.2", diff --git a/src/helpers/__tests__/my-worker-errors.ts b/src/helpers/__tests__/my-worker-errors.ts new file mode 100644 index 0000000..128f29e --- /dev/null +++ b/src/helpers/__tests__/my-worker-errors.ts @@ -0,0 +1,6 @@ +export class MyCustomError extends Error { + constructor(message?: string) { + super(message); + this.name = this.constructor.name; + } +} diff --git a/src/helpers/__tests__/my-worker-export-default.ts b/src/helpers/__tests__/my-worker-export-default.ts new file mode 100644 index 0000000..9438fb6 --- /dev/null +++ b/src/helpers/__tests__/my-worker-export-default.ts @@ -0,0 +1,15 @@ +import { MyCustomError } from './my-worker-errors'; + +function processTask(req: number) { + if (req === 555) { + const error = new MyCustomError(`Error at req`); + Object.assign(error, { code: 123, deep: { foo: 'bar', baz: 123 } }); + throw error; + } + return req.toString(); +} + +export default { + workerModule: module, + processTask, +}; diff --git a/src/helpers/__tests__/my-worker-export-inline.ts b/src/helpers/__tests__/my-worker-export-inline.ts new file mode 100644 index 0000000..ac2b4ba --- /dev/null +++ b/src/helpers/__tests__/my-worker-export-inline.ts @@ -0,0 +1,12 @@ +import { MyCustomError } from './my-worker-errors'; + +export function processTask(req: number) { + if (req === 555) { + const error = new MyCustomError(`Error at req`); + Object.assign(error, { code: 123, deep: { foo: 'bar', baz: 123 } }); + throw error; + } + return req.toString(); +} + +export const workerModule = module; diff --git a/src/helpers/__tests__/worker.test.ts b/src/helpers/__tests__/worker.test.ts new file mode 100644 index 0000000..60998dd --- /dev/null +++ b/src/helpers/__tests__/worker.test.ts @@ -0,0 +1,82 @@ +import { addKnownErrorConstructor } from '../serialize-error'; +import { WorkerManager } from '../worker-threads'; +import { MyCustomError } from './my-worker-errors'; +import workerModule from './my-worker-export-default'; +import * as starWorkerModule from './my-worker-export-inline'; + +describe('Worker tests - default import', () => { + beforeAll(() => { + addKnownErrorConstructor(MyCustomError); + }); + + test('worker debugging', async () => { + // worker module as a default import + const workerManager1 = await WorkerManager.init(workerModule); + const results = await Promise.all( + Array.from({ length: 10 }, (_, i) => { + return workerManager1.exec(i); + }) + ).finally(() => void workerManager1.close()); + expect(results).toEqual(Array.from({ length: 10 }, (_, i) => i.toString())); + + // worker module as a star import + const workerManager2 = await WorkerManager.init(starWorkerModule); + const results2 = await Promise.all( + Array.from({ length: 10 }, (_, i) => { + return workerManager2.exec(i); + }) + ).finally(() => void workerManager2.close()); + expect(results2).toEqual(Array.from({ length: 10 }, (_, i) => i.toString())); + + // Ensure running the worker directly has the same results + const resultsDirect = Array.from({ length: 10 }, (_, i) => workerModule.processTask(i)); + expect(resultsDirect).toEqual(Array.from({ length: 10 }, (_, i) => i.toString())); + }, 30_000); + + test('worker error deser', async () => { + const workerManager1 = await WorkerManager.init(workerModule); + // job req of 555 throws an error + try { + await workerManager1.exec(555); + throw new Error('Should have thrown'); + } catch (error) { + expect(error).toBeInstanceOf(MyCustomError); + expect(error).toMatchObject({ + name: 'MyCustomError', + code: 123, + deep: { foo: 'bar', baz: 123 }, + }); + } finally { + await workerManager1.close(); + } + + // worker module as a star import + const workerManager2 = await WorkerManager.init(starWorkerModule); + try { + await workerManager2.exec(555); + throw new Error('Should have thrown'); + } catch (error) { + expect(error).toBeInstanceOf(MyCustomError); + expect(error).toMatchObject({ + name: 'MyCustomError', + code: 123, + deep: { foo: 'bar', baz: 123 }, + }); + } finally { + await workerManager2.close(); + } + + // Ensure running the worker directly has the same results + try { + workerModule.processTask(555); + throw new Error('Should have thrown'); + } catch (error) { + expect(error).toBeInstanceOf(MyCustomError); + expect(error).toMatchObject({ + name: 'MyCustomError', + code: 123, + deep: { foo: 'bar', baz: 123 }, + }); + } + }, 30_000); +}); diff --git a/src/helpers/serialize-error.ts b/src/helpers/serialize-error.ts new file mode 100644 index 0000000..1df760c --- /dev/null +++ b/src/helpers/serialize-error.ts @@ -0,0 +1,163 @@ +/* eslint-disable @typescript-eslint/no-unsafe-member-access */ +/* eslint-disable @typescript-eslint/no-explicit-any */ +/* eslint-disable @typescript-eslint/no-unsafe-assignment */ + +const errorConstructors = new Map( + [ + // Native ES errors https://262.ecma-international.org/12.0/#sec-well-known-intrinsic-objects + Error, + EvalError, + RangeError, + ReferenceError, + SyntaxError, + TypeError, + URIError, + AggregateError, + + // Built-in errors + globalThis.DOMException, + + // Node-specific errors https://nodejs.org/api/errors.html + (globalThis as any).AssertionError as Error, + (globalThis as any).SystemError as Error, + ] + // Non-native Errors are used with `globalThis` because they might be missing. This filter drops them when undefined. + .filter(Boolean) + .map(constructor => [constructor.name, constructor as ErrorConstructor] as const) +); + +/** + * Custom errors can only be deserialized correctly if they are registered here. + */ +export function addKnownErrorConstructor( + constructor: new (message?: string, ..._arguments: unknown[]) => Error +) { + try { + new constructor(); + } catch (error) { + throw new Error(`The error constructor "${constructor.name}" is not compatible`, { + cause: error, + }); + } + + errorConstructors.set(constructor.name, constructor as ErrorConstructor); +} + +const commonProperties: { + name: string; + descriptor: Partial; + deserialize?: (_: any) => any; + serialize?: (_: any) => any; +}[] = [ + { + name: 'message', + descriptor: { + enumerable: false, + configurable: true, + writable: true, + }, + }, + { + name: 'stack', + descriptor: { + enumerable: false, + configurable: true, + writable: true, + }, + }, + { + name: 'code', + descriptor: { + enumerable: true, + configurable: true, + writable: true, + }, + }, + { + name: 'cause', + descriptor: { + enumerable: false, + configurable: true, + writable: true, + }, + }, + { + name: 'errors', + descriptor: { + enumerable: false, + configurable: true, + writable: true, + }, + deserialize: (errors: SerializedError[]) => errors.map(error => deserializeError(error)), + serialize: (errors: Error[]) => errors.map(error => serializeError(error)), + }, +]; + +export type SerializedError = { + name: string; + message: string; + stack: string; + code?: string | number; + cause?: string; + [key: string]: any; +}; + +export function serializeError(subject: Error): SerializedError { + const data: Record = { + name: 'Error', + message: '', + stack: '', + }; + + for (const prop of commonProperties) { + if (!(prop.name in subject)) { + continue; + } + let value = (subject as any)[prop.name]; + if (prop.serialize) { + value = prop.serialize(value); + } + data[prop.name] = value; + } + + // Include any other enumerable own properties + for (const key of Object.keys(subject)) { + if (!(key in data)) { + data[key] = (subject as any)[key]; + } + } + + if (globalThis.DOMException && subject instanceof globalThis.DOMException) { + data.name = 'DOMException'; + } else { + data.name = subject.constructor.name; + } + return data as SerializedError; +} + +export function deserializeError(subject: SerializedError): Error { + const con = errorConstructors.get(subject.name) ?? Error; + const output = Object.create(con.prototype) as Error; + + for (const prop of commonProperties) { + if (!(prop.name in subject)) continue; + + let value = subject[prop.name]; + if (prop.deserialize) value = prop.deserialize(value); + + Object.defineProperty(output, prop.name, { + ...prop.descriptor, + value: value, + }); + } + + // Add any other properties (custom props not in commonProperties) + for (const key of Object.keys(subject)) { + if (!commonProperties.some(p => p.name === key)) { + (output as any)[key] = subject[key]; + Object.assign(output, { [key]: subject[key] }); + } + } + + return output; +} diff --git a/src/helpers/worker-threads.ts b/src/helpers/worker-threads.ts new file mode 100644 index 0000000..fb43238 --- /dev/null +++ b/src/helpers/worker-threads.ts @@ -0,0 +1,230 @@ +/* eslint-disable @typescript-eslint/no-non-null-assertion */ +import { isMainThread, parentPort, workerData, Worker, WorkerOptions } from 'node:worker_threads'; +import { cpus } from 'node:os'; +import { EventEmitter } from 'node:events'; +import { deserializeError, SerializedError, serializeError } from './serialize-error'; +import { waiter, Waiter } from './time'; + +type WorkerPoolModuleInterface = + | { + workerModule: NodeJS.Module; + processTask: (req: TReq) => Promise | TResp; + } + | { + default: { + workerModule: NodeJS.Module; + processTask: (req: TReq) => Promise | TResp; + }; + }; + +type WorkerDataInterface = { + workerFile: string; +}; + +type WorkerReqMsg = { + msgId: number; + req: TReq; +}; + +type WorkerRespMsg = { + msgId: number; +} & ( + | { + resp: TResp; + error?: null; + } + | { + resp?: null; + error: TErr; + } +); + +/** + * Invokes a function that may return a value or a promise, and passes the result + * to a callback in a consistent format. Handles both synchronous and asynchronous cases, + * ensuring type safety and avoiding unnecessary async transitions for sync functions. + */ +function getMaybePromiseResult( + fn: () => T | Promise, + cb: (result: { ok: T; err?: null } | { ok?: null; err: unknown }) => void +): void { + try { + const maybePromise = fn(); + if (maybePromise instanceof Promise) { + maybePromise.then( + ok => cb({ ok }), + (err: unknown) => cb({ err }) + ); + } else { + cb({ ok: maybePromise }); + } + } catch (err: unknown) { + cb({ err }); + } +} + +export class WorkerManager { + private readonly workers = new Set(); + private readonly idleWorkers: Worker[] = []; + + private readonly jobQueue: WorkerReqMsg[] = []; + private readonly msgRequests: Map> = new Map(); + private lastMsgId = 0; + + private readonly workerCount: number; + private readonly workerFile: string; + + private readonly events = new EventEmitter<{ + workersReady: []; + }>(); + + public static init( + workerModule: WorkerPoolModuleInterface, + opts: { workerCount?: number } = {} + ) { + const workerManager = new WorkerManager(workerModule, opts); + return new Promise>(resolve => { + workerManager.events.once('workersReady', () => { + resolve(workerManager); + }); + }); + } + + constructor( + workerModule: WorkerPoolModuleInterface, + opts: { workerCount?: number } = {} + ) { + if (!isMainThread) { + throw new Error(`${this.constructor.name} must be instantiated in the main thread`); + } + + if ('default' in workerModule) { + this.workerFile = workerModule.default.workerModule.filename; + } else { + this.workerFile = workerModule.workerModule.filename; + } + this.workerCount = opts.workerCount ?? cpus().length; + this.createWorkerPool(); + } + + exec(req: TReq): Promise { + if (this.lastMsgId >= Number.MAX_SAFE_INTEGER) { + this.lastMsgId = 0; + } + const msgId = this.lastMsgId++; + const replyWaiter = waiter(); + this.msgRequests.set(msgId, replyWaiter); + const reqMsg: WorkerReqMsg = { + msgId, + req, + }; + this.jobQueue.push(reqMsg); + this.assignJobs(); + return replyWaiter; + } + + private createWorkerPool() { + let workersReady = 0; + for (let i = 0; i < this.workerCount; i++) { + const workerData: WorkerDataInterface = { + workerFile: this.workerFile, + }; + const workerOpt: WorkerOptions = { + workerData, + }; + if (__filename.endsWith('.ts')) { + if (process.env.NODE_ENV !== 'test') { + console.error( + 'Worker threads are being created with ts-node outside of a test environment.' + ); + } + workerOpt.execArgv = ['-r', 'ts-node/register']; + } + const worker = new Worker(__filename, workerOpt); + worker.unref(); + this.workers.add(worker); + worker.on('error', err => { + console.error(`Worker error`, err); + }); + worker.on('messageerror', err => { + console.error(`Worker message error`, err); + }); + worker.on('message', (message: unknown) => { + if (message === 'ready') { + this.idleWorkers.push(worker); + this.assignJobs(); + workersReady++; + if (workersReady === this.workerCount) { + this.events.emit('workersReady'); + } + } else { + this.idleWorkers.push(worker); + this.assignJobs(); + const msg = message as WorkerRespMsg; + const replyWaiter = this.msgRequests.get(msg.msgId); + if (replyWaiter) { + if (msg.error) { + replyWaiter.reject(deserializeError(msg.error)); + } else if (msg.resp) { + replyWaiter.resolve(msg.resp); + } + this.msgRequests.delete(msg.msgId); + } else { + console.error('Received unexpected message from worker', msg); + } + } + }); + } + } + + private assignJobs() { + while (this.idleWorkers.length > 0 && this.jobQueue.length > 0) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const worker = this.idleWorkers.shift()!; + const job = this.jobQueue.shift(); + worker.postMessage(job); + } + } + + async close() { + await Promise.all([...this.workers].map(worker => worker.terminate())); + this.workers.clear(); + } +} + +if (!isMainThread && (workerData as WorkerDataInterface)?.workerFile) { + const { workerFile } = workerData as WorkerDataInterface; + // eslint-disable-next-line @typescript-eslint/no-var-requires + const workerModule = require(workerFile) as WorkerPoolModuleInterface; + let processTask: (req: unknown) => unknown; + if ('default' in workerModule) { + processTask = workerModule.default.processTask; + } else { + processTask = workerModule.processTask; + } + parentPort!.on('messageerror', err => { + console.error(`Worker thread message error`, err); + }); + parentPort!.on('message', (message: unknown) => { + const msg = message as WorkerReqMsg; + getMaybePromiseResult( + () => processTask(msg.req), + result => { + if (result.ok) { + const reply: WorkerRespMsg = { + msgId: msg.msgId, + resp: result.ok, + }; + parentPort!.postMessage(reply); + } else { + const reply: WorkerRespMsg = { + msgId: msg.msgId, + error: serializeError(result.err as Error), + }; + parentPort!.postMessage(reply); + } + } + ); + }); + parentPort!.postMessage('ready'); +} diff --git a/tsconfig.json b/tsconfig.json index e82cef2..e303ec8 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,9 +1,8 @@ { "compilerOptions": { - "target": "es2021", - "lib": [ "es2021" ], - "module": "commonjs", - "moduleResolution": "node", + "target": "es2024", + "module": "node16", + "moduleResolution": "node16", "typeRoots": [ "./node_modules/@types" ], From d92a59623dcfbcc8ab17533bd998b2d8263608c8 Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Fri, 11 Apr 2025 18:15:25 +0700 Subject: [PATCH 02/11] feat: update WorkerManager with changes from SNP repo --- src/helpers/__tests__/my-worker-errors.ts | 6 - .../__tests__/my-worker-export-default.ts | 16 +- .../__tests__/my-worker-export-inline.ts | 12 -- src/helpers/__tests__/my-worker.ts | 43 ++++ src/helpers/__tests__/worker.test.ts | 190 +++++++++++------- src/helpers/serialize-error.ts | 82 +++++++- src/helpers/worker-threads.ts | 188 +++++++++-------- 7 files changed, 345 insertions(+), 192 deletions(-) delete mode 100644 src/helpers/__tests__/my-worker-errors.ts delete mode 100644 src/helpers/__tests__/my-worker-export-inline.ts create mode 100644 src/helpers/__tests__/my-worker.ts diff --git a/src/helpers/__tests__/my-worker-errors.ts b/src/helpers/__tests__/my-worker-errors.ts deleted file mode 100644 index 128f29e..0000000 --- a/src/helpers/__tests__/my-worker-errors.ts +++ /dev/null @@ -1,6 +0,0 @@ -export class MyCustomError extends Error { - constructor(message?: string) { - super(message); - this.name = this.constructor.name; - } -} diff --git a/src/helpers/__tests__/my-worker-export-default.ts b/src/helpers/__tests__/my-worker-export-default.ts index 9438fb6..ced8800 100644 --- a/src/helpers/__tests__/my-worker-export-default.ts +++ b/src/helpers/__tests__/my-worker-export-default.ts @@ -1,15 +1,5 @@ -import { MyCustomError } from './my-worker-errors'; - -function processTask(req: number) { - if (req === 555) { - const error = new MyCustomError(`Error at req`); - Object.assign(error, { code: 123, deep: { foo: 'bar', baz: 123 } }); - throw error; - } - return req.toString(); -} - +import * as myWorker from './my-worker'; export default { - workerModule: module, - processTask, + workerModule: myWorker.workerModule, + processTask: myWorker.processTask, }; diff --git a/src/helpers/__tests__/my-worker-export-inline.ts b/src/helpers/__tests__/my-worker-export-inline.ts deleted file mode 100644 index ac2b4ba..0000000 --- a/src/helpers/__tests__/my-worker-export-inline.ts +++ /dev/null @@ -1,12 +0,0 @@ -import { MyCustomError } from './my-worker-errors'; - -export function processTask(req: number) { - if (req === 555) { - const error = new MyCustomError(`Error at req`); - Object.assign(error, { code: 123, deep: { foo: 'bar', baz: 123 } }); - throw error; - } - return req.toString(); -} - -export const workerModule = module; diff --git a/src/helpers/__tests__/my-worker.ts b/src/helpers/__tests__/my-worker.ts new file mode 100644 index 0000000..bd4ca6c --- /dev/null +++ b/src/helpers/__tests__/my-worker.ts @@ -0,0 +1,43 @@ +/** Block the thread for `ms` milliseconds */ +function sleepSync(ms: number) { + const int32 = new Int32Array(new SharedArrayBuffer(4)); + Atomics.wait(int32, 0, 0, ms); +} + +export function processTask(req: number, cpuWaitTimeMs: number) { + if (req === 2222) { + throw createError(); + } + if (req === 3333) { + throw 'boom'; + } + sleepSync(cpuWaitTimeMs); + return req.toString(); +} + +export class MyCustomError extends Error { + constructor(message?: string) { + super(message); + this.name = this.constructor.name; + } +} + +function createError() { + const error = new MyCustomError(`Error at req`); + Object.assign(error, { code: 123 }); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (error as any).randoProp = { + foo: 'bar', + baz: 123, + aggregate: [ + Object.assign(new Error('Error in aggregate 1'), { inner1code: 123 }), + new MyCustomError('Error in aggregate 2'), + ], + sourceError: Object.assign(new MyCustomError('Source error'), { + sourceErrorInfo: { code: 44 }, + }), + }; + return error; +} + +export const workerModule = module; diff --git a/src/helpers/__tests__/worker.test.ts b/src/helpers/__tests__/worker.test.ts index 60998dd..8f8fda4 100644 --- a/src/helpers/__tests__/worker.test.ts +++ b/src/helpers/__tests__/worker.test.ts @@ -1,82 +1,134 @@ -import { addKnownErrorConstructor } from '../serialize-error'; +import * as assert from 'node:assert/strict'; +import * as os from 'node:os'; import { WorkerManager } from '../worker-threads'; -import { MyCustomError } from './my-worker-errors'; -import workerModule from './my-worker-export-default'; -import * as starWorkerModule from './my-worker-export-inline'; +import * as workerModule from './my-worker'; +import workerModuleDefaultExport from './my-worker-export-default'; +import { MyCustomError } from './my-worker'; +import { addKnownErrorConstructor } from '../serialize-error'; +import { stopwatch } from '../time'; + +test('worker module with default exports', async () => { + const workerManager = await WorkerManager.init(workerModuleDefaultExport, { workerCount: 2 }); + const res = await workerManager.exec(1, 1); + expect(res).toBe('1'); + await workerManager.close(); +}); + +describe('Worker tests', () => { + let workerManager: Awaited>; + const workerCount = Math.min(4, os.cpus().length); + const cpuPeggedTimeMs = 500; -describe('Worker tests - default import', () => { - beforeAll(() => { + function initWorkerManager() { + return WorkerManager.init(workerModule, { workerCount }); + } + + beforeAll(async () => { addKnownErrorConstructor(MyCustomError); + console.time('worker manager init'); + const manager = await initWorkerManager(); + console.timeEnd('worker manager init'); + workerManager = manager; }); - test('worker debugging', async () => { - // worker module as a default import - const workerManager1 = await WorkerManager.init(workerModule); - const results = await Promise.all( - Array.from({ length: 10 }, (_, i) => { - return workerManager1.exec(i); - }) - ).finally(() => void workerManager1.close()); - expect(results).toEqual(Array.from({ length: 10 }, (_, i) => i.toString())); + afterAll(async () => { + await workerManager.close(); + }); - // worker module as a star import - const workerManager2 = await WorkerManager.init(starWorkerModule); - const results2 = await Promise.all( - Array.from({ length: 10 }, (_, i) => { - return workerManager2.exec(i); - }) - ).finally(() => void workerManager2.close()); - expect(results2).toEqual(Array.from({ length: 10 }, (_, i) => i.toString())); + test('run tasks with workers', async () => { + const watch = stopwatch(); + const taskPromises = Array.from({ length: workerCount }, async (_, i) => { + console.time(`task ${i}`); + const res = await workerManager.exec(i, cpuPeggedTimeMs); + console.timeEnd(`task ${i}`); + return res; + }); - // Ensure running the worker directly has the same results - const resultsDirect = Array.from({ length: 10 }, (_, i) => workerModule.processTask(i)); - expect(resultsDirect).toEqual(Array.from({ length: 10 }, (_, i) => i.toString())); - }, 30_000); + // Ensure all workers were assigned a task + expect(workerManager.busyWorkerCount).toBe(workerCount); + expect(workerManager.idleWorkerCount).toBe(0); - test('worker error deser', async () => { - const workerManager1 = await WorkerManager.init(workerModule); - // job req of 555 throws an error - try { - await workerManager1.exec(555); - throw new Error('Should have thrown'); - } catch (error) { - expect(error).toBeInstanceOf(MyCustomError); - expect(error).toMatchObject({ - name: 'MyCustomError', - code: 123, - deep: { foo: 'bar', baz: 123 }, - }); - } finally { - await workerManager1.close(); - } + const results = await Promise.allSettled(taskPromises); - // worker module as a star import - const workerManager2 = await WorkerManager.init(starWorkerModule); - try { - await workerManager2.exec(555); - throw new Error('Should have thrown'); - } catch (error) { - expect(error).toBeInstanceOf(MyCustomError); - expect(error).toMatchObject({ - name: 'MyCustomError', - code: 123, - deep: { foo: 'bar', baz: 123 }, - }); - } finally { - await workerManager2.close(); + // All tasks should complete roughly within the time in takes for one task to complete + // because the tasks are run in parallel on different threads. + expect(watch.getElapsed()).toBeLessThan(cpuPeggedTimeMs * 1.75); + + // Ensure tasks returned in expected order: + for (let i = 0; i < workerCount; i++) { + const result = results[i]; + assert(result.status === 'fulfilled'); + expect(result.value).toBe(i.toString()); } + }); + + test('worker task throws with non-Error value', async () => { + const [res] = await Promise.allSettled([ + // The worker will throw a non-error value when it receives this specific req value + workerManager.exec(3333, 1), + ]); + assert(res.status === 'rejected'); + expect(res.reason).toBe('boom'); + }); + + test('worker task throws error', async () => { + // Test that error de/ser across worker thread boundary works as expected + const [res] = await Promise.allSettled([ + // The worker will throw an error when it receives this specific req value + workerManager.exec(2222, 1), + ]); + assert(res.status === 'rejected'); + expect(res.reason).toBeInstanceOf(MyCustomError); + expect(res.reason).toMatchObject({ + name: 'MyCustomError', + message: 'Error at req', + code: 123, + stack: expect.any(String), + randoProp: { + foo: 'bar', + baz: 123, + aggregate: [ + { + name: 'Error', + message: 'Error in aggregate 1', + inner1code: 123, + stack: expect.any(String), + }, + { + name: 'MyCustomError', + message: 'Error in aggregate 2', + stack: expect.any(String), + }, + ], + sourceError: { + name: 'MyCustomError', + message: 'Source error', + sourceErrorInfo: { + code: 44, + }, + stack: expect.any(String), + }, + }, + }); + }); + + test('run tasks on main thread', async () => { + const watch = stopwatch(); + const results = await Promise.allSettled( + Array.from({ length: workerCount }, (_, i) => { + return Promise.resolve().then(() => workerModule.processTask(i, cpuPeggedTimeMs)); + }) + ); - // Ensure running the worker directly has the same results - try { - workerModule.processTask(555); - throw new Error('Should have thrown'); - } catch (error) { - expect(error).toBeInstanceOf(MyCustomError); - expect(error).toMatchObject({ - name: 'MyCustomError', - code: 123, - deep: { foo: 'bar', baz: 123 }, - }); + // All tasks should take at least as long as taskCount * cpuPeggedTimeMs because + // they are run synchronously on the main thread. + expect(watch.getElapsed()).toBeGreaterThanOrEqual(workerCount * cpuPeggedTimeMs); + + // Ensure tasks returned in expected order: + for (let i = 0; i < workerCount; i++) { + const result = results[i]; + assert(result.status === 'fulfilled'); + expect(result.value).toBe(i.toString()); } - }, 30_000); + }); }); diff --git a/src/helpers/serialize-error.ts b/src/helpers/serialize-error.ts index 1df760c..f9b38ca 100644 --- a/src/helpers/serialize-error.ts +++ b/src/helpers/serialize-error.ts @@ -97,12 +97,29 @@ export type SerializedError = { name: string; message: string; stack: string; - code?: string | number; - cause?: string; [key: string]: any; }; +export function isErrorLike(value: unknown): value is Error & { stack: string } { + return ( + typeof value === 'object' && + value !== null && + 'name' in value && + 'message' in value && + 'stack' in value && + typeof (value as Error).name === 'string' && + typeof (value as Error).message === 'string' && + typeof (value as Error).stack === 'string' + ); +} + export function serializeError(subject: Error): SerializedError { + if (!isErrorLike(subject)) { + // If the subject is not an error, for example `throw "boom", then we throw. + // This function should only be passed error objects, callers can use `isErrorLike`. + throw new TypeError('Failed to serialize error, expected an error object'); + } + const data: Record = { name: 'Error', message: '', @@ -114,8 +131,11 @@ export function serializeError(subject: Error): SerializedError { continue; } let value = (subject as any)[prop.name]; + // TODO: if value instanceof Error then recursively serializeError if (prop.serialize) { value = prop.serialize(value); + } else { + value = deepSerialize(value); } data[prop.name] = value; } @@ -123,7 +143,7 @@ export function serializeError(subject: Error): SerializedError { // Include any other enumerable own properties for (const key of Object.keys(subject)) { if (!(key in data)) { - data[key] = (subject as any)[key]; + data[key] = deepSerialize((subject as any)[key]); } } @@ -136,14 +156,31 @@ export function serializeError(subject: Error): SerializedError { } export function deserializeError(subject: SerializedError): Error { - const con = errorConstructors.get(subject.name) ?? Error; + if (!isErrorLike(subject)) { + // If the subject is not an error, for example `throw "boom", then we throw. + // This function should only be passed error objects, callers can use `isErrorLike`. + throw new TypeError('Failed to desserialize error, expected an error object'); + } + + let con = errorConstructors.get(subject.name); + if (!con) { + // If the constructor is not found, use the generic Error constructor + con = Error; + console.error( + `Error constructor "${subject.name}" not found during worker error deserialization, using generic Error constructor` + ); + } const output = Object.create(con.prototype) as Error; for (const prop of commonProperties) { if (!(prop.name in subject)) continue; - let value = subject[prop.name]; - if (prop.deserialize) value = prop.deserialize(value); + let value = (subject as any)[prop.name]; + if (prop.deserialize) { + value = prop.deserialize(value); + } else { + value = deepDeserialize(value); + } Object.defineProperty(output, prop.name, { ...prop.descriptor, @@ -154,10 +191,39 @@ export function deserializeError(subject: SerializedError): Error { // Add any other properties (custom props not in commonProperties) for (const key of Object.keys(subject)) { if (!commonProperties.some(p => p.name === key)) { - (output as any)[key] = subject[key]; - Object.assign(output, { [key]: subject[key] }); + (output as any)[key] = deepDeserialize((subject as any)[key]); } } return output; } + +function deepSerialize(value: unknown): unknown { + if (Array.isArray(value)) { + return value.map(deepSerialize); + } else if (isErrorLike(value)) { + return serializeError(value); + } else if (value && typeof value === 'object') { + const result: Record = {}; + for (const [k, v] of Object.entries(value as Record)) { + result[k] = deepSerialize(v); + } + return result; + } + return value; +} + +function deepDeserialize(value: unknown): unknown { + if (Array.isArray(value)) { + return value.map(deepDeserialize); + } else if (isErrorLike(value)) { + return deserializeError(value); + } else if (value && typeof value === 'object') { + const result: Record = {}; + for (const [k, v] of Object.entries(value as Record)) { + result[k] = deepDeserialize(v); + } + return result; + } + return value; +} diff --git a/src/helpers/worker-threads.ts b/src/helpers/worker-threads.ts index fb43238..90ade3b 100644 --- a/src/helpers/worker-threads.ts +++ b/src/helpers/worker-threads.ts @@ -1,19 +1,19 @@ -/* eslint-disable @typescript-eslint/no-non-null-assertion */ -import { isMainThread, parentPort, workerData, Worker, WorkerOptions } from 'node:worker_threads'; -import { cpus } from 'node:os'; +import * as WorkerThreads from 'node:worker_threads'; +import * as os from 'node:os'; +import * as path from 'node:path'; import { EventEmitter } from 'node:events'; -import { deserializeError, SerializedError, serializeError } from './serialize-error'; import { waiter, Waiter } from './time'; +import { deserializeError, isErrorLike, serializeError } from './serialize-error'; -type WorkerPoolModuleInterface = +type WorkerPoolModuleInterface = | { workerModule: NodeJS.Module; - processTask: (req: TReq) => Promise | TResp; + processTask: (...args: TArgs) => Promise | TResp; } | { default: { workerModule: NodeJS.Module; - processTask: (req: TReq) => Promise | TResp; + processTask: (...args: TArgs) => Promise | TResp; }; }; @@ -21,12 +21,12 @@ type WorkerDataInterface = { workerFile: string; }; -type WorkerReqMsg = { +type WorkerReqMsg = { msgId: number; - req: TReq; + req: TArgs; }; -type WorkerRespMsg = { +type WorkerRespMsg = { msgId: number; } & ( | { @@ -62,28 +62,39 @@ function getMaybePromiseResult( cb({ err }); } } +export class WorkerManager { + private readonly workers = new Set(); + private readonly idleWorkers: WorkerThreads.Worker[] = []; -export class WorkerManager { - private readonly workers = new Set(); - private readonly idleWorkers: Worker[] = []; - - private readonly jobQueue: WorkerReqMsg[] = []; + private readonly jobQueue: WorkerReqMsg[] = []; private readonly msgRequests: Map> = new Map(); private lastMsgId = 0; - private readonly workerCount: number; - private readonly workerFile: string; + readonly workerCount: number; + readonly workerFile: string; - private readonly events = new EventEmitter<{ + readonly events = new EventEmitter<{ workersReady: []; }>(); - public static init( - workerModule: WorkerPoolModuleInterface, + get idleWorkerCount() { + return this.idleWorkers.length; + } + + get busyWorkerCount() { + return this.workerCount - this.idleWorkers.length; + } + + get queuedJobCount() { + return this.jobQueue.length; + } + + public static init( + workerModule: WorkerPoolModuleInterface, opts: { workerCount?: number } = {} ) { const workerManager = new WorkerManager(workerModule, opts); - return new Promise>(resolve => { + return new Promise>(resolve => { workerManager.events.once('workersReady', () => { resolve(workerManager); }); @@ -91,10 +102,10 @@ export class WorkerManager { } constructor( - workerModule: WorkerPoolModuleInterface, + workerModule: WorkerPoolModuleInterface, opts: { workerCount?: number } = {} ) { - if (!isMainThread) { + if (!WorkerThreads.isMainThread) { throw new Error(`${this.constructor.name} must be instantiated in the main thread`); } @@ -103,44 +114,44 @@ export class WorkerManager { } else { this.workerFile = workerModule.workerModule.filename; } - this.workerCount = opts.workerCount ?? cpus().length; + this.workerCount = opts.workerCount ?? os.cpus().length; this.createWorkerPool(); } - exec(req: TReq): Promise { + exec(...args: TArgs): Promise { if (this.lastMsgId >= Number.MAX_SAFE_INTEGER) { this.lastMsgId = 0; } const msgId = this.lastMsgId++; const replyWaiter = waiter(); this.msgRequests.set(msgId, replyWaiter); - const reqMsg: WorkerReqMsg = { + const reqMsg: WorkerReqMsg = { msgId, - req, + req: args, }; this.jobQueue.push(reqMsg); this.assignJobs(); return replyWaiter; } - private createWorkerPool() { + createWorkerPool() { let workersReady = 0; for (let i = 0; i < this.workerCount; i++) { const workerData: WorkerDataInterface = { workerFile: this.workerFile, }; - const workerOpt: WorkerOptions = { + const workerOpt: WorkerThreads.WorkerOptions = { workerData, }; - if (__filename.endsWith('.ts')) { + if (path.extname(__filename) === '.ts') { if (process.env.NODE_ENV !== 'test') { - console.error( - 'Worker threads are being created with ts-node outside of a test environment.' + throw new Error( + 'Worker threads are being created with ts-node outside of a test environment' ); } - workerOpt.execArgv = ['-r', 'ts-node/register']; + workerOpt.execArgv = ['-r', 'ts-node/register/transpile-only']; } - const worker = new Worker(__filename, workerOpt); + const worker = new WorkerThreads.Worker(__filename, workerOpt); worker.unref(); this.workers.add(worker); worker.on('error', err => { @@ -149,34 +160,41 @@ export class WorkerManager { worker.on('messageerror', err => { console.error(`Worker message error`, err); }); - worker.on('message', (message: unknown) => { - if (message === 'ready') { - this.idleWorkers.push(worker); - this.assignJobs(); - workersReady++; - if (workersReady === this.workerCount) { - this.events.emit('workersReady'); - } - } else { - this.idleWorkers.push(worker); - this.assignJobs(); - const msg = message as WorkerRespMsg; - const replyWaiter = this.msgRequests.get(msg.msgId); - if (replyWaiter) { - if (msg.error) { - replyWaiter.reject(deserializeError(msg.error)); - } else if (msg.resp) { - replyWaiter.resolve(msg.resp); - } - this.msgRequests.delete(msg.msgId); - } else { - console.error('Received unexpected message from worker', msg); - } + worker.once('message', (message: unknown) => { + if (message !== 'ready') { + throw new Error(`Unexpected first msg from worker thread: ${JSON.stringify(message)}`); + } + this.setupWorkerHandler(worker); + this.idleWorkers.push(worker); + this.assignJobs(); + workersReady++; + if (workersReady === this.workerCount) { + this.events.emit('workersReady'); } }); } } + private setupWorkerHandler(worker: WorkerThreads.Worker) { + worker.on('message', (message: unknown) => { + this.idleWorkers.push(worker); + this.assignJobs(); + const msg = message as WorkerRespMsg; + const replyWaiter = this.msgRequests.get(msg.msgId); + if (replyWaiter) { + if (msg.error) { + const error = isErrorLike(msg.error) ? deserializeError(msg.error) : msg.error; + replyWaiter.reject(error as Error); + } else if (msg.resp) { + replyWaiter.resolve(msg.resp); + } + this.msgRequests.delete(msg.msgId); + } else { + console.error('Received unexpected message from worker', msg); + } + }); + } + private assignJobs() { while (this.idleWorkers.length > 0 && this.jobQueue.length > 0) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion @@ -192,39 +210,41 @@ export class WorkerManager { } } -if (!isMainThread && (workerData as WorkerDataInterface)?.workerFile) { - const { workerFile } = workerData as WorkerDataInterface; - // eslint-disable-next-line @typescript-eslint/no-var-requires - const workerModule = require(workerFile) as WorkerPoolModuleInterface; - let processTask: (req: unknown) => unknown; - if ('default' in workerModule) { - processTask = workerModule.default.processTask; - } else { - processTask = workerModule.processTask; - } - parentPort!.on('messageerror', err => { +if (!WorkerThreads.isMainThread && (WorkerThreads.workerData as WorkerDataInterface)?.workerFile) { + const { workerFile } = WorkerThreads.workerData as WorkerDataInterface; + // eslint-disable-next-line @typescript-eslint/no-require-imports, @typescript-eslint/no-var-requires + const workerModule = require(workerFile) as WorkerPoolModuleInterface; + const parentPort = WorkerThreads.parentPort as WorkerThreads.MessagePort; + const processTask = + 'default' in workerModule ? workerModule.default.processTask : workerModule.processTask; + parentPort.on('messageerror', err => { console.error(`Worker thread message error`, err); }); - parentPort!.on('message', (message: unknown) => { - const msg = message as WorkerReqMsg; + parentPort.on('message', (message: unknown) => { + const msg = message as WorkerReqMsg; getMaybePromiseResult( - () => processTask(msg.req), + () => processTask(...msg.req), result => { - if (result.ok) { - const reply: WorkerRespMsg = { - msgId: msg.msgId, - resp: result.ok, - }; - parentPort!.postMessage(reply); - } else { - const reply: WorkerRespMsg = { - msgId: msg.msgId, - error: serializeError(result.err as Error), - }; - parentPort!.postMessage(reply); + try { + let reply: WorkerRespMsg; + if (result.ok) { + reply = { + msgId: msg.msgId, + resp: result.ok, + }; + } else { + const error = isErrorLike(result.err) ? serializeError(result.err) : result.err; + reply = { + msgId: msg.msgId, + error, + }; + } + parentPort.postMessage(reply); + } catch (err: unknown) { + console.error(`Critical bug in work task processing`, err); } } ); }); - parentPort!.postMessage('ready'); + parentPort.postMessage('ready'); } From 944c756d3202e75263d14712cf895833f9b72361 Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Mon, 14 Apr 2025 20:20:47 +0700 Subject: [PATCH 03/11] chore: misc cleanup --- src/helpers/__tests__/worker.test.ts | 8 +++++--- src/helpers/index.ts | 1 + .../{worker-threads.ts => worker-thread-manager.ts} | 7 ++++--- tsconfig.json | 1 + 4 files changed, 11 insertions(+), 6 deletions(-) rename src/helpers/{worker-threads.ts => worker-thread-manager.ts} (97%) diff --git a/src/helpers/__tests__/worker.test.ts b/src/helpers/__tests__/worker.test.ts index 8f8fda4..06a2c49 100644 --- a/src/helpers/__tests__/worker.test.ts +++ b/src/helpers/__tests__/worker.test.ts @@ -1,6 +1,6 @@ import * as assert from 'node:assert/strict'; import * as os from 'node:os'; -import { WorkerManager } from '../worker-threads'; +import { WorkerThreadManager } from '../worker-thread-manager'; import * as workerModule from './my-worker'; import workerModuleDefaultExport from './my-worker-export-default'; import { MyCustomError } from './my-worker'; @@ -8,7 +8,9 @@ import { addKnownErrorConstructor } from '../serialize-error'; import { stopwatch } from '../time'; test('worker module with default exports', async () => { - const workerManager = await WorkerManager.init(workerModuleDefaultExport, { workerCount: 2 }); + const workerManager = await WorkerThreadManager.init(workerModuleDefaultExport, { + workerCount: 2, + }); const res = await workerManager.exec(1, 1); expect(res).toBe('1'); await workerManager.close(); @@ -20,7 +22,7 @@ describe('Worker tests', () => { const cpuPeggedTimeMs = 500; function initWorkerManager() { - return WorkerManager.init(workerModule, { workerCount }); + return WorkerThreadManager.init(workerModule, { workerCount }); } beforeAll(async () => { diff --git a/src/helpers/index.ts b/src/helpers/index.ts index 4fa4ee7..6b0e858 100644 --- a/src/helpers/index.ts +++ b/src/helpers/index.ts @@ -1,3 +1,4 @@ export * from './iterators'; export * from './time'; export * from './values'; +export * from './worker-thread-manager'; diff --git a/src/helpers/worker-threads.ts b/src/helpers/worker-thread-manager.ts similarity index 97% rename from src/helpers/worker-threads.ts rename to src/helpers/worker-thread-manager.ts index 90ade3b..f32a473 100644 --- a/src/helpers/worker-threads.ts +++ b/src/helpers/worker-thread-manager.ts @@ -62,7 +62,8 @@ function getMaybePromiseResult( cb({ err }); } } -export class WorkerManager { + +export class WorkerThreadManager { private readonly workers = new Set(); private readonly idleWorkers: WorkerThreads.Worker[] = []; @@ -93,8 +94,8 @@ export class WorkerManager { workerModule: WorkerPoolModuleInterface, opts: { workerCount?: number } = {} ) { - const workerManager = new WorkerManager(workerModule, opts); - return new Promise>(resolve => { + const workerManager = new WorkerThreadManager(workerModule, opts); + return new Promise>(resolve => { workerManager.events.once('workersReady', () => { resolve(workerManager); }); diff --git a/tsconfig.json b/tsconfig.json index e303ec8..0f066c3 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -6,6 +6,7 @@ "typeRoots": [ "./node_modules/@types" ], + "isolatedModules": true, "declaration": true, "sourceMap": true, "outDir": "./dist", From d7ce8369753ba1617abccef84ed701ffa217e8aa Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Mon, 14 Apr 2025 21:14:42 +0700 Subject: [PATCH 04/11] chore: cleanup worker thread overhead --- src/helpers/index.ts | 3 +- src/helpers/worker-thread-init.ts | 79 +++++++++++++++++++++++ src/helpers/worker-thread-manager.ts | 96 +++++----------------------- 3 files changed, 98 insertions(+), 80 deletions(-) create mode 100644 src/helpers/worker-thread-init.ts diff --git a/src/helpers/index.ts b/src/helpers/index.ts index 6b0e858..e3d91a2 100644 --- a/src/helpers/index.ts +++ b/src/helpers/index.ts @@ -1,4 +1,5 @@ export * from './iterators'; export * from './time'; export * from './values'; -export * from './worker-thread-manager'; +export { WorkerThreadManager } from './worker-thread-manager'; +export type { WorkerPoolModuleInterface } from './worker-thread-manager'; diff --git a/src/helpers/worker-thread-init.ts b/src/helpers/worker-thread-init.ts new file mode 100644 index 0000000..b02b318 --- /dev/null +++ b/src/helpers/worker-thread-init.ts @@ -0,0 +1,79 @@ +import * as WorkerThreads from 'node:worker_threads'; +import type { + WorkerDataInterface, + WorkerPoolModuleInterface, + WorkerReqMsg, + WorkerRespMsg, +} from './worker-thread-manager'; +import { isErrorLike, serializeError } from './serialize-error'; + +// Minimal worker thread initialization code. This file is the entry point for worker threads +// and is responsible for setting up the worker environment and handling messages from the main thread. +// Imports should be kept to a minimum to avoid in worker thread init overhead and memory usage. + +export const filename = __filename; + +/** + * Invokes a function that may return a value or a promise, and passes the result + * to a callback in a consistent format. Handles both synchronous and asynchronous cases, + * ensuring type safety and avoiding unnecessary async transitions for sync functions. + */ +function getMaybePromiseResult( + fn: () => T | Promise, + cb: (result: { ok: T; err?: null } | { ok?: null; err: unknown }) => void +): void { + try { + const maybePromise = fn(); + if (maybePromise instanceof Promise) { + maybePromise.then( + ok => cb({ ok }), + (err: unknown) => cb({ err }) + ); + } else { + cb({ ok: maybePromise }); + } + } catch (err: unknown) { + cb({ err }); + } +} + +// Check if this file is being run in a worker thread. If so, it will set up the worker environment. +if (!WorkerThreads.isMainThread && (WorkerThreads.workerData as WorkerDataInterface)?.workerFile) { + const { workerFile } = WorkerThreads.workerData as WorkerDataInterface; + // eslint-disable-next-line @typescript-eslint/no-require-imports, @typescript-eslint/no-var-requires + const workerModule = require(workerFile) as WorkerPoolModuleInterface; + const parentPort = WorkerThreads.parentPort as WorkerThreads.MessagePort; + // Determine if the worker module `processTask` function is a default export or a named export. + const processTask = + 'default' in workerModule ? workerModule.default.processTask : workerModule.processTask; + parentPort.on('messageerror', err => { + console.error(`Worker thread message error`, err); + }); + parentPort.on('message', (message: unknown) => { + const msg = message as WorkerReqMsg; + getMaybePromiseResult( + () => processTask(...msg.req), + result => { + try { + let reply: WorkerRespMsg; + if (result.ok) { + reply = { + msgId: msg.msgId, + resp: result.ok, + }; + } else { + const error = isErrorLike(result.err) ? serializeError(result.err) : result.err; + reply = { + msgId: msg.msgId, + error, + }; + } + parentPort.postMessage(reply); + } catch (err: unknown) { + console.error(`Critical bug in work task processing`, err); + } + } + ); + }); + parentPort.postMessage('ready'); +} diff --git a/src/helpers/worker-thread-manager.ts b/src/helpers/worker-thread-manager.ts index f32a473..967ddfc 100644 --- a/src/helpers/worker-thread-manager.ts +++ b/src/helpers/worker-thread-manager.ts @@ -3,30 +3,19 @@ import * as os from 'node:os'; import * as path from 'node:path'; import { EventEmitter } from 'node:events'; import { waiter, Waiter } from './time'; -import { deserializeError, isErrorLike, serializeError } from './serialize-error'; +import { deserializeError, isErrorLike } from './serialize-error'; +import { filename as workerThreadInitFilename } from './worker-thread-init'; -type WorkerPoolModuleInterface = - | { - workerModule: NodeJS.Module; - processTask: (...args: TArgs) => Promise | TResp; - } - | { - default: { - workerModule: NodeJS.Module; - processTask: (...args: TArgs) => Promise | TResp; - }; - }; - -type WorkerDataInterface = { +export type WorkerDataInterface = { workerFile: string; }; -type WorkerReqMsg = { +export type WorkerReqMsg = { msgId: number; req: TArgs; }; -type WorkerRespMsg = { +export type WorkerRespMsg = { msgId: number; } & ( | { @@ -39,29 +28,17 @@ type WorkerRespMsg = { } ); -/** - * Invokes a function that may return a value or a promise, and passes the result - * to a callback in a consistent format. Handles both synchronous and asynchronous cases, - * ensuring type safety and avoiding unnecessary async transitions for sync functions. - */ -function getMaybePromiseResult( - fn: () => T | Promise, - cb: (result: { ok: T; err?: null } | { ok?: null; err: unknown }) => void -): void { - try { - const maybePromise = fn(); - if (maybePromise instanceof Promise) { - maybePromise.then( - ok => cb({ ok }), - (err: unknown) => cb({ err }) - ); - } else { - cb({ ok: maybePromise }); +export type WorkerPoolModuleInterface = + | { + workerModule: NodeJS.Module; + processTask: (...args: TArgs) => Promise | TResp; } - } catch (err: unknown) { - cb({ err }); - } -} + | { + default: { + workerModule: NodeJS.Module; + processTask: (...args: TArgs) => Promise | TResp; + }; + }; export class WorkerThreadManager { private readonly workers = new Set(); @@ -144,7 +121,7 @@ export class WorkerThreadManager { const workerOpt: WorkerThreads.WorkerOptions = { workerData, }; - if (path.extname(__filename) === '.ts') { + if (path.extname(workerThreadInitFilename) === '.ts') { if (process.env.NODE_ENV !== 'test') { throw new Error( 'Worker threads are being created with ts-node outside of a test environment' @@ -152,7 +129,7 @@ export class WorkerThreadManager { } workerOpt.execArgv = ['-r', 'ts-node/register/transpile-only']; } - const worker = new WorkerThreads.Worker(__filename, workerOpt); + const worker = new WorkerThreads.Worker(workerThreadInitFilename, workerOpt); worker.unref(); this.workers.add(worker); worker.on('error', err => { @@ -210,42 +187,3 @@ export class WorkerThreadManager { this.workers.clear(); } } - -if (!WorkerThreads.isMainThread && (WorkerThreads.workerData as WorkerDataInterface)?.workerFile) { - const { workerFile } = WorkerThreads.workerData as WorkerDataInterface; - // eslint-disable-next-line @typescript-eslint/no-require-imports, @typescript-eslint/no-var-requires - const workerModule = require(workerFile) as WorkerPoolModuleInterface; - const parentPort = WorkerThreads.parentPort as WorkerThreads.MessagePort; - const processTask = - 'default' in workerModule ? workerModule.default.processTask : workerModule.processTask; - parentPort.on('messageerror', err => { - console.error(`Worker thread message error`, err); - }); - parentPort.on('message', (message: unknown) => { - const msg = message as WorkerReqMsg; - getMaybePromiseResult( - () => processTask(...msg.req), - result => { - try { - let reply: WorkerRespMsg; - if (result.ok) { - reply = { - msgId: msg.msgId, - resp: result.ok, - }; - } else { - const error = isErrorLike(result.err) ? serializeError(result.err) : result.err; - reply = { - msgId: msg.msgId, - error, - }; - } - parentPort.postMessage(reply); - } catch (err: unknown) { - console.error(`Critical bug in work task processing`, err); - } - } - ); - }); - parentPort.postMessage('ready'); -} From 069e434015a6ea61a6238cbf15772272fae64434 Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Mon, 14 Apr 2025 23:21:59 +0700 Subject: [PATCH 05/11] chore: add better abort support --- package-lock.json | 16 ++++++++-------- package.json | 2 +- src/helpers/__tests__/my-worker.ts | 3 +-- src/helpers/worker-thread-manager.ts | 13 ++++++++++++- 4 files changed, 22 insertions(+), 12 deletions(-) diff --git a/package-lock.json b/package-lock.json index 511c02d..c9febd6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,7 +13,7 @@ "@fastify/swagger": "^8.3.1", "@fastify/type-provider-typebox": "^3.2.0", "@sinclair/typebox": "^0.28.20", - "@types/node": "^22.13.17", + "@types/node": "^22.14.1", "fastify": "^4.3.0", "fastify-metrics": "^10.2.0", "node-pg-migrate": "^6.2.2", @@ -2833,12 +2833,12 @@ "dev": true }, "node_modules/@types/node": { - "version": "22.13.17", - "resolved": "https://registry.npmjs.org/@types/node/-/node-22.13.17.tgz", - "integrity": "sha512-nAJuQXoyPj04uLgu+obZcSmsfOenUg6DxPKogeUy6yNCFwWaj5sBF8/G/pNo8EtBJjAfSVgfIlugR/BCOleO+g==", + "version": "22.14.1", + "resolved": "https://registry.npmjs.org/@types/node/-/node-22.14.1.tgz", + "integrity": "sha512-u0HuPQwe/dHrItgHHpmw3N2fYCR6x4ivMNbPHRkBVP4CvN+kiRrKHWk3i8tXiO/joPwXLMYvF9TTF0eqgHIuOw==", "license": "MIT", "dependencies": { - "undici-types": "~6.20.0" + "undici-types": "~6.21.0" } }, "node_modules/@types/normalize-package-data": { @@ -11024,9 +11024,9 @@ } }, "node_modules/undici-types": { - "version": "6.20.0", - "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.20.0.tgz", - "integrity": "sha512-Ny6QZ2Nju20vw1SRHe3d9jVu6gJ+4e3+MMpqu7pqE5HT6WsTSlce++GQmK5UXS8mzV8DSYHrQH+Xrf2jVcuKNg==", + "version": "6.21.0", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.21.0.tgz", + "integrity": "sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==", "license": "MIT" }, "node_modules/universalify": { diff --git a/package.json b/package.json index 4d7574b..12714ae 100644 --- a/package.json +++ b/package.json @@ -63,7 +63,7 @@ "@fastify/swagger": "^8.3.1", "@fastify/type-provider-typebox": "^3.2.0", "@sinclair/typebox": "^0.28.20", - "@types/node": "^22.13.17", + "@types/node": "^22.14.1", "fastify": "^4.3.0", "fastify-metrics": "^10.2.0", "node-pg-migrate": "^6.2.2", diff --git a/src/helpers/__tests__/my-worker.ts b/src/helpers/__tests__/my-worker.ts index bd4ca6c..b06afb3 100644 --- a/src/helpers/__tests__/my-worker.ts +++ b/src/helpers/__tests__/my-worker.ts @@ -1,7 +1,6 @@ /** Block the thread for `ms` milliseconds */ function sleepSync(ms: number) { - const int32 = new Int32Array(new SharedArrayBuffer(4)); - Atomics.wait(int32, 0, 0, ms); + Atomics.wait(new Int32Array(new SharedArrayBuffer(4)), 0, 0, ms); } export function processTask(req: number, cpuWaitTimeMs: number) { diff --git a/src/helpers/worker-thread-manager.ts b/src/helpers/worker-thread-manager.ts index 967ddfc..2087dbe 100644 --- a/src/helpers/worker-thread-manager.ts +++ b/src/helpers/worker-thread-manager.ts @@ -1,7 +1,7 @@ import * as WorkerThreads from 'node:worker_threads'; import * as os from 'node:os'; import * as path from 'node:path'; -import { EventEmitter } from 'node:events'; +import { EventEmitter, addAbortListener } from 'node:events'; import { waiter, Waiter } from './time'; import { deserializeError, isErrorLike } from './serialize-error'; import { filename as workerThreadInitFilename } from './worker-thread-init'; @@ -51,6 +51,8 @@ export class WorkerThreadManager { readonly workerCount: number; readonly workerFile: string; + private readonly abortControlller = new AbortController(); + readonly events = new EventEmitter<{ workersReady: []; }>(); @@ -97,6 +99,7 @@ export class WorkerThreadManager { } exec(...args: TArgs): Promise { + this.abortControlller.signal.throwIfAborted(); if (this.lastMsgId >= Number.MAX_SAFE_INTEGER) { this.lastMsgId = 0; } @@ -151,6 +154,13 @@ export class WorkerThreadManager { } }); } + addAbortListener(this.abortControlller.signal, () => { + for (const replyWaiter of this.msgRequests.values()) { + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + replyWaiter.reject(this.abortControlller.signal.reason); + } + this.msgRequests.clear(); + }); } private setupWorkerHandler(worker: WorkerThreads.Worker) { @@ -183,6 +193,7 @@ export class WorkerThreadManager { } async close() { + this.abortControlller.abort(); await Promise.all([...this.workers].map(worker => worker.terminate())); this.workers.clear(); } From 98a069b386c3f6e19c308115493995111883b5e6 Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Thu, 17 Apr 2025 15:56:13 +0700 Subject: [PATCH 06/11] chore: simplify error serialization --- src/helpers/serialize-error.ts | 92 +++++++--------------------------- 1 file changed, 18 insertions(+), 74 deletions(-) diff --git a/src/helpers/serialize-error.ts b/src/helpers/serialize-error.ts index f9b38ca..bacc88c 100644 --- a/src/helpers/serialize-error.ts +++ b/src/helpers/serialize-error.ts @@ -39,58 +39,15 @@ export function addKnownErrorConstructor( cause: error, }); } - errorConstructors.set(constructor.name, constructor as ErrorConstructor); } -const commonProperties: { - name: string; - descriptor: Partial; - deserialize?: (_: any) => any; - serialize?: (_: any) => any; -}[] = [ - { - name: 'message', - descriptor: { - enumerable: false, - configurable: true, - writable: true, - }, - }, - { - name: 'stack', - descriptor: { - enumerable: false, - configurable: true, - writable: true, - }, - }, - { - name: 'code', - descriptor: { - enumerable: true, - configurable: true, - writable: true, - }, - }, - { - name: 'cause', - descriptor: { - enumerable: false, - configurable: true, - writable: true, - }, - }, - { - name: 'errors', - descriptor: { - enumerable: false, - configurable: true, - writable: true, - }, - deserialize: (errors: SerializedError[]) => errors.map(error => deserializeError(error)), - serialize: (errors: Error[]) => errors.map(error => serializeError(error)), - }, +const commonProperties: [name: string, enumerable: boolean][] = [ + ['message', false], + ['stack', false], + ['code', true], + ['cause', false], + ['errors', false], ]; export type SerializedError = { @@ -126,18 +83,10 @@ export function serializeError(subject: Error): SerializedError { stack: '', }; - for (const prop of commonProperties) { - if (!(prop.name in subject)) { - continue; - } - let value = (subject as any)[prop.name]; - // TODO: if value instanceof Error then recursively serializeError - if (prop.serialize) { - value = prop.serialize(value); - } else { - value = deepSerialize(value); + for (const [name] of commonProperties) { + if (name in subject) { + data[name] = deepSerialize((subject as any)[name]); } - data[prop.name] = value; } // Include any other enumerable own properties @@ -172,25 +121,20 @@ export function deserializeError(subject: SerializedError): Error { } const output = Object.create(con.prototype) as Error; - for (const prop of commonProperties) { - if (!(prop.name in subject)) continue; - - let value = (subject as any)[prop.name]; - if (prop.deserialize) { - value = prop.deserialize(value); - } else { - value = deepDeserialize(value); + for (const [name, enumerable] of commonProperties) { + if (name in subject) { + Object.defineProperty(output, name, { + enumerable, + configurable: true, + writable: true, + value: deepDeserialize((subject as any)[name]), + }); } - - Object.defineProperty(output, prop.name, { - ...prop.descriptor, - value: value, - }); } // Add any other properties (custom props not in commonProperties) for (const key of Object.keys(subject)) { - if (!commonProperties.some(p => p.name === key)) { + if (!commonProperties.some(([name]) => name === key)) { (output as any)[key] = deepDeserialize((subject as any)[key]); } } From b21062bc1e75e5532a5259aac53b8b33ce7603de Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Thu, 17 Apr 2025 15:58:29 +0700 Subject: [PATCH 07/11] chore: simplify error serialization --- src/helpers/serialize-error.ts | 28 ++++++++-------------------- 1 file changed, 8 insertions(+), 20 deletions(-) diff --git a/src/helpers/serialize-error.ts b/src/helpers/serialize-error.ts index bacc88c..0eaa1c0 100644 --- a/src/helpers/serialize-error.ts +++ b/src/helpers/serialize-error.ts @@ -143,31 +143,19 @@ export function deserializeError(subject: SerializedError): Error { } function deepSerialize(value: unknown): unknown { - if (Array.isArray(value)) { - return value.map(deepSerialize); - } else if (isErrorLike(value)) { - return serializeError(value); - } else if (value && typeof value === 'object') { - const result: Record = {}; - for (const [k, v] of Object.entries(value as Record)) { - result[k] = deepSerialize(v); - } - return result; + if (Array.isArray(value)) return value.map(deepSerialize); + if (isErrorLike(value)) return serializeError(value); + if (value && typeof value === 'object') { + return Object.fromEntries(Object.entries(value).map(([k, v]) => [k, deepSerialize(v)])); } return value; } function deepDeserialize(value: unknown): unknown { - if (Array.isArray(value)) { - return value.map(deepDeserialize); - } else if (isErrorLike(value)) { - return deserializeError(value); - } else if (value && typeof value === 'object') { - const result: Record = {}; - for (const [k, v] of Object.entries(value as Record)) { - result[k] = deepDeserialize(v); - } - return result; + if (Array.isArray(value)) return value.map(deepDeserialize); + if (isErrorLike(value)) return deserializeError(value); + if (value && typeof value === 'object') { + return Object.fromEntries(Object.entries(value).map(([k, v]) => [k, deepDeserialize(v)])); } return value; } From 27798b944f2f5fe9de9592fd5b09999940927c01 Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Thu, 17 Apr 2025 16:04:54 +0700 Subject: [PATCH 08/11] test: ensure AggregateError is serialized correctly --- src/helpers/__tests__/my-worker.ts | 10 ++++++++ src/helpers/__tests__/worker.test.ts | 38 ++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/src/helpers/__tests__/my-worker.ts b/src/helpers/__tests__/my-worker.ts index b06afb3..7cd5234 100644 --- a/src/helpers/__tests__/my-worker.ts +++ b/src/helpers/__tests__/my-worker.ts @@ -10,6 +10,9 @@ export function processTask(req: number, cpuWaitTimeMs: number) { if (req === 3333) { throw 'boom'; } + if (req == 4444) { + throw createAggregateError(); + } sleepSync(cpuWaitTimeMs); return req.toString(); } @@ -39,4 +42,11 @@ function createError() { return error; } +function createAggregateError() { + const error1 = new Error('Error1 in aggregate 1'); + Object.assign(error1, { inner1code: 123 }); + const error2 = new TypeError('Error2 in aggregate 2'); + return new AggregateError([error1, error2], 'My aggregate error message', { cause: 'foo' }); +} + export const workerModule = module; diff --git a/src/helpers/__tests__/worker.test.ts b/src/helpers/__tests__/worker.test.ts index 06a2c49..b985ab2 100644 --- a/src/helpers/__tests__/worker.test.ts +++ b/src/helpers/__tests__/worker.test.ts @@ -114,6 +114,44 @@ describe('Worker tests', () => { }); }); + test('worker task throws with non-Error value', async () => { + const [res] = await Promise.allSettled([ + // The worker will throw a non-error value when it receives this specific req value + workerManager.exec(3333, 1), + ]); + assert(res.status === 'rejected'); + expect(res.reason).toBe('boom'); + }); + + test('worker task throws AggregateError', async () => { + // Test that error de/ser across worker thread boundary works as expected + const [res] = await Promise.allSettled([ + // The worker will throw an error when it receives this specific req value + workerManager.exec(4444, 1), + ]); + assert(res.status === 'rejected'); + expect(res.reason).toBeInstanceOf(AggregateError); + expect(res.reason).toMatchObject({ + name: 'AggregateError', + message: 'My aggregate error message', + stack: expect.any(String), + cause: 'foo', + errors: [ + { + name: 'Error', + message: 'Error1 in aggregate 1', + inner1code: 123, + stack: expect.any(String), + }, + { + name: 'TypeError', + message: 'Error2 in aggregate 2', + stack: expect.any(String), + }, + ], + }); + }); + test('run tasks on main thread', async () => { const watch = stopwatch(); const results = await Promise.allSettled( From 80eb01da7d5ed9f578d30edc94b01a81d9c3fe9d Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Thu, 17 Apr 2025 16:32:21 +0700 Subject: [PATCH 09/11] chore: simplify error serialization --- jest.config.js | 7 +++--- src/helpers/serialize-error.ts | 45 ++++++++++++++-------------------- 2 files changed, 21 insertions(+), 31 deletions(-) diff --git a/jest.config.js b/jest.config.js index 85190fb..e46c1ef 100644 --- a/jest.config.js +++ b/jest.config.js @@ -1,9 +1,8 @@ -/* - * For a detailed explanation regarding each configuration property, visit: - * https://jestjs.io/docs/configuration - */ +// @ts-check +/** @type {import('jest').Config} */ module.exports = { + testTimeout: 10000000, // All imported modules in your tests should be mocked automatically // automock: false, diff --git a/src/helpers/serialize-error.ts b/src/helpers/serialize-error.ts index 0eaa1c0..d2ab4fc 100644 --- a/src/helpers/serialize-error.ts +++ b/src/helpers/serialize-error.ts @@ -42,13 +42,13 @@ export function addKnownErrorConstructor( errorConstructors.set(constructor.name, constructor as ErrorConstructor); } -const commonProperties: [name: string, enumerable: boolean][] = [ - ['message', false], - ['stack', false], - ['code', true], - ['cause', false], - ['errors', false], -]; +const commonProperties: Record = { + message: false, + stack: false, + code: true, + cause: false, + errors: false, +}; export type SerializedError = { name: string; @@ -77,31 +77,22 @@ export function serializeError(subject: Error): SerializedError { throw new TypeError('Failed to serialize error, expected an error object'); } - const data: Record = { - name: 'Error', + const data: SerializedError = { + name: subject instanceof DOMException ? 'DOMException' : subject.constructor.name ?? 'Error', message: '', stack: '', }; - for (const [name] of commonProperties) { - if (name in subject) { - data[name] = deepSerialize((subject as any)[name]); - } + for (const key of Object.keys(commonProperties)) { + if (key in subject) data[key] = deepSerialize((subject as any)[key]); } // Include any other enumerable own properties for (const key of Object.keys(subject)) { - if (!(key in data)) { - data[key] = deepSerialize((subject as any)[key]); - } + if (!(key in data)) data[key] = deepSerialize((subject as any)[key]); } - if (globalThis.DOMException && subject instanceof globalThis.DOMException) { - data.name = 'DOMException'; - } else { - data.name = subject.constructor.name; - } - return data as SerializedError; + return data; } export function deserializeError(subject: SerializedError): Error { @@ -121,20 +112,20 @@ export function deserializeError(subject: SerializedError): Error { } const output = Object.create(con.prototype) as Error; - for (const [name, enumerable] of commonProperties) { - if (name in subject) { - Object.defineProperty(output, name, { + for (const [key, enumerable] of Object.entries(commonProperties)) { + if (key in subject) { + Object.defineProperty(output, key, { enumerable, configurable: true, writable: true, - value: deepDeserialize((subject as any)[name]), + value: deepDeserialize((subject as any)[key]), }); } } // Add any other properties (custom props not in commonProperties) for (const key of Object.keys(subject)) { - if (!commonProperties.some(([name]) => name === key)) { + if (!(key in commonProperties)) { (output as any)[key] = deepDeserialize((subject as any)[key]); } } From ff21c91128e2cdd38d7e9fa2cea81fdbdf90215c Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Thu, 17 Apr 2025 18:02:13 +0700 Subject: [PATCH 10/11] fix: serialize AbortError (DOMException) errors correctly --- jest.config.js | 1 - src/helpers/__tests__/my-worker.ts | 9 +++++++++ src/helpers/__tests__/worker.test.ts | 20 +++++++++++++++++++- src/helpers/serialize-error.ts | 19 ++++++++++++++----- 4 files changed, 42 insertions(+), 7 deletions(-) diff --git a/jest.config.js b/jest.config.js index e46c1ef..06218e7 100644 --- a/jest.config.js +++ b/jest.config.js @@ -2,7 +2,6 @@ /** @type {import('jest').Config} */ module.exports = { - testTimeout: 10000000, // All imported modules in your tests should be mocked automatically // automock: false, diff --git a/src/helpers/__tests__/my-worker.ts b/src/helpers/__tests__/my-worker.ts index 7cd5234..f6493b5 100644 --- a/src/helpers/__tests__/my-worker.ts +++ b/src/helpers/__tests__/my-worker.ts @@ -13,6 +13,9 @@ export function processTask(req: number, cpuWaitTimeMs: number) { if (req == 4444) { throw createAggregateError(); } + if (req === 5555) { + throwDOMError(); + } sleepSync(cpuWaitTimeMs); return req.toString(); } @@ -49,4 +52,10 @@ function createAggregateError() { return new AggregateError([error1, error2], 'My aggregate error message', { cause: 'foo' }); } +function throwDOMError() { + const ac = new AbortController(); + ac.abort(); + ac.signal.throwIfAborted(); +} + export const workerModule = module; diff --git a/src/helpers/__tests__/worker.test.ts b/src/helpers/__tests__/worker.test.ts index b985ab2..63a1b39 100644 --- a/src/helpers/__tests__/worker.test.ts +++ b/src/helpers/__tests__/worker.test.ts @@ -123,7 +123,7 @@ describe('Worker tests', () => { expect(res.reason).toBe('boom'); }); - test('worker task throws AggregateError', async () => { + test('worker task serializes AggregateError', async () => { // Test that error de/ser across worker thread boundary works as expected const [res] = await Promise.allSettled([ // The worker will throw an error when it receives this specific req value @@ -152,6 +152,24 @@ describe('Worker tests', () => { }); }); + test('worker task serializes DOMException (AbortError)', async () => { + // Test that error de/ser across worker thread boundary works as expected + const [res] = await Promise.allSettled([ + // The worker will throw an error when it receives this specific req value + workerManager.exec(5555, 1), + ]); + assert(res.status === 'rejected'); + expect(res.reason).toBeInstanceOf(DOMException); + expect(res.reason).toMatchObject({ + constructor: expect.objectContaining({ + name: 'DOMException', + }), + name: 'AbortError', + message: 'This operation was aborted', + stack: expect.any(String), + }); + }); + test('run tasks on main thread', async () => { const watch = stopwatch(); const results = await Promise.allSettled( diff --git a/src/helpers/serialize-error.ts b/src/helpers/serialize-error.ts index d2ab4fc..c05b000 100644 --- a/src/helpers/serialize-error.ts +++ b/src/helpers/serialize-error.ts @@ -43,6 +43,7 @@ export function addKnownErrorConstructor( } const commonProperties: Record = { + name: false, message: false, stack: false, code: true, @@ -50,14 +51,21 @@ const commonProperties: Record = { errors: false, }; -export type SerializedError = { +type SerializedError = { + constructorName: string; name: string; message: string; stack: string; [key: string]: any; }; -export function isErrorLike(value: unknown): value is Error & { stack: string } { +export type ErrorLike = { + name: string; + message: string; + stack: string; +}; + +export function isErrorLike(value: unknown): value is ErrorLike { return ( typeof value === 'object' && value !== null && @@ -78,7 +86,8 @@ export function serializeError(subject: Error): SerializedError { } const data: SerializedError = { - name: subject instanceof DOMException ? 'DOMException' : subject.constructor.name ?? 'Error', + constructorName: subject.constructor.name ?? 'Error', // new field + name: subject.name, message: '', stack: '', }; @@ -95,14 +104,14 @@ export function serializeError(subject: Error): SerializedError { return data; } -export function deserializeError(subject: SerializedError): Error { +export function deserializeError(subject: ErrorLike): Error { if (!isErrorLike(subject)) { // If the subject is not an error, for example `throw "boom", then we throw. // This function should only be passed error objects, callers can use `isErrorLike`. throw new TypeError('Failed to desserialize error, expected an error object'); } - let con = errorConstructors.get(subject.name); + let con = errorConstructors.get((subject as SerializedError).constructorName ?? subject.name); if (!con) { // If the constructor is not found, use the generic Error constructor con = Error; From ffe4d49c7af6349ac547eb9a308577ffd8514283 Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Thu, 17 Apr 2025 18:10:55 +0700 Subject: [PATCH 11/11] test: ensure task queuing works as expected --- src/helpers/__tests__/worker.test.ts | 33 ++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/src/helpers/__tests__/worker.test.ts b/src/helpers/__tests__/worker.test.ts index 63a1b39..1f626f0 100644 --- a/src/helpers/__tests__/worker.test.ts +++ b/src/helpers/__tests__/worker.test.ts @@ -189,4 +189,37 @@ describe('Worker tests', () => { expect(result.value).toBe(i.toString()); } }); + + test('Run more tasks than CPUs', async () => { + const watch = stopwatch(); + const taskCount = workerManager.workerCount * 3; + const taskTime = 50; + const taskPromises = Array.from({ length: taskCount }, async (_, i) => { + console.time(`task ${i}`); + const res = await workerManager.exec(i, taskTime); + console.timeEnd(`task ${i}`); + return res; + }); + + // Ensure all workers were assigned a task and queue is correct length + expect(workerManager.busyWorkerCount).toBe(workerCount); + expect(workerManager.idleWorkerCount).toBe(0); + expect(workerManager.queuedJobCount).toBe(taskCount - workerCount); + + const results = await Promise.allSettled(taskPromises); + + // All tasks should complete roughly within the time in takes for one task to complete + // because the tasks are run in parallel on different threads. + // (Pad timing with an extra 50% to account for test code execution overhead) + expect(watch.getElapsed()).toBeLessThan( + Math.ceil(taskCount / workerManager.workerCount) * taskTime * 1.5 + ); + + // Ensure tasks returned in expected order: + for (let i = 0; i < taskPromises.length; i++) { + const result = results[i]; + assert(result.status === 'fulfilled'); + expect(result.value).toBe(i.toString()); + } + }); });