From 748e364acf9475950f8ce4c18ce3219e964cca69 Mon Sep 17 00:00:00 2001 From: Quajo Duke Date: Tue, 2 Sep 2025 00:08:15 +0000 Subject: [PATCH 1/5] update: added auto migration runnning on db connect --- .env.test | 4 +- dist/worker.d.ts | 1 + dist/worker.d.ts.map | 2 +- dist/worker.js | 20 ++++++-- dist/worker.js.map | 2 +- docker-compose.yml | 4 +- package-lock.json | 8 +-- src/client.ts | 53 +++++++++++--------- src/utils.ts | 38 +++++++++++++- src/worker.ts | 4 ++ tests/impl/test_client.test.ts | 17 ++++--- tests/impl/test_client.ts | 2 +- tests/setup.ts | 23 +++++---- tests/utils.test.ts | 92 ++++++++++++++++++++++++++-------- tests/worker.test.ts | 6 +++ 15 files changed, 199 insertions(+), 77 deletions(-) diff --git a/.env.test b/.env.test index d63c336..32159b1 100644 --- a/.env.test +++ b/.env.test @@ -1,10 +1,10 @@ # Test environment configuration for Docker PostgreSQL TEST_DB_HOST=localhost -TEST_DB_PORT=5432 +TEST_DB_PORT=5433 TEST_DB_NAME=que_test TEST_DB_USER=que_user TEST_DB_PASSWORD=que_password TEST_DB_SSL=false # Connection string format (alternative to individual fields) -TEST_DATABASE_URL=postgresql://que_user:que_password@localhost:5432/que_test \ No newline at end of file +TEST_DATABASE_URL=postgresql://que_user:que_password@localhost:5433/que_test diff --git a/dist/worker.d.ts b/dist/worker.d.ts index c6020c6..0c27d68 100644 --- a/dist/worker.d.ts +++ b/dist/worker.d.ts @@ -7,6 +7,7 @@ export declare class Worker { private running; private shutdownPromise; private timeoutId; + private timeoutResolve; constructor(clientConfig?: ClientConfig, options?: WorkerOptions); register(jobClass: string, workFunc: WorkFunction): void; work(): Promise; diff --git a/dist/worker.d.ts.map b/dist/worker.d.ts.map index 00e8061..8112c3e 100644 --- a/dist/worker.d.ts.map +++ b/dist/worker.d.ts.map @@ -1 +1 @@ -{"version":3,"file":"worker.d.ts","sourceRoot":"","sources":["../src/worker.ts"],"names":[],"mappings":"AACA,OAAO,EAAO,YAAY,EAAW,aAAa,EAAE,YAAY,EAAE,MAAM,SAAS,CAAC;AAElF,qBAAa,MAAM;IACjB,OAAO,CAAC,MAAM,CAAS;IACvB,OAAO,CAAC,OAAO,CAAe;IAC9B,OAAO,CAAC,KAAK,CAAS;IACtB,OAAO,CAAC,QAAQ,CAAS;IACzB,OAAO,CAAC,OAAO,CAAkB;IACjC,OAAO,CAAC,eAAe,CAA8B;IACrD,OAAO,CAAC,SAAS,CAA+B;gBAEpC,YAAY,GAAE,YAAiB,EAAE,OAAO,GAAE,aAAkB;IAMxE,QAAQ,CAAC,QAAQ,EAAE,MAAM,EAAE,QAAQ,EAAE,YAAY,GAAG,IAAI;IAIlD,IAAI,IAAI,OAAO,CAAC,IAAI,CAAC;IAUrB,OAAO,IAAI,OAAO,CAAC,OAAO,CAAC;IAW3B,QAAQ,IAAI,OAAO,CAAC,IAAI,CAAC;YAmBjB,QAAQ;YAsBR,UAAU;CAgBzB"} \ No newline at end of file +{"version":3,"file":"worker.d.ts","sourceRoot":"","sources":["../src/worker.ts"],"names":[],"mappings":"AACA,OAAO,EAAO,YAAY,EAAW,aAAa,EAAE,YAAY,EAAE,MAAM,SAAS,CAAC;AAElF,qBAAa,MAAM;IACjB,OAAO,CAAC,MAAM,CAAS;IACvB,OAAO,CAAC,OAAO,CAAe;IAC9B,OAAO,CAAC,KAAK,CAAS;IACtB,OAAO,CAAC,QAAQ,CAAS;IACzB,OAAO,CAAC,OAAO,CAAkB;IACjC,OAAO,CAAC,eAAe,CAA8B;IACrD,OAAO,CAAC,SAAS,CAA+B;IAChD,OAAO,CAAC,cAAc,CAA6B;gBAEvC,YAAY,GAAE,YAAiB,EAAE,OAAO,GAAE,aAAkB;IAMxE,QAAQ,CAAC,QAAQ,EAAE,MAAM,EAAE,QAAQ,EAAE,YAAY,GAAG,IAAI;IAIlD,IAAI,IAAI,OAAO,CAAC,IAAI,CAAC;IAUrB,OAAO,IAAI,OAAO,CAAC,OAAO,CAAC;IAW3B,QAAQ,IAAI,OAAO,CAAC,IAAI,CAAC;YAyBjB,QAAQ;YA8BR,UAAU;CAgBzB"} \ No newline at end of file diff --git a/dist/worker.js b/dist/worker.js index 2d22c88..e7f97c5 100644 --- a/dist/worker.js +++ b/dist/worker.js @@ -8,9 +8,10 @@ class Worker { this.running = false; this.shutdownPromise = null; this.timeoutId = null; + this.timeoutResolve = null; this.client = new client_1.Client(clientConfig); this.queue = options.queue || ''; - this.interval = options.interval || 5000; + this.interval = options.interval || 60 * 1000; } register(jobClass, workFunc) { this.workMap[jobClass] = workFunc; @@ -40,6 +41,11 @@ class Worker { clearTimeout(this.timeoutId); this.timeoutId = null; } + // Resolve any pending timeout promises + if (this.timeoutResolve) { + this.timeoutResolve(); + this.timeoutResolve = null; + } if (this.shutdownPromise) { await this.shutdownPromise; } @@ -51,7 +57,11 @@ class Worker { const processed = await this.workOne(); if (!processed && this.running) { await new Promise((resolve) => { - this.timeoutId = setTimeout(resolve, this.interval); + this.timeoutResolve = resolve; + this.timeoutId = setTimeout(() => { + this.timeoutResolve = null; + resolve(); + }, this.interval); }); } } @@ -59,7 +69,11 @@ class Worker { console.error('Worker error:', error); if (this.running) { await new Promise((resolve) => { - this.timeoutId = setTimeout(resolve, this.interval); + this.timeoutResolve = resolve; + this.timeoutId = setTimeout(() => { + this.timeoutResolve = null; + resolve(); + }, this.interval); }); } } diff --git a/dist/worker.js.map b/dist/worker.js.map index b4738b4..f61c074 100644 --- a/dist/worker.js.map +++ b/dist/worker.js.map @@ -1 +1 @@ -{"version":3,"file":"worker.js","sourceRoot":"","sources":["../src/worker.ts"],"names":[],"mappings":";;;AAAA,qCAAkC;AAGlC,MAAa,MAAM;IASjB,YAAY,eAA6B,EAAE,EAAE,UAAyB,EAAE;QAPhE,YAAO,GAAY,EAAE,CAAC;QAGtB,YAAO,GAAY,KAAK,CAAC;QACzB,oBAAe,GAAyB,IAAI,CAAC;QAC7C,cAAS,GAA0B,IAAI,CAAC;QAG9C,IAAI,CAAC,MAAM,GAAG,IAAI,eAAM,CAAC,YAAY,CAAC,CAAC;QACvC,IAAI,CAAC,KAAK,GAAG,OAAO,CAAC,KAAK,IAAI,EAAE,CAAC;QACjC,IAAI,CAAC,QAAQ,GAAG,OAAO,CAAC,QAAQ,IAAI,IAAI,CAAC;IAC3C,CAAC;IAED,QAAQ,CAAC,QAAgB,EAAE,QAAsB;QAC/C,IAAI,CAAC,OAAO,CAAC,QAAQ,CAAC,GAAG,QAAQ,CAAC;IACpC,CAAC;IAED,KAAK,CAAC,IAAI;QACR,IAAI,IAAI,CAAC,OAAO,EAAE,CAAC;YACjB,MAAM,IAAI,KAAK,CAAC,2BAA2B,CAAC,CAAC;QAC/C,CAAC;QAED,IAAI,CAAC,OAAO,GAAG,IAAI,CAAC;QACpB,IAAI,CAAC,eAAe,GAAG,IAAI,CAAC,QAAQ,EAAE,CAAC;QACvC,OAAO,IAAI,CAAC,eAAe,CAAC;IAC9B,CAAC;IAED,KAAK,CAAC,OAAO;QACX,MAAM,GAAG,GAAG,MAAM,IAAI,CAAC,MAAM,CAAC,OAAO,CAAC,IAAI,CAAC,KAAK,CAAC,CAAC;QAElD,IAAI,CAAC,GAAG,EAAE,CAAC;YACT,OAAO,KAAK,CAAC;QACf,CAAC;QAED,MAAM,IAAI,CAAC,UAAU,CAAC,GAAG,CAAC,CAAC;QAC3B,OAAO,IAAI,CAAC;IACd,CAAC;IAED,KAAK,CAAC,QAAQ;QACZ,IAAI,CAAC,IAAI,CAAC,OAAO,EAAE,CAAC;YAClB,OAAO;QACT,CAAC;QAED,IAAI,CAAC,OAAO,GAAG,KAAK,CAAC;QAErB,IAAI,IAAI,CAAC,SAAS,EAAE,CAAC;YACnB,YAAY,CAAC,IAAI,CAAC,SAAS,CAAC,CAAC;YAC7B,IAAI,CAAC,SAAS,GAAG,IAAI,CAAC;QACxB,CAAC;QAED,IAAI,IAAI,CAAC,eAAe,EAAE,CAAC;YACzB,MAAM,IAAI,CAAC,eAAe,CAAC;QAC7B,CAAC;QAED,MAAM,IAAI,CAAC,MAAM,CAAC,KAAK,EAAE,CAAC;IAC5B,CAAC;IAEO,KAAK,CAAC,QAAQ;QACpB,OAAO,IAAI,CAAC,OAAO,EAAE,CAAC;YACpB,IAAI,CAAC;gBACH,MAAM,SAAS,GAAG,MAAM,IAAI,CAAC,OAAO,EAAE,CAAC;gBAEvC,IAAI,CAAC,SAAS,IAAI,IAAI,CAAC,OAAO,EAAE,CAAC;oBAC/B,MAAM,IAAI,OAAO,CAAO,CAAC,OAAO,EAAE,EAAE;wBAClC,IAAI,CAAC,SAAS,GAAG,UAAU,CAAC,OAAO,EAAE,IAAI,CAAC,QAAQ,CAAC,CAAC;oBACtD,CAAC,CAAC,CAAC;gBACL,CAAC;YACH,CAAC;YAAC,OAAO,KAAK,EAAE,CAAC;gBACf,OAAO,CAAC,KAAK,CAAC,eAAe,EAAE,KAAK,CAAC,CAAC;gBAEtC,IAAI,IAAI,CAAC,OAAO,EAAE,CAAC;oBACjB,MAAM,IAAI,OAAO,CAAO,CAAC,OAAO,EAAE,EAAE;wBAClC,IAAI,CAAC,SAAS,GAAG,UAAU,CAAC,OAAO,EAAE,IAAI,CAAC,QAAQ,CAAC,CAAC;oBACtD,CAAC,CAAC,CAAC;gBACL,CAAC;YACH,CAAC;QACH,CAAC;IACH,CAAC;IAEO,KAAK,CAAC,UAAU,CAAC,GAAQ;QAC/B,MAAM,QAAQ,GAAG,IAAI,CAAC,OAAO,CAAC,GAAG,CAAC,QAAQ,CAAC,CAAC;QAE5C,IAAI,CAAC,QAAQ,EAAE,CAAC;YACd,MAAM,GAAG,CAAC,KAAK,CAAC,8CAA8C,GAAG,CAAC,QAAQ,EAAE,CAAC,CAAC;YAC9E,OAAO;QACT,CAAC;QAED,IAAI,CAAC;YACH,MAAM,QAAQ,CAAC,GAAG,CAAC,CAAC;YACpB,MAAM,GAAG,CAAC,IAAI,EAAE,CAAC;QACnB,CAAC;QAAC,OAAO,KAAK,EAAE,CAAC;YACf,MAAM,YAAY,GAAG,KAAK,YAAY,KAAK,CAAC,CAAC,CAAC,KAAK,CAAC,OAAO,CAAC,CAAC,CAAC,MAAM,CAAC,KAAK,CAAC,CAAC;YAC5E,MAAM,GAAG,CAAC,KAAK,CAAC,YAAY,CAAC,CAAC;QAChC,CAAC;IACH,CAAC;CACF;AAjGD,wBAiGC"} \ No newline at end of file +{"version":3,"file":"worker.js","sourceRoot":"","sources":["../src/worker.ts"],"names":[],"mappings":";;;AAAA,qCAAkC;AAGlC,MAAa,MAAM;IAUjB,YAAY,eAA6B,EAAE,EAAE,UAAyB,EAAE;QARhE,YAAO,GAAY,EAAE,CAAC;QAGtB,YAAO,GAAY,KAAK,CAAC;QACzB,oBAAe,GAAyB,IAAI,CAAC;QAC7C,cAAS,GAA0B,IAAI,CAAC;QACxC,mBAAc,GAAwB,IAAI,CAAC;QAGjD,IAAI,CAAC,MAAM,GAAG,IAAI,eAAM,CAAC,YAAY,CAAC,CAAC;QACvC,IAAI,CAAC,KAAK,GAAG,OAAO,CAAC,KAAK,IAAI,EAAE,CAAC;QACjC,IAAI,CAAC,QAAQ,GAAG,OAAO,CAAC,QAAQ,IAAI,EAAE,GAAG,IAAI,CAAC;IAChD,CAAC;IAED,QAAQ,CAAC,QAAgB,EAAE,QAAsB;QAC/C,IAAI,CAAC,OAAO,CAAC,QAAQ,CAAC,GAAG,QAAQ,CAAC;IACpC,CAAC;IAED,KAAK,CAAC,IAAI;QACR,IAAI,IAAI,CAAC,OAAO,EAAE,CAAC;YACjB,MAAM,IAAI,KAAK,CAAC,2BAA2B,CAAC,CAAC;QAC/C,CAAC;QAED,IAAI,CAAC,OAAO,GAAG,IAAI,CAAC;QACpB,IAAI,CAAC,eAAe,GAAG,IAAI,CAAC,QAAQ,EAAE,CAAC;QACvC,OAAO,IAAI,CAAC,eAAe,CAAC;IAC9B,CAAC;IAED,KAAK,CAAC,OAAO;QACX,MAAM,GAAG,GAAG,MAAM,IAAI,CAAC,MAAM,CAAC,OAAO,CAAC,IAAI,CAAC,KAAK,CAAC,CAAC;QAElD,IAAI,CAAC,GAAG,EAAE,CAAC;YACT,OAAO,KAAK,CAAC;QACf,CAAC;QAED,MAAM,IAAI,CAAC,UAAU,CAAC,GAAG,CAAC,CAAC;QAC3B,OAAO,IAAI,CAAC;IACd,CAAC;IAED,KAAK,CAAC,QAAQ;QACZ,IAAI,CAAC,IAAI,CAAC,OAAO,EAAE,CAAC;YAClB,OAAO;QACT,CAAC;QAED,IAAI,CAAC,OAAO,GAAG,KAAK,CAAC;QAErB,IAAI,IAAI,CAAC,SAAS,EAAE,CAAC;YACnB,YAAY,CAAC,IAAI,CAAC,SAAS,CAAC,CAAC;YAC7B,IAAI,CAAC,SAAS,GAAG,IAAI,CAAC;QACxB,CAAC;QAED,uCAAuC;QACvC,IAAI,IAAI,CAAC,cAAc,EAAE,CAAC;YACxB,IAAI,CAAC,cAAc,EAAE,CAAC;YACtB,IAAI,CAAC,cAAc,GAAG,IAAI,CAAC;QAC7B,CAAC;QAED,IAAI,IAAI,CAAC,eAAe,EAAE,CAAC;YACzB,MAAM,IAAI,CAAC,eAAe,CAAC;QAC7B,CAAC;QAED,MAAM,IAAI,CAAC,MAAM,CAAC,KAAK,EAAE,CAAC;IAC5B,CAAC;IAEO,KAAK,CAAC,QAAQ;QACpB,OAAO,IAAI,CAAC,OAAO,EAAE,CAAC;YACpB,IAAI,CAAC;gBACH,MAAM,SAAS,GAAG,MAAM,IAAI,CAAC,OAAO,EAAE,CAAC;gBAEvC,IAAI,CAAC,SAAS,IAAI,IAAI,CAAC,OAAO,EAAE,CAAC;oBAC/B,MAAM,IAAI,OAAO,CAAO,CAAC,OAAO,EAAE,EAAE;wBAClC,IAAI,CAAC,cAAc,GAAG,OAAO,CAAC;wBAC9B,IAAI,CAAC,SAAS,GAAG,UAAU,CAAC,GAAG,EAAE;4BAC/B,IAAI,CAAC,cAAc,GAAG,IAAI,CAAC;4BAC3B,OAAO,EAAE,CAAC;wBACZ,CAAC,EAAE,IAAI,CAAC,QAAQ,CAAC,CAAC;oBACpB,CAAC,CAAC,CAAC;gBACL,CAAC;YACH,CAAC;YAAC,OAAO,KAAK,EAAE,CAAC;gBACf,OAAO,CAAC,KAAK,CAAC,eAAe,EAAE,KAAK,CAAC,CAAC;gBAEtC,IAAI,IAAI,CAAC,OAAO,EAAE,CAAC;oBACjB,MAAM,IAAI,OAAO,CAAO,CAAC,OAAO,EAAE,EAAE;wBAClC,IAAI,CAAC,cAAc,GAAG,OAAO,CAAC;wBAC9B,IAAI,CAAC,SAAS,GAAG,UAAU,CAAC,GAAG,EAAE;4BAC/B,IAAI,CAAC,cAAc,GAAG,IAAI,CAAC;4BAC3B,OAAO,EAAE,CAAC;wBACZ,CAAC,EAAE,IAAI,CAAC,QAAQ,CAAC,CAAC;oBACpB,CAAC,CAAC,CAAC;gBACL,CAAC;YACH,CAAC;QACH,CAAC;IACH,CAAC;IAEO,KAAK,CAAC,UAAU,CAAC,GAAQ;QAC/B,MAAM,QAAQ,GAAG,IAAI,CAAC,OAAO,CAAC,GAAG,CAAC,QAAQ,CAAC,CAAC;QAE5C,IAAI,CAAC,QAAQ,EAAE,CAAC;YACd,MAAM,GAAG,CAAC,KAAK,CAAC,8CAA8C,GAAG,CAAC,QAAQ,EAAE,CAAC,CAAC;YAC9E,OAAO;QACT,CAAC;QAED,IAAI,CAAC;YACH,MAAM,QAAQ,CAAC,GAAG,CAAC,CAAC;YACpB,MAAM,GAAG,CAAC,IAAI,EAAE,CAAC;QACnB,CAAC;QAAC,OAAO,KAAK,EAAE,CAAC;YACf,MAAM,YAAY,GAAG,KAAK,YAAY,KAAK,CAAC,CAAC,CAAC,KAAK,CAAC,OAAO,CAAC,CAAC,CAAC,MAAM,CAAC,KAAK,CAAC,CAAC;YAC5E,MAAM,GAAG,CAAC,KAAK,CAAC,YAAY,CAAC,CAAC;QAChC,CAAC;IACH,CAAC;CACF;AAhHD,wBAgHC"} \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index e4ef78c..4d2214e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,4 @@ -version: '3.8' +version: "3.8" services: postgres: @@ -9,7 +9,7 @@ services: POSTGRES_USER: que_user POSTGRES_PASSWORD: que_password ports: - - "5432:5432" + - "5433:5432" volumes: - postgres_data:/var/lib/postgresql/data - ./migrations/schema.sql:/docker-entrypoint-initdb.d/01-schema.sql diff --git a/package-lock.json b/package-lock.json index da58161..1911540 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7,9 +7,11 @@ "": { "name": "que-ts", "version": "1.0.0", + "hasInstallScript": true, "license": "MIT", "dependencies": { - "pg": "^8.11.3" + "pg": "^8.11.3", + "typescript": "^5.2.2" }, "devDependencies": { "@types/jest": "^29.5.5", @@ -20,8 +22,7 @@ "dotenv": "^17.2.1", "eslint": "^8.49.0", "jest": "^29.7.0", - "ts-jest": "^29.1.1", - "typescript": "^5.2.2" + "ts-jest": "^29.1.1" }, "engines": { "node": ">=16.0.0" @@ -4727,7 +4728,6 @@ "version": "5.9.2", "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.9.2.tgz", "integrity": "sha512-CWBzXQrc/qOkhidw1OzBTQuYRbfyxDXJMVJ1XNwUHGROVmuaeiEm3OslpZ1RV96d7SKKjZKrSJu3+t/xlw3R9A==", - "dev": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" diff --git a/src/client.ts b/src/client.ts index 0573775..2eaa40f 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,11 +1,12 @@ -import { Pool, PoolClient } from 'pg'; -import { Job, EnqueueOptions, ClientConfig, JobRow, JSONArray } from './types'; -import { JobInstance } from './job'; -import { SQL_QUERIES } from './sql'; -import { formatJobArgs } from './utils'; +import { Pool, PoolClient } from "pg"; +import { Job, EnqueueOptions, ClientConfig, JobRow, JSONArray } from "./types"; +import { JobInstance } from "./job"; +import { SQL_QUERIES } from "./sql"; +import { formatJobArgs, runMigrations } from "./utils"; export class Client { private pool: Pool; + private migrationPromise: Promise; constructor(config: ClientConfig = {}) { this.pool = new Pool({ @@ -20,6 +21,18 @@ export class Client { idleTimeoutMillis: 5000, // Close idle connections after 5 seconds connectionTimeoutMillis: 5000, // Timeout connection attempts after 5 seconds }); + // run migrations on startup and store the promise + this.migrationPromise = runMigrations(this.pool).catch((error) => { + // Only log if it's not a "table already exists" error + if (!error.message?.includes('already exists')) { + console.error("Error running migrations", error); + } + }); + } + + // Helper to ensure migrations are complete before operations + private async ensureMigrations(): Promise { + await this.migrationPromise; } async enqueue( @@ -27,20 +40,17 @@ export class Client { args: JSONArray = [], options: EnqueueOptions = {} ): Promise { - const { - priority = 100, - runAt = new Date(), - queue = '' - } = options; + await this.ensureMigrations(); + const { priority = 100, runAt = new Date(), queue = "" } = options; const argsJson = formatJobArgs(args); - + const result = await this.pool.query(SQL_QUERIES.ENQUEUE_JOB, [ jobClass, argsJson, priority, runAt, - queue + queue, ]); const row = result.rows[0] as JobRow; @@ -53,29 +63,26 @@ export class Client { args: JSONArray = [], options: EnqueueOptions = {} ): Promise { - const { - priority = 100, - runAt = new Date(), - queue = '' - } = options; + const { priority = 100, runAt = new Date(), queue = "" } = options; const argsJson = formatJobArgs(args); - + const result = await client.query(SQL_QUERIES.ENQUEUE_JOB, [ jobClass, argsJson, priority, runAt, - queue + queue, ]); const row = result.rows[0] as JobRow; return new JobInstance(row, this.pool); } - async lockJob(queue: string = ''): Promise { + async lockJob(queue: string = ""): Promise { + await this.ensureMigrations(); const result = await this.pool.query(SQL_QUERIES.LOCK_JOB, [queue]); - + if (result.rows.length === 0) { return null; } @@ -87,6 +94,6 @@ export class Client { async close(): Promise { await this.pool.end(); // Small delay to ensure all connections are fully closed - await new Promise(resolve => setTimeout(resolve, 50)); + await new Promise((resolve) => setTimeout(resolve, 50)); } -} \ No newline at end of file +} diff --git a/src/utils.ts b/src/utils.ts index 5ad9c6a..59ce027 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,4 +1,7 @@ -import { JSONArray, JSONValue } from './types'; +import fs from "fs"; +import path from "path"; +import { JSONArray } from "./types"; +import { Pool } from "pg"; export function intPow(base: number, exponent: number): number { if (exponent < 0) { @@ -25,8 +28,39 @@ export function parseJobArgs(args: JSONArray): JSONArray { // PostgreSQL JSON column is already parsed by the pg driver // Just validate it's an array and return it if (!Array.isArray(args)) { - throw new Error(`Expected job arguments to be an array, received: ${typeof args}`); + throw new Error( + `Expected job arguments to be an array, received: ${typeof args}` + ); } return args; } + +export async function runMigrations(client: Pool): Promise { + // test if the client is connected + try { + await client.query("SELECT 1"); + } catch (error) { + throw new Error("Client is not connected"); + } + + // Check if the que_jobs table exists + const tableExistsQuery = ` + SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_schema = 'public' + AND table_name = 'que_jobs' + ); + `; + + const tableExistsResult = await client.query(tableExistsQuery); + const tableExists = tableExistsResult.rows[0].exists; + + if (!tableExists) { + const migrations = fs.readFileSync( + path.join(__dirname, "../migrations/schema.sql"), + "utf8" + ); + await client.query(migrations); + } +} diff --git a/src/worker.ts b/src/worker.ts index 786efef..ad3b190 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -79,6 +79,8 @@ export class Worker { this.timeoutResolve = null; resolve(); }, this.interval); + // Allow process to exit even if timer is active + this.timeoutId.unref(); }); } } catch (error) { @@ -91,6 +93,8 @@ export class Worker { this.timeoutResolve = null; resolve(); }, this.interval); + // Allow process to exit even if timer is active + this.timeoutId.unref(); }); } } diff --git a/tests/impl/test_client.test.ts b/tests/impl/test_client.test.ts index e469da2..195dd9d 100644 --- a/tests/impl/test_client.test.ts +++ b/tests/impl/test_client.test.ts @@ -16,7 +16,7 @@ describe('que', () => { await que.registerWorker(new TestWorker()); que.start(); // Give worker a moment to start - await new Promise(resolve => setTimeout(resolve, 100)); + await new Promise(resolve => setTimeout(resolve, 500)); }); afterAll(async () => { @@ -29,12 +29,17 @@ describe('que', () => { expect(job.jobClass).toBe('TestWorker'); expect(job.id).toBeGreaterThan(0); - expect(workerResult).toHaveLength(0) + expect(workerResult).toHaveLength(0); - // Wait for job to be processed - await new Promise((resolve) => setTimeout(resolve, 2000)); + // Wait for job to be processed - worker interval is 1000ms + // Wait up to 3 seconds for the job to be processed + let attempts = 0; + while (workerResult.length === 0 && attempts < 30) { + await new Promise((resolve) => setTimeout(resolve, 100)); + attempts++; + } - expect(workerResult).toHaveLength(1) - expect(workerResult[0]).toBe('test') + expect(workerResult).toHaveLength(1); + expect(workerResult[0]).toBe('test'); }); }); diff --git a/tests/impl/test_client.ts b/tests/impl/test_client.ts index 72a6eaa..8ceaadb 100644 --- a/tests/impl/test_client.ts +++ b/tests/impl/test_client.ts @@ -12,7 +12,7 @@ class Que { constructor() { this.client = new Client(TEST_DB_CONFIG); - this.worker = new Worker(TEST_DB_CONFIG, { interval: 1000 }); + this.worker = new Worker(TEST_DB_CONFIG, { interval: 100 }); // Shorter interval for faster testing process.on('SIGINT', async () => { console.log('Shutting down worker...'); diff --git a/tests/setup.ts b/tests/setup.ts index 4aba551..fc883a5 100644 --- a/tests/setup.ts +++ b/tests/setup.ts @@ -1,20 +1,21 @@ -import { Pool } from 'pg'; +import { Pool } from "pg"; export const TEST_DB_CONFIG = { - host: process.env.TEST_DB_HOST || 'localhost', - port: parseInt(process.env.TEST_DB_PORT || '5432'), - database: process.env.TEST_DB_NAME || 'que_test', - user: process.env.TEST_DB_USER || 'que_user', - password: process.env.TEST_DB_PASSWORD || 'que_password', - ssl: process.env.TEST_DB_SSL === 'true' ? true : false, + host: process.env.TEST_DB_HOST || "localhost", + port: parseInt(process.env.TEST_DB_PORT || "5433"), + database: process.env.TEST_DB_NAME || "que_test", + user: process.env.TEST_DB_USER || "que_user", + password: process.env.TEST_DB_PASSWORD || "que_password", + ssl: process.env.TEST_DB_SSL === "true" ? true : false, }; export async function setupTestDatabase(): Promise { + console.log("TEST_DB_CONFIG", TEST_DB_CONFIG); const pool = new Pool(TEST_DB_CONFIG); - + // Test connection try { - await pool.query('SELECT 1'); + await pool.query("SELECT 1"); } catch (error) { throw new Error(`Failed to connect to test database: ${error}`); } @@ -39,6 +40,6 @@ export async function setupTestDatabase(): Promise { } export async function cleanupTestDatabase(pool: Pool): Promise { - await pool.query('TRUNCATE que_jobs'); + await pool.query("TRUNCATE que_jobs"); await pool.end(); -} \ No newline at end of file +} diff --git a/tests/utils.test.ts b/tests/utils.test.ts index 89976cf..e9d9fb5 100644 --- a/tests/utils.test.ts +++ b/tests/utils.test.ts @@ -1,21 +1,27 @@ -import { intPow, calculateRetryDelay, formatJobArgs, parseJobArgs } from '../src/utils'; +import { + intPow, + calculateRetryDelay, + formatJobArgs, + parseJobArgs, + runMigrations, +} from "../src/utils"; -describe('utils', () => { - describe('intPow', () => { - it('should calculate integer power correctly', () => { +describe("utils", () => { + describe("intPow", () => { + it("should calculate integer power correctly", () => { expect(intPow(2, 3)).toBe(8); expect(intPow(5, 2)).toBe(25); expect(intPow(10, 0)).toBe(1); expect(intPow(3, 1)).toBe(3); }); - it('should handle negative exponents', () => { + it("should handle negative exponents", () => { expect(intPow(2, -1)).toBe(0); }); }); - describe('calculateRetryDelay', () => { - it('should calculate correct retry delays', () => { + describe("calculateRetryDelay", () => { + it("should calculate correct retry delays", () => { expect(calculateRetryDelay(0)).toBe(0); expect(calculateRetryDelay(1)).toBe(1); expect(calculateRetryDelay(2)).toBe(16); @@ -23,26 +29,70 @@ describe('utils', () => { }); }); - describe('formatJobArgs', () => { - it('should format args as JSON string', () => { - expect(formatJobArgs(['arg1', 'arg2'])).toBe('["arg1","arg2"]'); - expect(formatJobArgs([{ key: 'value' }])).toBe('[{"key":"value"}]'); - expect(formatJobArgs([])).toBe('[]'); + describe("formatJobArgs", () => { + it("should format args as JSON string", () => { + expect(formatJobArgs(["arg1", "arg2"])).toBe('["arg1","arg2"]'); + expect(formatJobArgs([{ key: "value" }])).toBe('[{"key":"value"}]'); + expect(formatJobArgs([])).toBe("[]"); }); }); - describe('parseJobArgs', () => { - it('should return array when given valid JSON array', () => { - expect(parseJobArgs(['arg1', 'arg2'])).toEqual(['arg1', 'arg2']); - expect(parseJobArgs([{ key: 'value' }])).toEqual([{ key: 'value' }]); + describe("parseJobArgs", () => { + it("should return array when given valid JSON array", () => { + expect(parseJobArgs(["arg1", "arg2"])).toEqual(["arg1", "arg2"]); + expect(parseJobArgs([{ key: "value" }])).toEqual([{ key: "value" }]); expect(parseJobArgs([1, 2, true, null])).toEqual([1, 2, true, null]); expect(parseJobArgs([])).toEqual([]); }); - it('should throw error for non-array input', () => { - expect(() => parseJobArgs('invalid string' as any)).toThrow('Expected job arguments to be an array'); - expect(() => parseJobArgs({ key: 'value' } as any)).toThrow('Expected job arguments to be an array'); - expect(() => parseJobArgs(123 as any)).toThrow('Expected job arguments to be an array'); + it("should throw error for non-array input", () => { + expect(() => parseJobArgs("invalid string" as any)).toThrow( + "Expected job arguments to be an array" + ); + expect(() => parseJobArgs({ key: "value" } as any)).toThrow( + "Expected job arguments to be an array" + ); + expect(() => parseJobArgs(123 as any)).toThrow( + "Expected job arguments to be an array" + ); }); }); -}); \ No newline at end of file + + describe("runMigrations", () => { + it("should throw error if client is not connected", async () => { + const client = { + query: jest.fn().mockRejectedValue(new Error("Connection failed")), + } as any; + + await expect(runMigrations(client)).rejects.toThrow("Client is not connected"); + }); + + it("should run migrations if que_jobs table does not exist", async () => { + const mockQuery = jest.fn(); + // First call: SELECT 1 (connection test) + mockQuery.mockResolvedValueOnce({ rows: [{ "?column?": 1 }] }); + // Second call: Check table existence + mockQuery.mockResolvedValueOnce({ rows: [{ exists: false }] }); + // Third call: Run migrations + mockQuery.mockResolvedValueOnce({ command: "CREATE" }); + + const client = { query: mockQuery } as any; + + await expect(runMigrations(client)).resolves.toBeUndefined(); + expect(mockQuery).toHaveBeenCalledTimes(3); + }); + + it("should not run migrations if que_jobs table exists", async () => { + const mockQuery = jest.fn(); + // First call: SELECT 1 (connection test) + mockQuery.mockResolvedValueOnce({ rows: [{ "?column?": 1 }] }); + // Second call: Check table existence + mockQuery.mockResolvedValueOnce({ rows: [{ exists: true }] }); + + const client = { query: mockQuery } as any; + + await expect(runMigrations(client)).resolves.toBeUndefined(); + expect(mockQuery).toHaveBeenCalledTimes(2); // Should not run migrations + }); + }); +}); diff --git a/tests/worker.test.ts b/tests/worker.test.ts index b731392..08fb24a 100644 --- a/tests/worker.test.ts +++ b/tests/worker.test.ts @@ -16,6 +16,8 @@ describe('Worker', () => { client = new Client(TEST_DB_CONFIG); worker = new Worker(TEST_DB_CONFIG); await pool.query('TRUNCATE que_jobs'); + // Small delay to ensure TRUNCATE completes + await new Promise(resolve => setTimeout(resolve, 50)); }); afterEach(async () => { @@ -43,6 +45,10 @@ describe('Worker', () => { describe('workOne', () => { it('should return false when no jobs available', async () => { + // Ensure table is truly empty before test + const checkEmpty = await pool.query('SELECT * FROM que_jobs'); + expect(checkEmpty.rows.length).toBe(0); + const result = await worker.workOne(); expect(result).toBe(false); }); From afc89e5c7b528ef9a8992956e0ee525cbd15ea91 Mon Sep 17 00:00:00 2001 From: Quajo Duke Date: Tue, 2 Sep 2025 09:22:06 +0000 Subject: [PATCH 2/5] wip: concurrency --- src/types.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/types.ts b/src/types.ts index 1ce3f47..c50ecb5 100644 --- a/src/types.ts +++ b/src/types.ts @@ -11,7 +11,7 @@ export interface JSONObject { [key: string]: JSONValue; } -export interface JSONArray extends Array { } +export interface JSONArray extends Array {} export interface Job { id: number; @@ -58,6 +58,7 @@ export interface WorkerOptions { queue?: string; interval?: number; maxAttempts?: number; + concurrency?: number; } export interface JobRow { From d8253e9c28e4db9a6dc5bd18deb53864822d1650 Mon Sep 17 00:00:00 2001 From: Quajo Duke Date: Tue, 2 Sep 2025 09:23:42 +0000 Subject: [PATCH 3/5] chore: build --- dist/client.d.ts | 6 ++++-- dist/client.d.ts.map | 2 +- dist/client.js | 25 +++++++++++++++++++------ dist/client.js.map | 2 +- dist/types.d.ts | 1 + dist/types.d.ts.map | 2 +- dist/utils.d.ts | 4 +++- dist/utils.d.ts.map | 2 +- dist/utils.js | 29 +++++++++++++++++++++++++++++ dist/utils.js.map | 2 +- dist/worker.d.ts.map | 2 +- dist/worker.js | 4 ++++ dist/worker.js.map | 2 +- 13 files changed, 67 insertions(+), 16 deletions(-) diff --git a/dist/client.d.ts b/dist/client.d.ts index d96a94e..a38463c 100644 --- a/dist/client.d.ts +++ b/dist/client.d.ts @@ -1,8 +1,10 @@ -import { PoolClient } from 'pg'; -import { Job, EnqueueOptions, ClientConfig, JSONArray } from './types'; +import { PoolClient } from "pg"; +import { Job, EnqueueOptions, ClientConfig, JSONArray } from "./types"; export declare class Client { private pool; + private migrationPromise; constructor(config?: ClientConfig); + private ensureMigrations; enqueue(jobClass: string, args?: JSONArray, options?: EnqueueOptions): Promise; enqueueInTx(client: PoolClient, jobClass: string, args?: JSONArray, options?: EnqueueOptions): Promise; lockJob(queue?: string): Promise; diff --git a/dist/client.d.ts.map b/dist/client.d.ts.map index 09c03bd..9b37340 100644 --- a/dist/client.d.ts.map +++ b/dist/client.d.ts.map @@ -1 +1 @@ -{"version":3,"file":"client.d.ts","sourceRoot":"","sources":["../src/client.ts"],"names":[],"mappings":"AAAA,OAAO,EAAQ,UAAU,EAAE,MAAM,IAAI,CAAC;AACtC,OAAO,EAAE,GAAG,EAAE,cAAc,EAAE,YAAY,EAAU,SAAS,EAAE,MAAM,SAAS,CAAC;AAK/E,qBAAa,MAAM;IACjB,OAAO,CAAC,IAAI,CAAO;gBAEP,MAAM,GAAE,YAAiB;IAe/B,OAAO,CACX,QAAQ,EAAE,MAAM,EAChB,IAAI,GAAE,SAAc,EACpB,OAAO,GAAE,cAAmB,GAC3B,OAAO,CAAC,GAAG,CAAC;IAqBT,WAAW,CACf,MAAM,EAAE,UAAU,EAClB,QAAQ,EAAE,MAAM,EAChB,IAAI,GAAE,SAAc,EACpB,OAAO,GAAE,cAAmB,GAC3B,OAAO,CAAC,GAAG,CAAC;IAqBT,OAAO,CAAC,KAAK,GAAE,MAAW,GAAG,OAAO,CAAC,GAAG,GAAG,IAAI,CAAC;IAWhD,KAAK,IAAI,OAAO,CAAC,IAAI,CAAC;CAK7B"} \ No newline at end of file +{"version":3,"file":"client.d.ts","sourceRoot":"","sources":["../src/client.ts"],"names":[],"mappings":"AAAA,OAAO,EAAQ,UAAU,EAAE,MAAM,IAAI,CAAC;AACtC,OAAO,EAAE,GAAG,EAAE,cAAc,EAAE,YAAY,EAAU,SAAS,EAAE,MAAM,SAAS,CAAC;AAK/E,qBAAa,MAAM;IACjB,OAAO,CAAC,IAAI,CAAO;IACnB,OAAO,CAAC,gBAAgB,CAAgB;gBAE5B,MAAM,GAAE,YAAiB;YAuBvB,gBAAgB;IAIxB,OAAO,CACX,QAAQ,EAAE,MAAM,EAChB,IAAI,GAAE,SAAc,EACpB,OAAO,GAAE,cAAmB,GAC3B,OAAO,CAAC,GAAG,CAAC;IAkBT,WAAW,CACf,MAAM,EAAE,UAAU,EAClB,QAAQ,EAAE,MAAM,EAChB,IAAI,GAAE,SAAc,EACpB,OAAO,GAAE,cAAmB,GAC3B,OAAO,CAAC,GAAG,CAAC;IAiBT,OAAO,CAAC,KAAK,GAAE,MAAW,GAAG,OAAO,CAAC,GAAG,GAAG,IAAI,CAAC;IAYhD,KAAK,IAAI,OAAO,CAAC,IAAI,CAAC;CAK7B"} \ No newline at end of file diff --git a/dist/client.js b/dist/client.js index 790d7bb..b85da89 100644 --- a/dist/client.js +++ b/dist/client.js @@ -19,34 +19,47 @@ class Client { idleTimeoutMillis: 5000, // Close idle connections after 5 seconds connectionTimeoutMillis: 5000, // Timeout connection attempts after 5 seconds }); + // run migrations on startup and store the promise + this.migrationPromise = (0, utils_1.runMigrations)(this.pool).catch((error) => { + // Only log if it's not a "table already exists" error + if (!error.message?.includes('already exists')) { + console.error("Error running migrations", error); + } + }); + } + // Helper to ensure migrations are complete before operations + async ensureMigrations() { + await this.migrationPromise; } async enqueue(jobClass, args = [], options = {}) { - const { priority = 100, runAt = new Date(), queue = '' } = options; + await this.ensureMigrations(); + const { priority = 100, runAt = new Date(), queue = "" } = options; const argsJson = (0, utils_1.formatJobArgs)(args); const result = await this.pool.query(sql_1.SQL_QUERIES.ENQUEUE_JOB, [ jobClass, argsJson, priority, runAt, - queue + queue, ]); const row = result.rows[0]; return new job_1.JobInstance(row, this.pool); } async enqueueInTx(client, jobClass, args = [], options = {}) { - const { priority = 100, runAt = new Date(), queue = '' } = options; + const { priority = 100, runAt = new Date(), queue = "" } = options; const argsJson = (0, utils_1.formatJobArgs)(args); const result = await client.query(sql_1.SQL_QUERIES.ENQUEUE_JOB, [ jobClass, argsJson, priority, runAt, - queue + queue, ]); const row = result.rows[0]; return new job_1.JobInstance(row, this.pool); } - async lockJob(queue = '') { + async lockJob(queue = "") { + await this.ensureMigrations(); const result = await this.pool.query(sql_1.SQL_QUERIES.LOCK_JOB, [queue]); if (result.rows.length === 0) { return null; @@ -57,7 +70,7 @@ class Client { async close() { await this.pool.end(); // Small delay to ensure all connections are fully closed - await new Promise(resolve => setTimeout(resolve, 50)); + await new Promise((resolve) => setTimeout(resolve, 50)); } } exports.Client = Client; diff --git a/dist/client.js.map b/dist/client.js.map index f07f065..cd5377b 100644 --- a/dist/client.js.map +++ b/dist/client.js.map @@ -1 +1 @@ -{"version":3,"file":"client.js","sourceRoot":"","sources":["../src/client.ts"],"names":[],"mappings":";;;AAAA,2BAAsC;AAEtC,+BAAoC;AACpC,+BAAoC;AACpC,mCAAwC;AAExC,MAAa,MAAM;IAGjB,YAAY,SAAuB,EAAE;QACnC,IAAI,CAAC,IAAI,GAAG,IAAI,SAAI,CAAC;YACnB,gBAAgB,EAAE,MAAM,CAAC,gBAAgB;YACzC,IAAI,EAAE,MAAM,CAAC,IAAI;YACjB,IAAI,EAAE,MAAM,CAAC,IAAI;YACjB,QAAQ,EAAE,MAAM,CAAC,QAAQ;YACzB,IAAI,EAAE,MAAM,CAAC,IAAI;YACjB,QAAQ,EAAE,MAAM,CAAC,QAAQ;YACzB,GAAG,EAAE,MAAM,CAAC,GAAG;YACf,GAAG,EAAE,MAAM,CAAC,cAAc,IAAI,EAAE;YAChC,iBAAiB,EAAE,IAAI,EAAE,yCAAyC;YAClE,uBAAuB,EAAE,IAAI,EAAE,8CAA8C;SAC9E,CAAC,CAAC;IACL,CAAC;IAED,KAAK,CAAC,OAAO,CACX,QAAgB,EAChB,OAAkB,EAAE,EACpB,UAA0B,EAAE;QAE5B,MAAM,EACJ,QAAQ,GAAG,GAAG,EACd,KAAK,GAAG,IAAI,IAAI,EAAE,EAClB,KAAK,GAAG,EAAE,EACX,GAAG,OAAO,CAAC;QAEZ,MAAM,QAAQ,GAAG,IAAA,qBAAa,EAAC,IAAI,CAAC,CAAC;QAErC,MAAM,MAAM,GAAG,MAAM,IAAI,CAAC,IAAI,CAAC,KAAK,CAAC,iBAAW,CAAC,WAAW,EAAE;YAC5D,QAAQ;YACR,QAAQ;YACR,QAAQ;YACR,KAAK;YACL,KAAK;SACN,CAAC,CAAC;QAEH,MAAM,GAAG,GAAG,MAAM,CAAC,IAAI,CAAC,CAAC,CAAW,CAAC;QACrC,OAAO,IAAI,iBAAW,CAAC,GAAG,EAAE,IAAI,CAAC,IAAI,CAAC,CAAC;IACzC,CAAC;IAED,KAAK,CAAC,WAAW,CACf,MAAkB,EAClB,QAAgB,EAChB,OAAkB,EAAE,EACpB,UAA0B,EAAE;QAE5B,MAAM,EACJ,QAAQ,GAAG,GAAG,EACd,KAAK,GAAG,IAAI,IAAI,EAAE,EAClB,KAAK,GAAG,EAAE,EACX,GAAG,OAAO,CAAC;QAEZ,MAAM,QAAQ,GAAG,IAAA,qBAAa,EAAC,IAAI,CAAC,CAAC;QAErC,MAAM,MAAM,GAAG,MAAM,MAAM,CAAC,KAAK,CAAC,iBAAW,CAAC,WAAW,EAAE;YACzD,QAAQ;YACR,QAAQ;YACR,QAAQ;YACR,KAAK;YACL,KAAK;SACN,CAAC,CAAC;QAEH,MAAM,GAAG,GAAG,MAAM,CAAC,IAAI,CAAC,CAAC,CAAW,CAAC;QACrC,OAAO,IAAI,iBAAW,CAAC,GAAG,EAAE,IAAI,CAAC,IAAI,CAAC,CAAC;IACzC,CAAC;IAED,KAAK,CAAC,OAAO,CAAC,QAAgB,EAAE;QAC9B,MAAM,MAAM,GAAG,MAAM,IAAI,CAAC,IAAI,CAAC,KAAK,CAAC,iBAAW,CAAC,QAAQ,EAAE,CAAC,KAAK,CAAC,CAAC,CAAC;QAEpE,IAAI,MAAM,CAAC,IAAI,CAAC,MAAM,KAAK,CAAC,EAAE,CAAC;YAC7B,OAAO,IAAI,CAAC;QACd,CAAC;QAED,MAAM,GAAG,GAAG,MAAM,CAAC,IAAI,CAAC,CAAC,CAAW,CAAC;QACrC,OAAO,IAAI,iBAAW,CAAC,GAAG,EAAE,IAAI,CAAC,IAAI,CAAC,CAAC;IACzC,CAAC;IAED,KAAK,CAAC,KAAK;QACT,MAAM,IAAI,CAAC,IAAI,CAAC,GAAG,EAAE,CAAC;QACtB,yDAAyD;QACzD,MAAM,IAAI,OAAO,CAAC,OAAO,CAAC,EAAE,CAAC,UAAU,CAAC,OAAO,EAAE,EAAE,CAAC,CAAC,CAAC;IACxD,CAAC;CACF;AArFD,wBAqFC"} \ No newline at end of file +{"version":3,"file":"client.js","sourceRoot":"","sources":["../src/client.ts"],"names":[],"mappings":";;;AAAA,2BAAsC;AAEtC,+BAAoC;AACpC,+BAAoC;AACpC,mCAAuD;AAEvD,MAAa,MAAM;IAIjB,YAAY,SAAuB,EAAE;QACnC,IAAI,CAAC,IAAI,GAAG,IAAI,SAAI,CAAC;YACnB,gBAAgB,EAAE,MAAM,CAAC,gBAAgB;YACzC,IAAI,EAAE,MAAM,CAAC,IAAI;YACjB,IAAI,EAAE,MAAM,CAAC,IAAI;YACjB,QAAQ,EAAE,MAAM,CAAC,QAAQ;YACzB,IAAI,EAAE,MAAM,CAAC,IAAI;YACjB,QAAQ,EAAE,MAAM,CAAC,QAAQ;YACzB,GAAG,EAAE,MAAM,CAAC,GAAG;YACf,GAAG,EAAE,MAAM,CAAC,cAAc,IAAI,EAAE;YAChC,iBAAiB,EAAE,IAAI,EAAE,yCAAyC;YAClE,uBAAuB,EAAE,IAAI,EAAE,8CAA8C;SAC9E,CAAC,CAAC;QACH,kDAAkD;QAClD,IAAI,CAAC,gBAAgB,GAAG,IAAA,qBAAa,EAAC,IAAI,CAAC,IAAI,CAAC,CAAC,KAAK,CAAC,CAAC,KAAK,EAAE,EAAE;YAC/D,sDAAsD;YACtD,IAAI,CAAC,KAAK,CAAC,OAAO,EAAE,QAAQ,CAAC,gBAAgB,CAAC,EAAE,CAAC;gBAC/C,OAAO,CAAC,KAAK,CAAC,0BAA0B,EAAE,KAAK,CAAC,CAAC;YACnD,CAAC;QACH,CAAC,CAAC,CAAC;IACL,CAAC;IAED,6DAA6D;IACrD,KAAK,CAAC,gBAAgB;QAC5B,MAAM,IAAI,CAAC,gBAAgB,CAAC;IAC9B,CAAC;IAED,KAAK,CAAC,OAAO,CACX,QAAgB,EAChB,OAAkB,EAAE,EACpB,UAA0B,EAAE;QAE5B,MAAM,IAAI,CAAC,gBAAgB,EAAE,CAAC;QAC9B,MAAM,EAAE,QAAQ,GAAG,GAAG,EAAE,KAAK,GAAG,IAAI,IAAI,EAAE,EAAE,KAAK,GAAG,EAAE,EAAE,GAAG,OAAO,CAAC;QAEnE,MAAM,QAAQ,GAAG,IAAA,qBAAa,EAAC,IAAI,CAAC,CAAC;QAErC,MAAM,MAAM,GAAG,MAAM,IAAI,CAAC,IAAI,CAAC,KAAK,CAAC,iBAAW,CAAC,WAAW,EAAE;YAC5D,QAAQ;YACR,QAAQ;YACR,QAAQ;YACR,KAAK;YACL,KAAK;SACN,CAAC,CAAC;QAEH,MAAM,GAAG,GAAG,MAAM,CAAC,IAAI,CAAC,CAAC,CAAW,CAAC;QACrC,OAAO,IAAI,iBAAW,CAAC,GAAG,EAAE,IAAI,CAAC,IAAI,CAAC,CAAC;IACzC,CAAC;IAED,KAAK,CAAC,WAAW,CACf,MAAkB,EAClB,QAAgB,EAChB,OAAkB,EAAE,EACpB,UAA0B,EAAE;QAE5B,MAAM,EAAE,QAAQ,GAAG,GAAG,EAAE,KAAK,GAAG,IAAI,IAAI,EAAE,EAAE,KAAK,GAAG,EAAE,EAAE,GAAG,OAAO,CAAC;QAEnE,MAAM,QAAQ,GAAG,IAAA,qBAAa,EAAC,IAAI,CAAC,CAAC;QAErC,MAAM,MAAM,GAAG,MAAM,MAAM,CAAC,KAAK,CAAC,iBAAW,CAAC,WAAW,EAAE;YACzD,QAAQ;YACR,QAAQ;YACR,QAAQ;YACR,KAAK;YACL,KAAK;SACN,CAAC,CAAC;QAEH,MAAM,GAAG,GAAG,MAAM,CAAC,IAAI,CAAC,CAAC,CAAW,CAAC;QACrC,OAAO,IAAI,iBAAW,CAAC,GAAG,EAAE,IAAI,CAAC,IAAI,CAAC,CAAC;IACzC,CAAC;IAED,KAAK,CAAC,OAAO,CAAC,QAAgB,EAAE;QAC9B,MAAM,IAAI,CAAC,gBAAgB,EAAE,CAAC;QAC9B,MAAM,MAAM,GAAG,MAAM,IAAI,CAAC,IAAI,CAAC,KAAK,CAAC,iBAAW,CAAC,QAAQ,EAAE,CAAC,KAAK,CAAC,CAAC,CAAC;QAEpE,IAAI,MAAM,CAAC,IAAI,CAAC,MAAM,KAAK,CAAC,EAAE,CAAC;YAC7B,OAAO,IAAI,CAAC;QACd,CAAC;QAED,MAAM,GAAG,GAAG,MAAM,CAAC,IAAI,CAAC,CAAC,CAAW,CAAC;QACrC,OAAO,IAAI,iBAAW,CAAC,GAAG,EAAE,IAAI,CAAC,IAAI,CAAC,CAAC;IACzC,CAAC;IAED,KAAK,CAAC,KAAK;QACT,MAAM,IAAI,CAAC,IAAI,CAAC,GAAG,EAAE,CAAC;QACtB,yDAAyD;QACzD,MAAM,IAAI,OAAO,CAAC,CAAC,OAAO,EAAE,EAAE,CAAC,UAAU,CAAC,OAAO,EAAE,EAAE,CAAC,CAAC,CAAC;IAC1D,CAAC;CACF;AA5FD,wBA4FC"} \ No newline at end of file diff --git a/dist/types.d.ts b/dist/types.d.ts index 9fe4c03..a9925be 100644 --- a/dist/types.d.ts +++ b/dist/types.d.ts @@ -42,6 +42,7 @@ export interface WorkerOptions { queue?: string; interval?: number; maxAttempts?: number; + concurrency?: number; } export interface JobRow { priority: number; diff --git a/dist/types.d.ts.map b/dist/types.d.ts.map index 704f5b8..a8181f1 100644 --- a/dist/types.d.ts.map +++ b/dist/types.d.ts.map @@ -1 +1 @@ -{"version":3,"file":"types.d.ts","sourceRoot":"","sources":["../src/types.ts"],"names":[],"mappings":"AACA,MAAM,MAAM,SAAS,GACjB,MAAM,GACN,MAAM,GACN,OAAO,GACP,IAAI,GACJ,UAAU,GACV,SAAS,CAAC;AAEd,MAAM,WAAW,UAAU;IACzB,CAAC,GAAG,EAAE,MAAM,GAAG,SAAS,CAAC;CAC1B;AAED,MAAM,WAAW,SAAU,SAAQ,KAAK,CAAC,SAAS,CAAC;CAAI;AAEvD,MAAM,WAAW,GAAG;IAClB,EAAE,EAAE,MAAM,CAAC;IACX,KAAK,EAAE,MAAM,CAAC;IACd,QAAQ,EAAE,MAAM,CAAC;IACjB,KAAK,EAAE,IAAI,CAAC;IACZ,QAAQ,EAAE,MAAM,CAAC;IACjB,IAAI,EAAE,SAAS,CAAC;IAChB,UAAU,EAAE,MAAM,CAAC;IACnB,SAAS,CAAC,EAAE,MAAM,CAAC;IAGnB,MAAM,IAAI,OAAO,CAAC,IAAI,CAAC,CAAC;IACxB,IAAI,IAAI,OAAO,CAAC,IAAI,CAAC,CAAC;IACtB,KAAK,CAAC,YAAY,EAAE,MAAM,GAAG,OAAO,CAAC,IAAI,CAAC,CAAC;CAC5C;AAED,MAAM,WAAW,cAAc;IAC7B,QAAQ,CAAC,EAAE,MAAM,CAAC;IAClB,KAAK,CAAC,EAAE,IAAI,CAAC;IACb,KAAK,CAAC,EAAE,MAAM,CAAC;CAChB;AAED,MAAM,WAAW,YAAY;IAC3B,CAAC,GAAG,EAAE,GAAG,GAAG,OAAO,CAAC,IAAI,CAAC,CAAC;CAC3B;AAED,MAAM,WAAW,OAAO;IACtB,CAAC,QAAQ,EAAE,MAAM,GAAG,YAAY,CAAC;CAClC;AAED,MAAM,WAAW,YAAY;IAC3B,gBAAgB,CAAC,EAAE,MAAM,CAAC;IAC1B,IAAI,CAAC,EAAE,MAAM,CAAC;IACd,IAAI,CAAC,EAAE,MAAM,CAAC;IACd,QAAQ,CAAC,EAAE,MAAM,CAAC;IAClB,IAAI,CAAC,EAAE,MAAM,CAAC;IACd,QAAQ,CAAC,EAAE,MAAM,CAAC;IAClB,GAAG,CAAC,EAAE,OAAO,CAAC;IACd,cAAc,CAAC,EAAE,MAAM,CAAC;CACzB;AAED,MAAM,WAAW,aAAa;IAC5B,KAAK,CAAC,EAAE,MAAM,CAAC;IACf,QAAQ,CAAC,EAAE,MAAM,CAAC;IAClB,WAAW,CAAC,EAAE,MAAM,CAAC;CACtB;AAED,MAAM,WAAW,MAAM;IACrB,QAAQ,EAAE,MAAM,CAAC;IACjB,MAAM,EAAE,IAAI,CAAC;IACb,MAAM,EAAE,MAAM,CAAC;IACf,SAAS,EAAE,MAAM,CAAC;IAClB,IAAI,EAAE,SAAS,CAAC;IAChB,WAAW,EAAE,MAAM,CAAC;IACpB,UAAU,CAAC,EAAE,MAAM,GAAG,IAAI,CAAC;IAC3B,KAAK,EAAE,MAAM,CAAC;CACf"} \ No newline at end of file +{"version":3,"file":"types.d.ts","sourceRoot":"","sources":["../src/types.ts"],"names":[],"mappings":"AACA,MAAM,MAAM,SAAS,GACjB,MAAM,GACN,MAAM,GACN,OAAO,GACP,IAAI,GACJ,UAAU,GACV,SAAS,CAAC;AAEd,MAAM,WAAW,UAAU;IACzB,CAAC,GAAG,EAAE,MAAM,GAAG,SAAS,CAAC;CAC1B;AAED,MAAM,WAAW,SAAU,SAAQ,KAAK,CAAC,SAAS,CAAC;CAAG;AAEtD,MAAM,WAAW,GAAG;IAClB,EAAE,EAAE,MAAM,CAAC;IACX,KAAK,EAAE,MAAM,CAAC;IACd,QAAQ,EAAE,MAAM,CAAC;IACjB,KAAK,EAAE,IAAI,CAAC;IACZ,QAAQ,EAAE,MAAM,CAAC;IACjB,IAAI,EAAE,SAAS,CAAC;IAChB,UAAU,EAAE,MAAM,CAAC;IACnB,SAAS,CAAC,EAAE,MAAM,CAAC;IAGnB,MAAM,IAAI,OAAO,CAAC,IAAI,CAAC,CAAC;IACxB,IAAI,IAAI,OAAO,CAAC,IAAI,CAAC,CAAC;IACtB,KAAK,CAAC,YAAY,EAAE,MAAM,GAAG,OAAO,CAAC,IAAI,CAAC,CAAC;CAC5C;AAED,MAAM,WAAW,cAAc;IAC7B,QAAQ,CAAC,EAAE,MAAM,CAAC;IAClB,KAAK,CAAC,EAAE,IAAI,CAAC;IACb,KAAK,CAAC,EAAE,MAAM,CAAC;CAChB;AAED,MAAM,WAAW,YAAY;IAC3B,CAAC,GAAG,EAAE,GAAG,GAAG,OAAO,CAAC,IAAI,CAAC,CAAC;CAC3B;AAED,MAAM,WAAW,OAAO;IACtB,CAAC,QAAQ,EAAE,MAAM,GAAG,YAAY,CAAC;CAClC;AAED,MAAM,WAAW,YAAY;IAC3B,gBAAgB,CAAC,EAAE,MAAM,CAAC;IAC1B,IAAI,CAAC,EAAE,MAAM,CAAC;IACd,IAAI,CAAC,EAAE,MAAM,CAAC;IACd,QAAQ,CAAC,EAAE,MAAM,CAAC;IAClB,IAAI,CAAC,EAAE,MAAM,CAAC;IACd,QAAQ,CAAC,EAAE,MAAM,CAAC;IAClB,GAAG,CAAC,EAAE,OAAO,CAAC;IACd,cAAc,CAAC,EAAE,MAAM,CAAC;CACzB;AAED,MAAM,WAAW,aAAa;IAC5B,KAAK,CAAC,EAAE,MAAM,CAAC;IACf,QAAQ,CAAC,EAAE,MAAM,CAAC;IAClB,WAAW,CAAC,EAAE,MAAM,CAAC;IACrB,WAAW,CAAC,EAAE,MAAM,CAAC;CACtB;AAED,MAAM,WAAW,MAAM;IACrB,QAAQ,EAAE,MAAM,CAAC;IACjB,MAAM,EAAE,IAAI,CAAC;IACb,MAAM,EAAE,MAAM,CAAC;IACf,SAAS,EAAE,MAAM,CAAC;IAClB,IAAI,EAAE,SAAS,CAAC;IAChB,WAAW,EAAE,MAAM,CAAC;IACpB,UAAU,CAAC,EAAE,MAAM,GAAG,IAAI,CAAC;IAC3B,KAAK,EAAE,MAAM,CAAC;CACf"} \ No newline at end of file diff --git a/dist/utils.d.ts b/dist/utils.d.ts index 8811b03..253057f 100644 --- a/dist/utils.d.ts +++ b/dist/utils.d.ts @@ -1,6 +1,8 @@ -import { JSONArray } from './types'; +import { JSONArray } from "./types"; +import { Pool } from "pg"; export declare function intPow(base: number, exponent: number): number; export declare function calculateRetryDelay(errorCount: number): number; export declare function formatJobArgs(args: JSONArray): string; export declare function parseJobArgs(args: JSONArray): JSONArray; +export declare function runMigrations(client: Pool): Promise; //# sourceMappingURL=utils.d.ts.map \ No newline at end of file diff --git a/dist/utils.d.ts.map b/dist/utils.d.ts.map index 840c035..7757e18 100644 --- a/dist/utils.d.ts.map +++ b/dist/utils.d.ts.map @@ -1 +1 @@ -{"version":3,"file":"utils.d.ts","sourceRoot":"","sources":["../src/utils.ts"],"names":[],"mappings":"AAAA,OAAO,EAAE,SAAS,EAAa,MAAM,SAAS,CAAC;AAE/C,wBAAgB,MAAM,CAAC,IAAI,EAAE,MAAM,EAAE,QAAQ,EAAE,MAAM,GAAG,MAAM,CAW7D;AAED,wBAAgB,mBAAmB,CAAC,UAAU,EAAE,MAAM,GAAG,MAAM,CAE9D;AAED,wBAAgB,aAAa,CAAC,IAAI,EAAE,SAAS,GAAG,MAAM,CAErD;AAED,wBAAgB,YAAY,CAAC,IAAI,EAAE,SAAS,GAAG,SAAS,CAQvD"} \ No newline at end of file +{"version":3,"file":"utils.d.ts","sourceRoot":"","sources":["../src/utils.ts"],"names":[],"mappings":"AAEA,OAAO,EAAE,SAAS,EAAE,MAAM,SAAS,CAAC;AACpC,OAAO,EAAE,IAAI,EAAE,MAAM,IAAI,CAAC;AAE1B,wBAAgB,MAAM,CAAC,IAAI,EAAE,MAAM,EAAE,QAAQ,EAAE,MAAM,GAAG,MAAM,CAW7D;AAED,wBAAgB,mBAAmB,CAAC,UAAU,EAAE,MAAM,GAAG,MAAM,CAE9D;AAED,wBAAgB,aAAa,CAAC,IAAI,EAAE,SAAS,GAAG,MAAM,CAErD;AAED,wBAAgB,YAAY,CAAC,IAAI,EAAE,SAAS,GAAG,SAAS,CAUvD;AAED,wBAAsB,aAAa,CAAC,MAAM,EAAE,IAAI,GAAG,OAAO,CAAC,IAAI,CAAC,CA2B/D"} \ No newline at end of file diff --git a/dist/utils.js b/dist/utils.js index b15152b..e967523 100644 --- a/dist/utils.js +++ b/dist/utils.js @@ -1,9 +1,15 @@ "use strict"; +var __importDefault = (this && this.__importDefault) || function (mod) { + return (mod && mod.__esModule) ? mod : { "default": mod }; +}; Object.defineProperty(exports, "__esModule", { value: true }); exports.intPow = intPow; exports.calculateRetryDelay = calculateRetryDelay; exports.formatJobArgs = formatJobArgs; exports.parseJobArgs = parseJobArgs; +exports.runMigrations = runMigrations; +const fs_1 = __importDefault(require("fs")); +const path_1 = __importDefault(require("path")); function intPow(base, exponent) { if (exponent < 0) { return 0; @@ -28,4 +34,27 @@ function parseJobArgs(args) { } return args; } +async function runMigrations(client) { + // test if the client is connected + try { + await client.query("SELECT 1"); + } + catch (error) { + throw new Error("Client is not connected"); + } + // Check if the que_jobs table exists + const tableExistsQuery = ` + SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_schema = 'public' + AND table_name = 'que_jobs' + ); + `; + const tableExistsResult = await client.query(tableExistsQuery); + const tableExists = tableExistsResult.rows[0].exists; + if (!tableExists) { + const migrations = fs_1.default.readFileSync(path_1.default.join(__dirname, "../migrations/schema.sql"), "utf8"); + await client.query(migrations); + } +} //# sourceMappingURL=utils.js.map \ No newline at end of file diff --git a/dist/utils.js.map b/dist/utils.js.map index 1bea1ee..4e3e068 100644 --- a/dist/utils.js.map +++ b/dist/utils.js.map @@ -1 +1 @@ -{"version":3,"file":"utils.js","sourceRoot":"","sources":["../src/utils.ts"],"names":[],"mappings":";;AAEA,wBAWC;AAED,kDAEC;AAED,sCAEC;AAED,oCAQC;AA7BD,SAAgB,MAAM,CAAC,IAAY,EAAE,QAAgB;IACnD,IAAI,QAAQ,GAAG,CAAC,EAAE,CAAC;QACjB,OAAO,CAAC,CAAC;IACX,CAAC;IAED,IAAI,MAAM,GAAG,CAAC,CAAC;IACf,KAAK,IAAI,CAAC,GAAG,CAAC,EAAE,CAAC,GAAG,QAAQ,EAAE,CAAC,EAAE,EAAE,CAAC;QAClC,MAAM,IAAI,IAAI,CAAC;IACjB,CAAC;IAED,OAAO,MAAM,CAAC;AAChB,CAAC;AAED,SAAgB,mBAAmB,CAAC,UAAkB;IACpD,OAAO,MAAM,CAAC,UAAU,EAAE,CAAC,CAAC,CAAC;AAC/B,CAAC;AAED,SAAgB,aAAa,CAAC,IAAe;IAC3C,OAAO,IAAI,CAAC,SAAS,CAAC,IAAI,CAAC,CAAC;AAC9B,CAAC;AAED,SAAgB,YAAY,CAAC,IAAe;IAC1C,4DAA4D;IAC5D,4CAA4C;IAC5C,IAAI,CAAC,KAAK,CAAC,OAAO,CAAC,IAAI,CAAC,EAAE,CAAC;QACzB,MAAM,IAAI,KAAK,CAAC,oDAAoD,OAAO,IAAI,EAAE,CAAC,CAAC;IACrF,CAAC;IAED,OAAO,IAAI,CAAC;AACd,CAAC"} \ No newline at end of file +{"version":3,"file":"utils.js","sourceRoot":"","sources":["../src/utils.ts"],"names":[],"mappings":";;;;;AAKA,wBAWC;AAED,kDAEC;AAED,sCAEC;AAED,oCAUC;AAED,sCA2BC;AAjED,4CAAoB;AACpB,gDAAwB;AAIxB,SAAgB,MAAM,CAAC,IAAY,EAAE,QAAgB;IACnD,IAAI,QAAQ,GAAG,CAAC,EAAE,CAAC;QACjB,OAAO,CAAC,CAAC;IACX,CAAC;IAED,IAAI,MAAM,GAAG,CAAC,CAAC;IACf,KAAK,IAAI,CAAC,GAAG,CAAC,EAAE,CAAC,GAAG,QAAQ,EAAE,CAAC,EAAE,EAAE,CAAC;QAClC,MAAM,IAAI,IAAI,CAAC;IACjB,CAAC;IAED,OAAO,MAAM,CAAC;AAChB,CAAC;AAED,SAAgB,mBAAmB,CAAC,UAAkB;IACpD,OAAO,MAAM,CAAC,UAAU,EAAE,CAAC,CAAC,CAAC;AAC/B,CAAC;AAED,SAAgB,aAAa,CAAC,IAAe;IAC3C,OAAO,IAAI,CAAC,SAAS,CAAC,IAAI,CAAC,CAAC;AAC9B,CAAC;AAED,SAAgB,YAAY,CAAC,IAAe;IAC1C,4DAA4D;IAC5D,4CAA4C;IAC5C,IAAI,CAAC,KAAK,CAAC,OAAO,CAAC,IAAI,CAAC,EAAE,CAAC;QACzB,MAAM,IAAI,KAAK,CACb,oDAAoD,OAAO,IAAI,EAAE,CAClE,CAAC;IACJ,CAAC;IAED,OAAO,IAAI,CAAC;AACd,CAAC;AAEM,KAAK,UAAU,aAAa,CAAC,MAAY;IAC9C,kCAAkC;IAClC,IAAI,CAAC;QACH,MAAM,MAAM,CAAC,KAAK,CAAC,UAAU,CAAC,CAAC;IACjC,CAAC;IAAC,OAAO,KAAK,EAAE,CAAC;QACf,MAAM,IAAI,KAAK,CAAC,yBAAyB,CAAC,CAAC;IAC7C,CAAC;IAED,qCAAqC;IACrC,MAAM,gBAAgB,GAAG;;;;;;GAMxB,CAAC;IAEF,MAAM,iBAAiB,GAAG,MAAM,MAAM,CAAC,KAAK,CAAC,gBAAgB,CAAC,CAAC;IAC/D,MAAM,WAAW,GAAG,iBAAiB,CAAC,IAAI,CAAC,CAAC,CAAC,CAAC,MAAM,CAAC;IAErD,IAAI,CAAC,WAAW,EAAE,CAAC;QACjB,MAAM,UAAU,GAAG,YAAE,CAAC,YAAY,CAChC,cAAI,CAAC,IAAI,CAAC,SAAS,EAAE,0BAA0B,CAAC,EAChD,MAAM,CACP,CAAC;QACF,MAAM,MAAM,CAAC,KAAK,CAAC,UAAU,CAAC,CAAC;IACjC,CAAC;AACH,CAAC"} \ No newline at end of file diff --git a/dist/worker.d.ts.map b/dist/worker.d.ts.map index 8112c3e..e761ff9 100644 --- a/dist/worker.d.ts.map +++ b/dist/worker.d.ts.map @@ -1 +1 @@ -{"version":3,"file":"worker.d.ts","sourceRoot":"","sources":["../src/worker.ts"],"names":[],"mappings":"AACA,OAAO,EAAO,YAAY,EAAW,aAAa,EAAE,YAAY,EAAE,MAAM,SAAS,CAAC;AAElF,qBAAa,MAAM;IACjB,OAAO,CAAC,MAAM,CAAS;IACvB,OAAO,CAAC,OAAO,CAAe;IAC9B,OAAO,CAAC,KAAK,CAAS;IACtB,OAAO,CAAC,QAAQ,CAAS;IACzB,OAAO,CAAC,OAAO,CAAkB;IACjC,OAAO,CAAC,eAAe,CAA8B;IACrD,OAAO,CAAC,SAAS,CAA+B;IAChD,OAAO,CAAC,cAAc,CAA6B;gBAEvC,YAAY,GAAE,YAAiB,EAAE,OAAO,GAAE,aAAkB;IAMxE,QAAQ,CAAC,QAAQ,EAAE,MAAM,EAAE,QAAQ,EAAE,YAAY,GAAG,IAAI;IAIlD,IAAI,IAAI,OAAO,CAAC,IAAI,CAAC;IAUrB,OAAO,IAAI,OAAO,CAAC,OAAO,CAAC;IAW3B,QAAQ,IAAI,OAAO,CAAC,IAAI,CAAC;YAyBjB,QAAQ;YA8BR,UAAU;CAgBzB"} \ No newline at end of file +{"version":3,"file":"worker.d.ts","sourceRoot":"","sources":["../src/worker.ts"],"names":[],"mappings":"AACA,OAAO,EAAO,YAAY,EAAW,aAAa,EAAE,YAAY,EAAE,MAAM,SAAS,CAAC;AAElF,qBAAa,MAAM;IACjB,OAAO,CAAC,MAAM,CAAS;IACvB,OAAO,CAAC,OAAO,CAAe;IAC9B,OAAO,CAAC,KAAK,CAAS;IACtB,OAAO,CAAC,QAAQ,CAAS;IACzB,OAAO,CAAC,OAAO,CAAkB;IACjC,OAAO,CAAC,eAAe,CAA8B;IACrD,OAAO,CAAC,SAAS,CAA+B;IAChD,OAAO,CAAC,cAAc,CAA6B;gBAEvC,YAAY,GAAE,YAAiB,EAAE,OAAO,GAAE,aAAkB;IAMxE,QAAQ,CAAC,QAAQ,EAAE,MAAM,EAAE,QAAQ,EAAE,YAAY,GAAG,IAAI;IAIlD,IAAI,IAAI,OAAO,CAAC,IAAI,CAAC;IAUrB,OAAO,IAAI,OAAO,CAAC,OAAO,CAAC;IAW3B,QAAQ,IAAI,OAAO,CAAC,IAAI,CAAC;YAyBjB,QAAQ;YAkCR,UAAU;CAgBzB"} \ No newline at end of file diff --git a/dist/worker.js b/dist/worker.js index e7f97c5..17fc734 100644 --- a/dist/worker.js +++ b/dist/worker.js @@ -62,6 +62,8 @@ class Worker { this.timeoutResolve = null; resolve(); }, this.interval); + // Allow process to exit even if timer is active + this.timeoutId.unref(); }); } } @@ -74,6 +76,8 @@ class Worker { this.timeoutResolve = null; resolve(); }, this.interval); + // Allow process to exit even if timer is active + this.timeoutId.unref(); }); } } diff --git a/dist/worker.js.map b/dist/worker.js.map index f61c074..825f3fa 100644 --- a/dist/worker.js.map +++ b/dist/worker.js.map @@ -1 +1 @@ -{"version":3,"file":"worker.js","sourceRoot":"","sources":["../src/worker.ts"],"names":[],"mappings":";;;AAAA,qCAAkC;AAGlC,MAAa,MAAM;IAUjB,YAAY,eAA6B,EAAE,EAAE,UAAyB,EAAE;QARhE,YAAO,GAAY,EAAE,CAAC;QAGtB,YAAO,GAAY,KAAK,CAAC;QACzB,oBAAe,GAAyB,IAAI,CAAC;QAC7C,cAAS,GAA0B,IAAI,CAAC;QACxC,mBAAc,GAAwB,IAAI,CAAC;QAGjD,IAAI,CAAC,MAAM,GAAG,IAAI,eAAM,CAAC,YAAY,CAAC,CAAC;QACvC,IAAI,CAAC,KAAK,GAAG,OAAO,CAAC,KAAK,IAAI,EAAE,CAAC;QACjC,IAAI,CAAC,QAAQ,GAAG,OAAO,CAAC,QAAQ,IAAI,EAAE,GAAG,IAAI,CAAC;IAChD,CAAC;IAED,QAAQ,CAAC,QAAgB,EAAE,QAAsB;QAC/C,IAAI,CAAC,OAAO,CAAC,QAAQ,CAAC,GAAG,QAAQ,CAAC;IACpC,CAAC;IAED,KAAK,CAAC,IAAI;QACR,IAAI,IAAI,CAAC,OAAO,EAAE,CAAC;YACjB,MAAM,IAAI,KAAK,CAAC,2BAA2B,CAAC,CAAC;QAC/C,CAAC;QAED,IAAI,CAAC,OAAO,GAAG,IAAI,CAAC;QACpB,IAAI,CAAC,eAAe,GAAG,IAAI,CAAC,QAAQ,EAAE,CAAC;QACvC,OAAO,IAAI,CAAC,eAAe,CAAC;IAC9B,CAAC;IAED,KAAK,CAAC,OAAO;QACX,MAAM,GAAG,GAAG,MAAM,IAAI,CAAC,MAAM,CAAC,OAAO,CAAC,IAAI,CAAC,KAAK,CAAC,CAAC;QAElD,IAAI,CAAC,GAAG,EAAE,CAAC;YACT,OAAO,KAAK,CAAC;QACf,CAAC;QAED,MAAM,IAAI,CAAC,UAAU,CAAC,GAAG,CAAC,CAAC;QAC3B,OAAO,IAAI,CAAC;IACd,CAAC;IAED,KAAK,CAAC,QAAQ;QACZ,IAAI,CAAC,IAAI,CAAC,OAAO,EAAE,CAAC;YAClB,OAAO;QACT,CAAC;QAED,IAAI,CAAC,OAAO,GAAG,KAAK,CAAC;QAErB,IAAI,IAAI,CAAC,SAAS,EAAE,CAAC;YACnB,YAAY,CAAC,IAAI,CAAC,SAAS,CAAC,CAAC;YAC7B,IAAI,CAAC,SAAS,GAAG,IAAI,CAAC;QACxB,CAAC;QAED,uCAAuC;QACvC,IAAI,IAAI,CAAC,cAAc,EAAE,CAAC;YACxB,IAAI,CAAC,cAAc,EAAE,CAAC;YACtB,IAAI,CAAC,cAAc,GAAG,IAAI,CAAC;QAC7B,CAAC;QAED,IAAI,IAAI,CAAC,eAAe,EAAE,CAAC;YACzB,MAAM,IAAI,CAAC,eAAe,CAAC;QAC7B,CAAC;QAED,MAAM,IAAI,CAAC,MAAM,CAAC,KAAK,EAAE,CAAC;IAC5B,CAAC;IAEO,KAAK,CAAC,QAAQ;QACpB,OAAO,IAAI,CAAC,OAAO,EAAE,CAAC;YACpB,IAAI,CAAC;gBACH,MAAM,SAAS,GAAG,MAAM,IAAI,CAAC,OAAO,EAAE,CAAC;gBAEvC,IAAI,CAAC,SAAS,IAAI,IAAI,CAAC,OAAO,EAAE,CAAC;oBAC/B,MAAM,IAAI,OAAO,CAAO,CAAC,OAAO,EAAE,EAAE;wBAClC,IAAI,CAAC,cAAc,GAAG,OAAO,CAAC;wBAC9B,IAAI,CAAC,SAAS,GAAG,UAAU,CAAC,GAAG,EAAE;4BAC/B,IAAI,CAAC,cAAc,GAAG,IAAI,CAAC;4BAC3B,OAAO,EAAE,CAAC;wBACZ,CAAC,EAAE,IAAI,CAAC,QAAQ,CAAC,CAAC;oBACpB,CAAC,CAAC,CAAC;gBACL,CAAC;YACH,CAAC;YAAC,OAAO,KAAK,EAAE,CAAC;gBACf,OAAO,CAAC,KAAK,CAAC,eAAe,EAAE,KAAK,CAAC,CAAC;gBAEtC,IAAI,IAAI,CAAC,OAAO,EAAE,CAAC;oBACjB,MAAM,IAAI,OAAO,CAAO,CAAC,OAAO,EAAE,EAAE;wBAClC,IAAI,CAAC,cAAc,GAAG,OAAO,CAAC;wBAC9B,IAAI,CAAC,SAAS,GAAG,UAAU,CAAC,GAAG,EAAE;4BAC/B,IAAI,CAAC,cAAc,GAAG,IAAI,CAAC;4BAC3B,OAAO,EAAE,CAAC;wBACZ,CAAC,EAAE,IAAI,CAAC,QAAQ,CAAC,CAAC;oBACpB,CAAC,CAAC,CAAC;gBACL,CAAC;YACH,CAAC;QACH,CAAC;IACH,CAAC;IAEO,KAAK,CAAC,UAAU,CAAC,GAAQ;QAC/B,MAAM,QAAQ,GAAG,IAAI,CAAC,OAAO,CAAC,GAAG,CAAC,QAAQ,CAAC,CAAC;QAE5C,IAAI,CAAC,QAAQ,EAAE,CAAC;YACd,MAAM,GAAG,CAAC,KAAK,CAAC,8CAA8C,GAAG,CAAC,QAAQ,EAAE,CAAC,CAAC;YAC9E,OAAO;QACT,CAAC;QAED,IAAI,CAAC;YACH,MAAM,QAAQ,CAAC,GAAG,CAAC,CAAC;YACpB,MAAM,GAAG,CAAC,IAAI,EAAE,CAAC;QACnB,CAAC;QAAC,OAAO,KAAK,EAAE,CAAC;YACf,MAAM,YAAY,GAAG,KAAK,YAAY,KAAK,CAAC,CAAC,CAAC,KAAK,CAAC,OAAO,CAAC,CAAC,CAAC,MAAM,CAAC,KAAK,CAAC,CAAC;YAC5E,MAAM,GAAG,CAAC,KAAK,CAAC,YAAY,CAAC,CAAC;QAChC,CAAC;IACH,CAAC;CACF;AAhHD,wBAgHC"} \ No newline at end of file +{"version":3,"file":"worker.js","sourceRoot":"","sources":["../src/worker.ts"],"names":[],"mappings":";;;AAAA,qCAAkC;AAGlC,MAAa,MAAM;IAUjB,YAAY,eAA6B,EAAE,EAAE,UAAyB,EAAE;QARhE,YAAO,GAAY,EAAE,CAAC;QAGtB,YAAO,GAAY,KAAK,CAAC;QACzB,oBAAe,GAAyB,IAAI,CAAC;QAC7C,cAAS,GAA0B,IAAI,CAAC;QACxC,mBAAc,GAAwB,IAAI,CAAC;QAGjD,IAAI,CAAC,MAAM,GAAG,IAAI,eAAM,CAAC,YAAY,CAAC,CAAC;QACvC,IAAI,CAAC,KAAK,GAAG,OAAO,CAAC,KAAK,IAAI,EAAE,CAAC;QACjC,IAAI,CAAC,QAAQ,GAAG,OAAO,CAAC,QAAQ,IAAI,EAAE,GAAG,IAAI,CAAC;IAChD,CAAC;IAED,QAAQ,CAAC,QAAgB,EAAE,QAAsB;QAC/C,IAAI,CAAC,OAAO,CAAC,QAAQ,CAAC,GAAG,QAAQ,CAAC;IACpC,CAAC;IAED,KAAK,CAAC,IAAI;QACR,IAAI,IAAI,CAAC,OAAO,EAAE,CAAC;YACjB,MAAM,IAAI,KAAK,CAAC,2BAA2B,CAAC,CAAC;QAC/C,CAAC;QAED,IAAI,CAAC,OAAO,GAAG,IAAI,CAAC;QACpB,IAAI,CAAC,eAAe,GAAG,IAAI,CAAC,QAAQ,EAAE,CAAC;QACvC,OAAO,IAAI,CAAC,eAAe,CAAC;IAC9B,CAAC;IAED,KAAK,CAAC,OAAO;QACX,MAAM,GAAG,GAAG,MAAM,IAAI,CAAC,MAAM,CAAC,OAAO,CAAC,IAAI,CAAC,KAAK,CAAC,CAAC;QAElD,IAAI,CAAC,GAAG,EAAE,CAAC;YACT,OAAO,KAAK,CAAC;QACf,CAAC;QAED,MAAM,IAAI,CAAC,UAAU,CAAC,GAAG,CAAC,CAAC;QAC3B,OAAO,IAAI,CAAC;IACd,CAAC;IAED,KAAK,CAAC,QAAQ;QACZ,IAAI,CAAC,IAAI,CAAC,OAAO,EAAE,CAAC;YAClB,OAAO;QACT,CAAC;QAED,IAAI,CAAC,OAAO,GAAG,KAAK,CAAC;QAErB,IAAI,IAAI,CAAC,SAAS,EAAE,CAAC;YACnB,YAAY,CAAC,IAAI,CAAC,SAAS,CAAC,CAAC;YAC7B,IAAI,CAAC,SAAS,GAAG,IAAI,CAAC;QACxB,CAAC;QAED,uCAAuC;QACvC,IAAI,IAAI,CAAC,cAAc,EAAE,CAAC;YACxB,IAAI,CAAC,cAAc,EAAE,CAAC;YACtB,IAAI,CAAC,cAAc,GAAG,IAAI,CAAC;QAC7B,CAAC;QAED,IAAI,IAAI,CAAC,eAAe,EAAE,CAAC;YACzB,MAAM,IAAI,CAAC,eAAe,CAAC;QAC7B,CAAC;QAED,MAAM,IAAI,CAAC,MAAM,CAAC,KAAK,EAAE,CAAC;IAC5B,CAAC;IAEO,KAAK,CAAC,QAAQ;QACpB,OAAO,IAAI,CAAC,OAAO,EAAE,CAAC;YACpB,IAAI,CAAC;gBACH,MAAM,SAAS,GAAG,MAAM,IAAI,CAAC,OAAO,EAAE,CAAC;gBAEvC,IAAI,CAAC,SAAS,IAAI,IAAI,CAAC,OAAO,EAAE,CAAC;oBAC/B,MAAM,IAAI,OAAO,CAAO,CAAC,OAAO,EAAE,EAAE;wBAClC,IAAI,CAAC,cAAc,GAAG,OAAO,CAAC;wBAC9B,IAAI,CAAC,SAAS,GAAG,UAAU,CAAC,GAAG,EAAE;4BAC/B,IAAI,CAAC,cAAc,GAAG,IAAI,CAAC;4BAC3B,OAAO,EAAE,CAAC;wBACZ,CAAC,EAAE,IAAI,CAAC,QAAQ,CAAC,CAAC;wBAClB,gDAAgD;wBAChD,IAAI,CAAC,SAAS,CAAC,KAAK,EAAE,CAAC;oBACzB,CAAC,CAAC,CAAC;gBACL,CAAC;YACH,CAAC;YAAC,OAAO,KAAK,EAAE,CAAC;gBACf,OAAO,CAAC,KAAK,CAAC,eAAe,EAAE,KAAK,CAAC,CAAC;gBAEtC,IAAI,IAAI,CAAC,OAAO,EAAE,CAAC;oBACjB,MAAM,IAAI,OAAO,CAAO,CAAC,OAAO,EAAE,EAAE;wBAClC,IAAI,CAAC,cAAc,GAAG,OAAO,CAAC;wBAC9B,IAAI,CAAC,SAAS,GAAG,UAAU,CAAC,GAAG,EAAE;4BAC/B,IAAI,CAAC,cAAc,GAAG,IAAI,CAAC;4BAC3B,OAAO,EAAE,CAAC;wBACZ,CAAC,EAAE,IAAI,CAAC,QAAQ,CAAC,CAAC;wBAClB,gDAAgD;wBAChD,IAAI,CAAC,SAAS,CAAC,KAAK,EAAE,CAAC;oBACzB,CAAC,CAAC,CAAC;gBACL,CAAC;YACH,CAAC;QACH,CAAC;IACH,CAAC;IAEO,KAAK,CAAC,UAAU,CAAC,GAAQ;QAC/B,MAAM,QAAQ,GAAG,IAAI,CAAC,OAAO,CAAC,GAAG,CAAC,QAAQ,CAAC,CAAC;QAE5C,IAAI,CAAC,QAAQ,EAAE,CAAC;YACd,MAAM,GAAG,CAAC,KAAK,CAAC,8CAA8C,GAAG,CAAC,QAAQ,EAAE,CAAC,CAAC;YAC9E,OAAO;QACT,CAAC;QAED,IAAI,CAAC;YACH,MAAM,QAAQ,CAAC,GAAG,CAAC,CAAC;YACpB,MAAM,GAAG,CAAC,IAAI,EAAE,CAAC;QACnB,CAAC;QAAC,OAAO,KAAK,EAAE,CAAC;YACf,MAAM,YAAY,GAAG,KAAK,YAAY,KAAK,CAAC,CAAC,CAAC,KAAK,CAAC,OAAO,CAAC,CAAC,CAAC,MAAM,CAAC,KAAK,CAAC,CAAC;YAC5E,MAAM,GAAG,CAAC,KAAK,CAAC,YAAY,CAAC,CAAC;QAChC,CAAC;IACH,CAAC;CACF;AApHD,wBAoHC"} \ No newline at end of file From 68c8aba20ac5c06eb1bf8d6c2ab5f8ee9601c4d9 Mon Sep 17 00:00:00 2001 From: Quajo Duke Date: Tue, 16 Sep 2025 15:14:50 +0000 Subject: [PATCH 4/5] chore: added load test and code clean up --- tests/load.test.ts | 562 +++++++++++++++++++++++++++++++++++++++++++++ tests/setup.ts | 1 - 2 files changed, 562 insertions(+), 1 deletion(-) create mode 100644 tests/load.test.ts diff --git a/tests/load.test.ts b/tests/load.test.ts new file mode 100644 index 0000000..692d1c8 --- /dev/null +++ b/tests/load.test.ts @@ -0,0 +1,562 @@ +import { Worker } from "../src/worker"; +import { Client } from "../src/client"; +import { + setupTestDatabase, + cleanupTestDatabase, + TEST_DB_CONFIG, +} from "./setup"; +import { Pool } from "pg"; + +// Performance measurement utilities +class PerformanceMonitor { + private startTime: number = 0; + private endTime: number = 0; + private memoryStart: NodeJS.MemoryUsage = { + rss: 0, + heapTotal: 0, + heapUsed: 0, + external: 0, + arrayBuffers: 0, + }; + + start(): void { + this.startTime = Date.now(); + this.memoryStart = process.memoryUsage(); + } + + end(): { + duration: number; + memoryDelta: NodeJS.MemoryUsage; + throughput: number; + } { + this.endTime = Date.now(); + const duration = this.endTime - this.startTime; + const memoryEnd = process.memoryUsage(); + const memoryDelta = { + rss: memoryEnd.rss - this.memoryStart.rss, + heapTotal: memoryEnd.heapTotal - this.memoryStart.heapTotal, + heapUsed: memoryEnd.heapUsed - this.memoryStart.heapUsed, + external: memoryEnd.external - this.memoryStart.external, + arrayBuffers: memoryEnd.arrayBuffers - this.memoryStart.arrayBuffers, + }; + + return { + duration, + memoryDelta, + throughput: 0, // Will be calculated by caller + }; + } +} + +// Test job implementations +const createTestJob = (duration: number = 0) => { + return async (job: any) => { + if (duration > 0) { + await new Promise((resolve) => setTimeout(resolve, duration)); + } + // Simulate some work + const result = job.args.reduce((sum: number, arg: number) => sum + arg, 0); + return result; + }; +}; + +const createFailingJob = (failureRate: number = 0.1) => { + return async (job: any) => { + if (Math.random() < failureRate) { + throw new Error(`Simulated failure for job ${job.id}`); + } + return job.args; + }; +}; + +describe("Load Tests", () => { + let pool: Pool; + let client: Client; + let workers: Worker[] = []; + + beforeAll(async () => { + pool = await setupTestDatabase(); + }); + + beforeEach(async () => { + client = new Client(TEST_DB_CONFIG); + await pool.query("TRUNCATE que_jobs"); + // Small delay to ensure TRUNCATE completes + await new Promise((resolve) => setTimeout(resolve, 50)); + }); + + afterEach(async () => { + // Shutdown all workers + await Promise.all(workers.map((worker) => worker.shutdown())); + workers = []; + await client.close(); + // Clean up any advisory locks + await pool.query("SELECT pg_advisory_unlock_all()"); + // Small delay to ensure connections are fully closed + await new Promise((resolve) => setTimeout(resolve, 100)); + }); + + afterAll(async () => { + await cleanupTestDatabase(pool); + }); + + describe("Job Enqueue Throughput", () => { + it("should measure enqueue throughput for different batch sizes", async () => { + const batchSizes = [100, 500, 1000, 2000]; + const results: Array<{ + batchSize: number; + throughput: number; + duration: number; + }> = []; + + for (const batchSize of batchSizes) { + const monitor = new PerformanceMonitor(); + monitor.start(); + + // Enqueue jobs in parallel + const enqueuePromises = Array.from({ length: batchSize }, (_, i) => + client.enqueue("TestJob", [i, i * 2]) + ); + + await Promise.all(enqueuePromises); + + const metrics = monitor.end(); + const throughput = batchSize / (metrics.duration / 1000); // jobs per second + + results.push({ + batchSize, + throughput, + duration: metrics.duration, + }); + + console.log( + `Batch size ${batchSize}: ${throughput.toFixed(2)} jobs/sec (${ + metrics.duration + }ms)` + ); + } + + // Verify all jobs were enqueued + const totalJobs = await pool.query( + "SELECT COUNT(*) as count FROM que_jobs" + ); + expect(parseInt(totalJobs.rows[0].count)).toBe( + batchSizes.reduce((sum, size) => sum + size, 0) + ); + + // Log results for analysis + console.log("\nEnqueue Throughput Results:"); + results.forEach((result) => { + console.log( + ` ${result.batchSize} jobs: ${result.throughput.toFixed(2)} jobs/sec` + ); + }); + }, 30000); + + it("should measure enqueue throughput with different job complexities", async () => { + const jobCount = 1000; + const testCases = [ + { name: "Simple", args: [1, 2, 3] }, + { name: "Medium", args: Array.from({ length: 10 }, (_, i) => i) }, + { + name: "Complex", + args: Array.from({ length: 100 }, (_, i) => ({ + id: i, + data: `item-${i}`, + })), + }, + ]; + + for (const testCase of testCases) { + const monitor = new PerformanceMonitor(); + monitor.start(); + + const enqueuePromises = Array.from({ length: jobCount }, (_, i) => + client.enqueue("TestJob", testCase.args) + ); + + await Promise.all(enqueuePromises); + + const metrics = monitor.end(); + const throughput = jobCount / (metrics.duration / 1000); + + console.log( + `${testCase.name} jobs: ${throughput.toFixed(2)} jobs/sec (${ + metrics.duration + }ms)` + ); + } + }, 20000); + }); + + describe("Worker Processing Capacity", () => { + it("should measure single worker processing throughput", async () => { + const jobCount = 1000; + const worker = new Worker(TEST_DB_CONFIG); + workers.push(worker); + + // Register a simple job handler + worker.register("TestJob", createTestJob(0)); + + // Enqueue jobs + const enqueuePromises = Array.from({ length: jobCount }, (_, i) => + client.enqueue("TestJob", [i]) + ); + await Promise.all(enqueuePromises); + + const monitor = new PerformanceMonitor(); + monitor.start(); + + // Start worker and wait for all jobs to be processed + const workPromise = worker.work(); + + // Poll until all jobs are processed + let processedCount = 0; + while (processedCount < jobCount) { + await new Promise((resolve) => setTimeout(resolve, 100)); + const result = await pool.query( + "SELECT COUNT(*) as count FROM que_jobs" + ); + processedCount = jobCount - parseInt(result.rows[0].count); + } + + const metrics = monitor.end(); + const throughput = jobCount / (metrics.duration / 1000); + + console.log( + `Single worker throughput: ${throughput.toFixed(2)} jobs/sec (${ + metrics.duration + }ms)` + ); + + // Shutdown worker + await worker.shutdown(); + workers = []; + + expect(processedCount).toBe(jobCount); + }, 30000); + + it("should measure worker throughput with different job durations", async () => { + const jobCount = 100; + const jobDurations = [0, 10, 50, 100]; // milliseconds + + for (const duration of jobDurations) { + const worker = new Worker(TEST_DB_CONFIG); + workers.push(worker); + + worker.register("TestJob", createTestJob(duration)); + + // Enqueue jobs + const enqueuePromises = Array.from({ length: jobCount }, (_, i) => + client.enqueue("TestJob", [i]) + ); + await Promise.all(enqueuePromises); + + const monitor = new PerformanceMonitor(); + monitor.start(); + + const workPromise = worker.work(); + + // Wait for all jobs to be processed + let processedCount = 0; + while (processedCount < jobCount) { + await new Promise((resolve) => setTimeout(resolve, 100)); + const result = await pool.query( + "SELECT COUNT(*) as count FROM que_jobs" + ); + processedCount = jobCount - parseInt(result.rows[0].count); + } + + const metrics = monitor.end(); + const throughput = jobCount / (metrics.duration / 1000); + + console.log( + `Job duration ${duration}ms: ${throughput.toFixed(2)} jobs/sec (${ + metrics.duration + }ms)` + ); + + await worker.shutdown(); + workers = []; + } + }, 60000); + }); + + describe("Concurrent Worker Scaling", () => { + it("should measure throughput with multiple concurrent workers", async () => { + const jobCount = 2000; + const workerCounts = [1, 2, 4, 8]; + + for (const workerCount of workerCounts) { + // Create workers + const currentWorkers = Array.from({ length: workerCount }, () => { + const worker = new Worker(TEST_DB_CONFIG); + worker.register("TestJob", createTestJob(0)); + return worker; + }); + workers.push(...currentWorkers); + + // Enqueue jobs + const enqueuePromises = Array.from({ length: jobCount }, (_, i) => + client.enqueue("TestJob", [i]) + ); + await Promise.all(enqueuePromises); + + const monitor = new PerformanceMonitor(); + monitor.start(); + + // Start all workers + const workPromises = currentWorkers.map((worker) => worker.work()); + + // Wait for all jobs to be processed + let processedCount = 0; + while (processedCount < jobCount) { + await new Promise((resolve) => setTimeout(resolve, 100)); + const result = await pool.query( + "SELECT COUNT(*) as count FROM que_jobs" + ); + processedCount = jobCount - parseInt(result.rows[0].count); + } + + const metrics = monitor.end(); + const throughput = jobCount / (metrics.duration / 1000); + + console.log( + `${workerCount} workers: ${throughput.toFixed(2)} jobs/sec (${ + metrics.duration + }ms)` + ); + + // Shutdown workers + await Promise.all(currentWorkers.map((worker) => worker.shutdown())); + workers = []; + } + }, 120000); + + it("should measure worker efficiency with different queue distributions", async () => { + const jobCount = 1000; + const queueCounts = [1, 2, 4, 8]; + + for (const queueCount of queueCounts) { + // Create workers for each queue + const currentWorkers = Array.from({ length: queueCount }, (_, i) => { + const worker = new Worker(TEST_DB_CONFIG, { queue: `queue-${i}` }); + worker.register("TestJob", createTestJob(0)); + return worker; + }); + workers.push(...currentWorkers); + + // Enqueue jobs across different queues + const enqueuePromises = Array.from({ length: jobCount }, (_, i) => + client.enqueue("TestJob", [i], { queue: `queue-${i % queueCount}` }) + ); + await Promise.all(enqueuePromises); + + const monitor = new PerformanceMonitor(); + monitor.start(); + + // Start all workers + const workPromises = currentWorkers.map((worker) => worker.work()); + + // Wait for all jobs to be processed + let processedCount = 0; + while (processedCount < jobCount) { + await new Promise((resolve) => setTimeout(resolve, 100)); + const result = await pool.query( + "SELECT COUNT(*) as count FROM que_jobs" + ); + processedCount = jobCount - parseInt(result.rows[0].count); + } + + const metrics = monitor.end(); + const throughput = jobCount / (metrics.duration / 1000); + + console.log( + `${queueCount} queues: ${throughput.toFixed(2)} jobs/sec (${ + metrics.duration + }ms)` + ); + + // Shutdown workers + await Promise.all(currentWorkers.map((worker) => worker.shutdown())); + workers = []; + } + }, 120000); + }); + + describe("Error Handling and Recovery", () => { + it("should measure throughput with job failures", async () => { + const jobCount = 1000; + const failureRates = [0, 0.1, 0.2, 0.5]; + + for (const failureRate of failureRates) { + const worker = new Worker(TEST_DB_CONFIG); + workers.push(worker); + + worker.register("TestJob", createFailingJob(failureRate)); + + // Enqueue jobs + const enqueuePromises = Array.from({ length: jobCount }, (_, i) => + client.enqueue("TestJob", [i]) + ); + await Promise.all(enqueuePromises); + + const monitor = new PerformanceMonitor(); + monitor.start(); + + const workPromise = worker.work(); + + // Wait for all jobs to be processed (including retries) + let processedCount = 0; + let maxWaitTime = 30000; // 30 seconds max + const startTime = Date.now(); + + while ( + processedCount < jobCount && + Date.now() - startTime < maxWaitTime + ) { + await new Promise((resolve) => setTimeout(resolve, 100)); + const result = await pool.query( + "SELECT COUNT(*) as count FROM que_jobs" + ); + processedCount = jobCount - parseInt(result.rows[0].count); + } + + const metrics = monitor.end(); + const throughput = jobCount / (metrics.duration / 1000); + + // Check error distribution + const errorResult = await pool.query( + "SELECT error_count, COUNT(*) as count FROM que_jobs GROUP BY error_count" + ); + const errorDistribution = errorResult.rows.reduce((acc, row) => { + acc[row.error_count] = parseInt(row.count); + return acc; + }, {} as Record); + + console.log( + `Failure rate ${failureRate}: ${throughput.toFixed(2)} jobs/sec (${ + metrics.duration + }ms)` + ); + console.log(` Error distribution:`, errorDistribution); + + await worker.shutdown(); + workers = []; + } + }, 180000); + }); + + describe("Memory and Resource Usage", () => { + it("should monitor memory usage during high load", async () => { + const jobCount = 5000; + const worker = new Worker(TEST_DB_CONFIG); + workers.push(worker); + + worker.register("TestJob", createTestJob(0)); + + const initialMemory = process.memoryUsage(); + console.log("Initial memory:", { + rss: Math.round(initialMemory.rss / 1024 / 1024), + heapUsed: Math.round(initialMemory.heapUsed / 1024 / 1024), + heapTotal: Math.round(initialMemory.heapTotal / 1024 / 1024), + }); + + // Enqueue jobs in batches to monitor memory growth + const batchSize = 1000; + for (let i = 0; i < jobCount; i += batchSize) { + const batchPromises = Array.from( + { length: Math.min(batchSize, jobCount - i) }, + (_, j) => client.enqueue("TestJob", [i + j]) + ); + await Promise.all(batchPromises); + + const currentMemory = process.memoryUsage(); + console.log(`After enqueuing ${i + batchSize} jobs:`, { + rss: Math.round(currentMemory.rss / 1024 / 1024), + heapUsed: Math.round(currentMemory.heapUsed / 1024 / 1024), + heapTotal: Math.round(currentMemory.heapTotal / 1024 / 1024), + }); + } + + // Process all jobs + const workPromise = worker.work(); + + let processedCount = 0; + while (processedCount < jobCount) { + await new Promise((resolve) => setTimeout(resolve, 100)); + const result = await pool.query( + "SELECT COUNT(*) as count FROM que_jobs" + ); + processedCount = jobCount - parseInt(result.rows[0].count); + } + + const finalMemory = process.memoryUsage(); + console.log("Final memory:", { + rss: Math.round(finalMemory.rss / 1024 / 1024), + heapUsed: Math.round(finalMemory.heapUsed / 1024 / 1024), + heapTotal: Math.round(finalMemory.heapTotal / 1024 / 1024), + }); + + await worker.shutdown(); + workers = []; + + expect(processedCount).toBe(jobCount); + }, 60000); + }); + + describe("Latency Measurements", () => { + it("should measure job processing latency distribution", async () => { + const jobCount = 1000; + const worker = new Worker(TEST_DB_CONFIG); + workers.push(worker); + + const latencies: number[] = []; + + worker.register("TestJob", async (job) => { + const startTime = Date.now(); + // Simulate some work + await new Promise((resolve) => setTimeout(resolve, Math.random() * 50)); + const endTime = Date.now(); + latencies.push(endTime - startTime); + }); + + // Enqueue jobs + const enqueuePromises = Array.from({ length: jobCount }, (_, i) => + client.enqueue("TestJob", [i]) + ); + await Promise.all(enqueuePromises); + + const workPromise = worker.work(); + + // Wait for all jobs to be processed + let processedCount = 0; + while (processedCount < jobCount) { + await new Promise((resolve) => setTimeout(resolve, 100)); + const result = await pool.query( + "SELECT COUNT(*) as count FROM que_jobs" + ); + processedCount = jobCount - parseInt(result.rows[0].count); + } + + await worker.shutdown(); + workers = []; + + // Calculate latency statistics + latencies.sort((a, b) => a - b); + const avgLatency = + latencies.reduce((sum, lat) => sum + lat, 0) / latencies.length; + const p50 = latencies[Math.floor(latencies.length * 0.5)]; + const p95 = latencies[Math.floor(latencies.length * 0.95)]; + const p99 = latencies[Math.floor(latencies.length * 0.99)]; + + console.log("Latency statistics:"); + console.log(` Average: ${avgLatency.toFixed(2)}ms`); + console.log(` P50: ${p50}ms`); + console.log(` P95: ${p95}ms`); + console.log(` P99: ${p99}ms`); + + expect(latencies.length).toBe(jobCount); + }, 30000); + }); +}); diff --git a/tests/setup.ts b/tests/setup.ts index fc883a5..aad7870 100644 --- a/tests/setup.ts +++ b/tests/setup.ts @@ -10,7 +10,6 @@ export const TEST_DB_CONFIG = { }; export async function setupTestDatabase(): Promise { - console.log("TEST_DB_CONFIG", TEST_DB_CONFIG); const pool = new Pool(TEST_DB_CONFIG); // Test connection From 831f38a2c83bb4aac3134b2df00c43e51140068b Mon Sep 17 00:00:00 2001 From: Quajo Duke Date: Tue, 16 Sep 2025 15:30:58 +0000 Subject: [PATCH 5/5] chore: fix eslint failure --- .eslintrc.js | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/.eslintrc.js b/.eslintrc.js index efd34d6..24a99ca 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -1,21 +1,18 @@ module.exports = { - parser: '@typescript-eslint/parser', + parser: "@typescript-eslint/parser", parserOptions: { ecmaVersion: 2020, - sourceType: 'module', + sourceType: "module", }, - extends: [ - 'eslint:recommended', - '@typescript-eslint/recommended', - ], + extends: ["eslint:recommended", "plugin:@typescript-eslint/recommended"], rules: { - '@typescript-eslint/no-unused-vars': 'error', - '@typescript-eslint/explicit-function-return-type': 'warn', - '@typescript-eslint/no-explicit-any': 'warn', - '@typescript-eslint/no-non-null-assertion': 'warn', + "@typescript-eslint/no-unused-vars": "error", + "@typescript-eslint/explicit-function-return-type": "warn", + "@typescript-eslint/no-explicit-any": "warn", + "@typescript-eslint/no-non-null-assertion": "warn", }, env: { node: true, jest: true, }, -}; \ No newline at end of file +};