diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 11d4b27..2cec1fd 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -15,15 +15,15 @@ jobs: strategy: matrix: - node-version: [16.x] + node-version: [18.x] # See supported Node.js release schedule at https://nodejs.org/en/about/releases/ steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Use Node.js ${{ matrix.node-version }} - uses: actions/setup-node@v2 + uses: actions/setup-node@v3 with: node-version: ${{ matrix.node-version }} cache: 'npm' - run: npm ci - - run: npm run lint \ No newline at end of file + - run: npm run lint diff --git a/apps/queue-worker/src/app.module.ts b/apps/queue-worker/src/app.module.ts index cef5394..0250454 100644 --- a/apps/queue-worker/src/app.module.ts +++ b/apps/queue-worker/src/app.module.ts @@ -3,7 +3,9 @@ import { ApiMetricsController, CommonConfigModule, HealthCheckController } from import { ApiMetricsModule } from '@libs/common'; import { LoggingModule } from '@multiversx/sdk-nestjs-common'; import { AppConfigModule } from './config/app-config.module'; -import { ExampleQueueService } from './worker/queues/example.queue.service'; +import { ScheduleModule } from '@nestjs/schedule'; +import { ExampleQueuePrintService } from './worker/queues/example.queue.print.service'; +import { ExampleQueueAddService } from './worker/queues/example.queue.add.service'; import { BullModule } from '@nestjs/bull'; import { BullQueueModule } from './worker/bull.queue.module'; import { WorkerService } from './worker/worker.service'; @@ -14,14 +16,17 @@ import { WorkerService } from './worker/worker.service'; ApiMetricsModule, AppConfigModule, CommonConfigModule, + ScheduleModule.forRoot(), BullQueueModule, - BullModule.registerQueue({ - name: 'exampleQueue', - }), + BullModule.registerQueue( + { name: 'exampleQueuePrint' }, + { name: 'exampleQueueAdd' }, + ), ], providers: [ WorkerService, - ExampleQueueService, + ExampleQueuePrintService, + ExampleQueueAddService, ], controllers: [ ApiMetricsController, diff --git a/apps/queue-worker/src/worker/queues/example.queue.add.service.ts b/apps/queue-worker/src/worker/queues/example.queue.add.service.ts new file mode 100644 index 0000000..e2ac4e2 --- /dev/null +++ b/apps/queue-worker/src/worker/queues/example.queue.add.service.ts @@ -0,0 +1,29 @@ +import { Process, Processor } from "@nestjs/bull"; +import { Logger } from "@nestjs/common"; +import { Job } from "bull"; + +@Processor('exampleQueueAdd') +export class ExampleQueueAddService { + private readonly logger: Logger; + + constructor() { + this.logger = new Logger(ExampleQueueAddService.name); + } + + @Process({ name: 'add', concurrency: 2 }) + async handleAdd(job: Job) { + const { a, b } = job.data; + const sum = a + b; + await new Promise(r => setTimeout(r, 1500)); + this.logger.log({ + type: 'consumer', + jobName: job.name, + jobId: job.id, + a, + b, + sum, + attemptsMade: job.attemptsMade, + }); + return sum; + } +} diff --git a/apps/queue-worker/src/worker/queues/example.queue.print.service.ts b/apps/queue-worker/src/worker/queues/example.queue.print.service.ts new file mode 100644 index 0000000..73cd7e6 --- /dev/null +++ b/apps/queue-worker/src/worker/queues/example.queue.print.service.ts @@ -0,0 +1,24 @@ +import { Process, Processor } from "@nestjs/bull"; +import { Logger } from "@nestjs/common"; +import { Job } from "bull"; + +@Processor('exampleQueuePrint') +export class ExampleQueuePrintService { + private readonly logger: Logger; + + constructor() { + this.logger = new Logger(ExampleQueuePrintService.name); + } + + @Process({ name: 'print', concurrency: 1 }) + async handlePrint(job: Job<{ message: string }>) { + await new Promise(r => setTimeout(r, 1000)); + this.logger.log({ + type: 'consumer', + jobName: job.name, + jobId: job.id, + message: job.data.message, + attemptsMade: job.attemptsMade, + }); + } +} diff --git a/apps/queue-worker/src/worker/queues/example.queue.service.ts b/apps/queue-worker/src/worker/queues/example.queue.service.ts deleted file mode 100644 index 9c4ab1d..0000000 --- a/apps/queue-worker/src/worker/queues/example.queue.service.ts +++ /dev/null @@ -1,18 +0,0 @@ -import { Process, Processor } from "@nestjs/bull"; -import { Injectable, Logger } from "@nestjs/common"; -import { Job } from "bull"; - -@Injectable() -@Processor('exampleQueue') -export class ExampleQueueService { - private readonly logger: Logger; - - constructor() { - this.logger = new Logger(ExampleQueueService.name); - } - - @Process({ concurrency: 4 }) - onNftCreated(job: Job<{ identifier: string }>) { - this.logger.log({ type: 'consumer', jobId: job.id, identifier: job.data.identifier, attemptsMade: job.attemptsMade }); - } -} diff --git a/apps/queue-worker/src/worker/worker.service.ts b/apps/queue-worker/src/worker/worker.service.ts index b1fad69..78b45cd 100644 --- a/apps/queue-worker/src/worker/worker.service.ts +++ b/apps/queue-worker/src/worker/worker.service.ts @@ -8,24 +8,44 @@ export class WorkerService { private readonly logger: Logger; constructor( - @InjectQueue('exampleQueue') private exampleQueue: Queue + @InjectQueue('exampleQueuePrint') private exampleQueuePrint: Queue, + @InjectQueue('exampleQueueAdd') private exampleQueueAdd: Queue, ) { this.logger = new Logger(WorkerService.name); } - @Cron(CronExpression.EVERY_MINUTE) - async startJob() { - const identifiers = ['job1', 'job2', 'job3', 'job4', 'job5']; - for (const identifier of identifiers) { - const job = await this.exampleQueue.add({ identifier }, { + @Cron(CronExpression.EVERY_5_SECONDS) + async enqueuePrintJobs() { + // enqueue print-string jobs + const messages = ['hello', 'from', 'queue-worker']; + for (const message of messages) { + const job = await this.exampleQueuePrint.add('print', { message }, { priority: 1000, attempts: 3, timeout: 60000, - delay: 30000, + delay: 0, removeOnComplete: true, }); + this.logger.log({ type: 'producer:print', queue: 'exampleQueuePrint', jobId: job.id, message: job.data.message }); + } + } - this.logger.log({ type: 'producer', jobId: job.id, identifier: job.data.identifier }); + @Cron(CronExpression.EVERY_MINUTE) + async enqueueAddJobs() { + const pairs = [ + { a: 1, b: 2 }, + { a: 5, b: 7 }, + { a: 10, b: 15 }, + ]; + for (const payload of pairs) { + const job = await this.exampleQueueAdd.add('add', payload, { + priority: 1000, + attempts: 3, + timeout: 60000, + delay: 0, + removeOnComplete: true, + }); + this.logger.log({ type: 'producer:add', queue: 'exampleQueueAdd', jobId: job.id, a: payload.a, b: payload.b }); } } }