Skip to content

Commit 76b397b

Browse files
authored
Merge pull request #168 from skie/feature/custom-processors
Add configurable processor class support
2 parents c848547 + 33e6e05 commit 76b397b

File tree

4 files changed

+445
-7
lines changed

4 files changed

+445
-7
lines changed

docs/en/index.rst

Lines changed: 156 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ The following configuration should be present in the config array of your **conf
5454
// The name of an event listener class to associate with the worker
5555
'listener' => \App\Listener\WorkerListener::class,
5656

57+
// (optional) The processor class to use for processing messages.
58+
// Must implement Interop\Queue\Processor. Defaults to Cake\Queue\Queue\Processor
59+
'processor' => \App\Queue\CustomProcessor::class,
60+
5761
// The amount of time in milliseconds to sleep if no jobs are currently available. default: 10000
5862
'receiveTimeout' => 10000,
5963

@@ -119,14 +123,14 @@ A simple job that logs received messages would look like::
119123

120124
/**
121125
* The maximum number of times the job may be attempted. (optional property)
122-
*
126+
*
123127
* @var int|null
124128
*/
125129
public static $maxAttempts = 3;
126130

127131
/**
128132
* Whether there should be only one instance of a job on the queue at a time. (optional property)
129-
*
133+
*
130134
* @var bool
131135
*/
132136
public static $shouldBeUnique = false;
@@ -299,7 +303,7 @@ queue jobs, you can use the ``QueueTransport``. In your application's
299303

300304
return [
301305
// ... other configuration
302-
'EmailTransport' => [
306+
'EmailTransport' => [
303307
'default' => [
304308
'className' => MailTransport::class,
305309
// Configuration for MailTransport.
@@ -323,6 +327,153 @@ With this configuration in place, any time you send an email with the ``default`
323327
email profile CakePHP will generate a queue message. Once that queue message is
324328
processed the default ``MailTransport`` will be used to deliver the email messages.
325329

330+
Custom Processors
331+
================
332+
333+
You can customize how messages are processed by specifying a custom processor class
334+
in your queue configuration. Custom processors must implement the ``Interop\Queue\Processor``
335+
interface.
336+
337+
Example custom processor that extends the main Processor::
338+
339+
<?php
340+
declare(strict_types=1);
341+
342+
namespace App\Queue;
343+
344+
use Cake\Core\ContainerInterface;
345+
use Cake\Queue\Job\Message;
346+
use Cake\Queue\Queue\Processor;
347+
use Enqueue\Consumption\Result;
348+
use Error;
349+
use Interop\Queue\Context;
350+
use Interop\Queue\Message as QueueMessage;
351+
use Interop\Queue\Processor as InteropProcessor;
352+
use Psr\Log\LoggerInterface;
353+
use RuntimeException;
354+
use Throwable;
355+
356+
/**
357+
* Timed Processor
358+
*
359+
* Extends the original Processor to add timing metrics to all events.
360+
*/
361+
class TimedProcessor extends Processor
362+
{
363+
/**
364+
* Constructor
365+
*
366+
* @param \Psr\Log\LoggerInterface|null $logger Logger instance
367+
* @param \Cake\Core\ContainerInterface|null $container DI container instance
368+
*/
369+
public function __construct(?LoggerInterface $logger = null, ?ContainerInterface $container = null)
370+
{
371+
parent::__construct($logger, $container);
372+
}
373+
374+
/**
375+
* Process message with timing
376+
*
377+
* @param \Interop\Queue\Message $message Message
378+
* @param \Interop\Queue\Context $context Context
379+
* @return object|string
380+
*/
381+
public function process(QueueMessage $message, Context $context): string|object
382+
{
383+
$this->dispatchEvent('Processor.message.seen', ['queueMessage' => $message]);
384+
385+
$jobMessage = new Message($message, $context, $this->container);
386+
try {
387+
$jobMessage->getCallable();
388+
} catch (RuntimeException | Error $e) {
389+
$this->logger->debug('Invalid callable for message. Rejecting message from queue.');
390+
$this->dispatchEvent('Processor.message.invalid', ['message' => $jobMessage]);
391+
392+
return InteropProcessor::REJECT;
393+
}
394+
395+
$startTime = microtime(true) * 1000;
396+
$this->dispatchEvent('Processor.message.start', ['message' => $jobMessage]);
397+
398+
try {
399+
$response = $this->processMessage($jobMessage);
400+
} catch (Throwable $e) {
401+
$message->setProperty('jobException', $e);
402+
403+
$this->logger->debug(sprintf('Message encountered exception: %s', $e->getMessage()));
404+
$this->dispatchEvent('Processor.message.exception', [
405+
'message' => $jobMessage,
406+
'exception' => $e,
407+
'duration' => (int)((microtime(true) * 1000) - $startTime),
408+
]);
409+
410+
return Result::requeue('Exception occurred while processing message');
411+
}
412+
413+
$duration = (int)((microtime(true) * 1000) - $startTime);
414+
415+
if ($response === InteropProcessor::ACK) {
416+
$this->logger->debug('Message processed successfully');
417+
$this->dispatchEvent('Processor.message.success', [
418+
'message' => $jobMessage,
419+
'duration' => $duration,
420+
]);
421+
422+
return InteropProcessor::ACK;
423+
}
424+
425+
if ($response === InteropProcessor::REJECT) {
426+
$this->logger->debug('Message processed with rejection');
427+
$this->dispatchEvent('Processor.message.reject', [
428+
'message' => $jobMessage,
429+
'duration' => $duration,
430+
]);
431+
432+
return InteropProcessor::REJECT;
433+
}
434+
435+
$this->logger->debug('Message processed with failure, requeuing');
436+
$this->dispatchEvent('Processor.message.failure', [
437+
'message' => $jobMessage,
438+
'duration' => $duration,
439+
]);
440+
441+
return InteropProcessor::REQUEUE;
442+
}
443+
}
444+
445+
Configuration example::
446+
447+
'Queue' => [
448+
'default' => [
449+
'url' => 'redis://localhost:6379',
450+
'queue' => 'default',
451+
// No processor specified - uses default Processor class
452+
],
453+
'timed' => [
454+
'url' => 'redis://localhost:6379',
455+
'queue' => 'timed',
456+
'processor' => \App\Queue\TimedProcessor::class, // Custom processor with timing
457+
],
458+
],
459+
460+
**Note**: If no processor is specified in the configuration, the default
461+
``Cake\Queue\Queue\Processor`` class will be used. Custom processors are useful
462+
for adding custom logging, metrics collection, or specialized message handling.
463+
464+
**Important**: The `--processor` command line option is different from the `processor` configuration option:
465+
466+
- **Configuration `processor`**: Specifies the processor class to use for processing messages
467+
- **Command line `--processor`**: Specifies the processor name for Enqueue topic binding (used in `bindTopic()`)
468+
469+
Example usage::
470+
471+
# Use custom processor class from config
472+
bin/cake queue worker --config=timed
473+
474+
# Use custom processor class AND specify topic binding name
475+
bin/cake queue worker --config=timed --processor=my-topic-processor
476+
326477
Run the worker
327478
==============
328479

@@ -336,7 +487,7 @@ This shell can take a few different options:
336487

337488
- ``--config`` (default: default): Name of a queue config to use
338489
- ``--queue`` (default: default): Name of queue to bind to
339-
- ``--processor`` (default: ``null``): Name of processor to bind to
490+
- ``--processor`` (default: ``null``): Name of processor to bind to (for Enqueue topic binding, not the processor class)
340491
- ``--logger`` (default: ``stdout``): Name of a configured logger
341492
- ``--max-jobs`` (default: ``null``): Maximum number of jobs to process. Worker will exit after limit is reached.
342493
- ``--max-runtime`` (default: ``null``): Maximum number of seconds to run. Worker will exit after limit is reached.
@@ -370,7 +521,7 @@ Requeue Failed Jobs
370521

371522
Push jobs back onto the queue and remove them from the ``queue_failed_jobs``
372523
table. If a job fails to requeue it is not guaranteed that the job was not run.
373-
524+
374525
.. code-block:: bash
375526
376527
bin/cake queue requeue

src/Command/WorkerCommand.php

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension;
3535
use Enqueue\Consumption\Extension\LoggerExtension;
3636
use Enqueue\Consumption\ExtensionInterface;
37+
use Interop\Queue\Processor as InteropProcessor;
3738
use Psr\Log\LoggerInterface;
3839
use Psr\Log\NullLogger;
3940

@@ -170,6 +171,34 @@ protected function getLogger(Arguments $args): LoggerInterface
170171
return $logger ?? new NullLogger();
171172
}
172173

174+
/**
175+
* Creates and returns a Processor object
176+
*
177+
* @param \Cake\Console\Arguments $args Arguments
178+
* @param \Cake\Console\ConsoleIo $io ConsoleIo
179+
* @param \Psr\Log\LoggerInterface $logger Logger instance
180+
* @return \Interop\Queue\Processor
181+
*/
182+
protected function getProcessor(Arguments $args, ConsoleIo $io, LoggerInterface $logger): InteropProcessor
183+
{
184+
$configKey = (string)$args->getOption('config');
185+
$config = QueueManager::getConfig($configKey);
186+
187+
$processorClass = $config['processor'] ?? Processor::class;
188+
189+
if (!class_exists($processorClass)) {
190+
$io->error(sprintf(sprintf('Processor class %s not found', $processorClass)));
191+
$this->abort();
192+
}
193+
194+
if (!is_subclass_of($processorClass, InteropProcessor::class)) {
195+
$io->error(sprintf(sprintf('Processor class %s must implement Interop\Queue\Processor', $processorClass)));
196+
$this->abort();
197+
}
198+
199+
return new $processorClass($logger, $this->container);
200+
}
201+
173202
/**
174203
* @param \Cake\Console\Arguments $args Arguments
175204
* @param \Cake\Console\ConsoleIo $io ConsoleIo
@@ -184,7 +213,7 @@ public function execute(Arguments $args, ConsoleIo $io): int
184213
}
185214

186215
$logger = $this->getLogger($args);
187-
$processor = new Processor($logger, $this->container);
216+
$processor = $this->getProcessor($args, $io, $logger);
188217
$extension = $this->getQueueExtension($args, $logger);
189218

190219
$hasListener = Configure::check(sprintf('Queue.%s.listener', $config));
@@ -197,7 +226,10 @@ public function execute(Arguments $args, ConsoleIo $io): int
197226

198227
/** @var \Cake\Event\EventListenerInterface $listener */
199228
$listener = new $listenerClassName();
200-
$processor->getEventManager()->on($listener);
229+
230+
if ($processor instanceof Processor) {
231+
$processor->getEventManager()->on($listener);
232+
}
201233
}
202234
$client = QueueManager::engine($config);
203235
$queue = $args->getOption('queue')

0 commit comments

Comments
 (0)