
Commit 3937a94

Merge branch 'gkdis6-docs/update-kafka-max-retry-filter-example'
2 parents 75161b3 + 1844363

File tree

1 file changed: +102 −0 lines changed

content/microservices/kafka.md

@@ -433,6 +433,108 @@ throw new KafkaRetriableException('...');
> info **Hint** `KafkaRetriableException` class is exported from the `@nestjs/microservices` package.
#### Custom exception handling
Along with the default error-handling mechanisms, you can create a custom exception filter for Kafka events to manage retry logic. For instance, the example below demonstrates how to skip a problematic event after a configurable number of retries:
```typescript
import { Catch, ArgumentsHost, Logger } from '@nestjs/common';
import { BaseExceptionFilter } from '@nestjs/core';
import { KafkaContext } from '@nestjs/microservices';

@Catch()
export class KafkaMaxRetryExceptionFilter extends BaseExceptionFilter {
  private readonly logger = new Logger(KafkaMaxRetryExceptionFilter.name);

  constructor(
    private readonly maxRetries: number,
    // Optional custom function executed when max retries are exceeded
    private readonly skipHandler?: (message: any) => Promise<void>,
  ) {
    super();
  }

  async catch(exception: unknown, host: ArgumentsHost) {
    const kafkaContext = host.switchToRpc().getContext<KafkaContext>();
    const message = kafkaContext.getMessage();
    const currentRetryCount = this.getRetryCountFromContext(kafkaContext);

    if (currentRetryCount >= this.maxRetries) {
      this.logger.warn(
        `Max retries (${this.maxRetries}) exceeded for message: ${JSON.stringify(
          message,
        )}`,
      );

      if (this.skipHandler) {
        try {
          await this.skipHandler(message);
        } catch (err) {
          this.logger.error('Error in skipHandler:', err);
        }
      }

      try {
        await this.commitOffset(kafkaContext);
      } catch (commitError) {
        this.logger.error('Failed to commit offset:', commitError);
      }
      return; // Stop propagating the exception
    }

    // If the retry count is below the maximum, proceed with the default exception filter logic
    super.catch(exception, host);
  }

  private getRetryCountFromContext(context: KafkaContext): number {
    const headers = context.getMessage().headers || {};
    const retryHeader = headers['retryCount'] || headers['retry-count'];
    // Header values may be Buffers, so normalize to a string before parsing
    return retryHeader ? Number(retryHeader.toString()) : 0;
  }

  private async commitOffset(context: KafkaContext): Promise<void> {
    const consumer = context.getConsumer && context.getConsumer();
    if (!consumer) {
      throw new Error('Consumer instance is not available from KafkaContext.');
    }

    const topic = context.getTopic && context.getTopic();
    const partition = context.getPartition && context.getPartition();
    const message = context.getMessage();
    const offset = message.offset;

    if (!topic || partition === undefined || offset === undefined) {
      throw new Error('Incomplete Kafka message context for committing offset.');
    }

    await consumer.commitOffsets([
      {
        topic,
        partition,
        // When committing an offset, commit the next one (i.e., current offset + 1)
        offset: (Number(offset) + 1).toString(),
      },
    ]);
  }
}
```
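
Note that the filter only *reads* the retry count from a `retryCount` (or `retry-count`) header; something in your pipeline still has to re-publish failed events with an incremented header. A minimal sketch of such a re-publish step, assuming an injected `ClientKafka` producer (the helper name and message shape are illustrative, not part of the filter):

```typescript
import { ClientKafka } from '@nestjs/microservices';
import { lastValueFrom } from 'rxjs';

// Illustrative helper: re-publish a failed message to the same topic with an
// incremented `retryCount` header, which the filter above reads on redelivery.
// Assumes the default Kafka serializer, which treats a payload containing a
// `value` field as a prebuilt Kafka message and passes `headers` through.
export async function republishWithRetryCount(
  client: ClientKafka,
  topic: string,
  message: { value: any; headers?: Record<string, any> },
): Promise<void> {
  // Kafka header values may arrive as Buffers, so normalize via toString()
  const current = Number(message.headers?.['retryCount']?.toString() ?? '0');
  await lastValueFrom(
    client.emit(topic, {
      value: message.value,
      headers: { retryCount: (current + 1).toString() },
    }),
  );
}
```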
This filter offers a way to retry processing a Kafka event up to a configurable number of times. Once the maximum number of retries is reached, it triggers a custom `skipHandler` (if provided) and commits the offset, effectively skipping the problematic event. This allows subsequent events to be processed without interruption.
You can integrate this filter by adding it to your event handlers:
```typescript
@UseFilters(new KafkaMaxRetryExceptionFilter(5))
export class MyEventHandler {
  @EventPattern('your-topic')
  async handleEvent(@Payload() data: any, @Ctx() context: KafkaContext) {
    // Your event processing logic...
  }
}
```
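
When a `skipHandler` is supplied as the second constructor argument, the skipped event can be diverted rather than silently dropped. A sketch with a placeholder handler (logging only here; a real handler might write to a database or publish to a dead-letter topic):

```typescript
import { Logger, UseFilters } from '@nestjs/common';

// Placeholder skipHandler: logs the skipped message so it can be inspected later.
// Swap the body for a database write or a dead-letter publish as needed.
const skipHandler = async (message: any): Promise<void> => {
  new Logger('KafkaSkipHandler').warn(
    `Skipping offset ${message.offset}: ${message.value?.toString()}`,
  );
};

@UseFilters(new KafkaMaxRetryExceptionFilter(5, skipHandler))
export class MyEventHandlerWithSkip {
  // ...event handlers as above
}
```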
537+
436538
#### Commit offsets

Committing offsets is essential when working with Kafka. By default, messages are committed automatically after a specific interval. For more information, visit the [KafkaJS docs](https://kafka.js.org/docs/consuming#autocommit). `KafkaContext` offers a way to access the active consumer for manually committing offsets. The consumer is the KafkaJS consumer and behaves as described in the [native KafkaJS implementation](https://kafka.js.org/docs/consuming#manual-committing).
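
For example, a handler that processes an event and then commits its offset manually might look like the sketch below (the topic name and handler body are illustrative; per the KafkaJS docs, the *next* offset is the one to commit):

```typescript
@EventPattern('user.created')
async handleUserCreated(@Payload() data: any, @Ctx() context: KafkaContext) {
  // Business logic...

  const { offset } = context.getMessage();
  const partition = context.getPartition();
  const topic = context.getTopic();
  const consumer = context.getConsumer();
  // Commit the next offset (current offset + 1) so this message is not redelivered
  await consumer.commitOffsets([
    { topic, partition, offset: (Number(offset) + 1).toString() },
  ]);
}
```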
