Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ jobs:

strategy:
matrix:
node-version: [16.x]
node-version: [18.x]
# See supported Node.js release schedule at https://nodejs.org/en/about/releases/

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v2
uses: actions/setup-node@v3
with:
node-version: ${{ matrix.node-version }}
cache: 'npm'
- run: npm ci
- run: npm run lint
- run: npm run lint
15 changes: 10 additions & 5 deletions apps/queue-worker/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ import { ApiMetricsController, CommonConfigModule, HealthCheckController } from
import { ApiMetricsModule } from '@libs/common';
import { LoggingModule } from '@multiversx/sdk-nestjs-common';
import { AppConfigModule } from './config/app-config.module';
import { ExampleQueueService } from './worker/queues/example.queue.service';
import { ScheduleModule } from '@nestjs/schedule';
import { ExampleQueuePrintService } from './worker/queues/example.queue.print.service';
import { ExampleQueueAddService } from './worker/queues/example.queue.add.service';
import { BullModule } from '@nestjs/bull';
import { BullQueueModule } from './worker/bull.queue.module';
import { WorkerService } from './worker/worker.service';
Expand All @@ -14,14 +16,17 @@ import { WorkerService } from './worker/worker.service';
ApiMetricsModule,
AppConfigModule,
CommonConfigModule,
ScheduleModule.forRoot(),
BullQueueModule,
BullModule.registerQueue({
name: 'exampleQueue',
}),
BullModule.registerQueue(
{ name: 'exampleQueuePrint' },
{ name: 'exampleQueueAdd' },
),
],
providers: [
WorkerService,
ExampleQueueService,
ExampleQueuePrintService,
ExampleQueueAddService,
],
controllers: [
ApiMetricsController,
Expand Down
30 changes: 30 additions & 0 deletions apps/queue-worker/src/worker/queues/example.queue.add.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { Process, Processor } from "@nestjs/bull";
import { Injectable, Logger } from "@nestjs/common";
import { Job } from "bull";

@Injectable()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be removed. Same for the other processor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

@Processor('exampleQueueAdd')
export class ExampleQueueAddService {
private readonly logger: Logger;

constructor() {
this.logger = new Logger(ExampleQueueAddService.name);
}

@Process({ name: 'add', concurrency: 2 })
async handleAdd(job: Job) {
const { a, b } = job.data;
const sum = a + b;
await new Promise(r => setTimeout(r, 1500));
this.logger.log({
type: 'consumer',
jobName: job.name,
jobId: job.id,
a,
b,
sum,
attemptsMade: job.attemptsMade,
});
return sum;
}
}
25 changes: 25 additions & 0 deletions apps/queue-worker/src/worker/queues/example.queue.print.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { Process, Processor } from "@nestjs/bull";
import { Injectable, Logger } from "@nestjs/common";
import { Job } from "bull";

@Injectable()
@Processor('exampleQueuePrint')
export class ExampleQueuePrintService {
private readonly logger: Logger;

constructor() {
this.logger = new Logger(ExampleQueuePrintService.name);
}

@Process({ name: 'print', concurrency: 1 })
async handlePrint(job: Job<{ message: string }>) {
await new Promise(r => setTimeout(r, 1000));
this.logger.log({
type: 'consumer',
jobName: job.name,
jobId: job.id,
message: job.data.message,
attemptsMade: job.attemptsMade,
});
}
}
18 changes: 0 additions & 18 deletions apps/queue-worker/src/worker/queues/example.queue.service.ts

This file was deleted.

36 changes: 28 additions & 8 deletions apps/queue-worker/src/worker/worker.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,44 @@ export class WorkerService {
private readonly logger: Logger;

constructor(
@InjectQueue('exampleQueue') private exampleQueue: Queue
@InjectQueue('exampleQueuePrint') private exampleQueuePrint: Queue,
@InjectQueue('exampleQueueAdd') private exampleQueueAdd: Queue,
) {
this.logger = new Logger(WorkerService.name);
}

@Cron(CronExpression.EVERY_MINUTE)
async startJob() {
const identifiers = ['job1', 'job2', 'job3', 'job4', 'job5'];
for (const identifier of identifiers) {
const job = await this.exampleQueue.add({ identifier }, {
@Cron(CronExpression.EVERY_5_SECONDS)
async enqueuePrintJobs() {
// enqueue print-string jobs
const messages = ['hello', 'from', 'queue-worker'];
for (const message of messages) {
const job = await this.exampleQueuePrint.add('print', { message }, {
priority: 1000,
attempts: 3,
timeout: 60000,
delay: 30000,
delay: 0,
removeOnComplete: true,
});
this.logger.log({ type: 'producer:print', queue: 'exampleQueuePrint', jobId: job.id, message: job.data.message });
}
}

this.logger.log({ type: 'producer', jobId: job.id, identifier: job.data.identifier });
@Cron(CronExpression.EVERY_MINUTE)
async enqueueAddJobs() {
const pairs = [
{ a: 1, b: 2 },
{ a: 5, b: 7 },
{ a: 10, b: 15 },
];
for (const payload of pairs) {
const job = await this.exampleQueueAdd.add('add', payload, {
priority: 1000,
attempts: 3,
timeout: 60000,
delay: 0,
removeOnComplete: true,
});
this.logger.log({ type: 'producer:add', queue: 'exampleQueueAdd', jobId: job.id, a: payload.a, b: payload.b });
}
}
}