Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@
"phpstan": "tools/phpstan analyse",
"stan-baseline": "tools/phpstan --generate-baseline",
"stan-setup": "phive install",
"rector-setup": "cp composer.json composer.backup && composer require --dev rector/rector:\"^2.2\" && mv composer.backup composer.json",
"rector-check": "vendor/bin/rector process --dry-run",
"rector-fix": "vendor/bin/rector process",
"test": "phpunit",
"test-coverage": "phpunit --coverage-clover=clover.xml"
}
Expand Down
26 changes: 26 additions & 0 deletions rector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php
declare(strict_types=1);

use Rector\CodeQuality\Rector\If_\SimplifyIfElseToTernaryRector;
use Rector\CodingStyle\Rector\ClassMethod\MakeInheritedMethodVisibilitySameAsParentRector;
use Rector\Config\RectorConfig;
use Rector\Strict\Rector\Empty_\DisallowedEmptyRuleFixerRector;
use Rector\ValueObject\PhpVersion;

return RectorConfig::configure()
->withPhpVersion(PhpVersion::PHP_82)
->withPaths([
__DIR__ . '/src',
__DIR__ . '/tests',
])
->withSkip([
DisallowedEmptyRuleFixerRector::class,
SimplifyIfElseToTernaryRector::class,
MakeInheritedMethodVisibilitySameAsParentRector::class,
])
->withParallel()
->withPreparedSets(
deadCode: true,
codeQuality: true,
codingStyle: true,
);
1 change: 0 additions & 1 deletion src/Command/JobCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public function templateData(Arguments $arguments): array
* Gets the option parser instance and configures it.
*
* @param \Cake\Console\ConsoleOptionParser $parser The parser to update.
* @return \Cake\Console\ConsoleOptionParser
*/
public function buildOptionParser(ConsoleOptionParser $parser): ConsoleOptionParser
{
Expand Down
19 changes: 8 additions & 11 deletions src/Command/PurgeFailedCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ class PurgeFailedCommand extends Command

/**
* Get the command name.
*
* @return string
*/
public static function defaultName(): string
{
Expand All @@ -38,8 +36,6 @@ public static function defaultName(): string

/**
* Gets the option parser instance and configures it.
*
* @return \Cake\Console\ConsoleOptionParser
*/
public function getOptionParser(): ConsoleOptionParser
{
Expand Down Expand Up @@ -72,9 +68,8 @@ public function getOptionParser(): ConsoleOptionParser
/**
* @param \Cake\Console\Arguments $args Arguments
* @param \Cake\Console\ConsoleIo $io ConsoleIo
* @return void
*/
public function execute(Arguments $args, ConsoleIo $io): void
public function execute(Arguments $args, ConsoleIo $io): int
{
/** @var \Cake\Queue\Model\Table\FailedJobsTable $failedJobsTable */
$failedJobsTable = $this->getTableLocator()->get('Cake/Queue.FailedJobs');
Expand Down Expand Up @@ -108,21 +103,23 @@ public function execute(Arguments $args, ConsoleIo $io): void
if (!$deletingCount) {
$io->out('0 jobs found.');

return;
return self::CODE_SUCCESS;
}

if (!$args->getOption('force')) {
$confirmed = $io->askChoice("Delete {$deletingCount} jobs?", ['y', 'n'], 'n');
$confirmed = $io->askChoice(sprintf('Delete %s jobs?', $deletingCount), ['y', 'n'], 'n');

if ($confirmed !== 'y') {
return;
return self::CODE_SUCCESS;
}
}

$io->out("Deleting {$deletingCount} jobs.");
$io->out(sprintf('Deleting %s jobs.', $deletingCount));

$failedJobsTable->deleteManyOrFail($jobsToDelete);

$io->success("{$deletingCount} jobs deleted.");
$io->success($deletingCount . ' jobs deleted.');

return self::CODE_SUCCESS;
}
}
29 changes: 13 additions & 16 deletions src/Command/RequeueCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ class RequeueCommand extends Command

/**
* Get the command name.
*
* @return string
*/
public static function defaultName(): string
{
Expand All @@ -40,8 +38,6 @@ public static function defaultName(): string

/**
* Gets the option parser instance and configures it.
*
* @return \Cake\Console\ConsoleOptionParser
*/
public function getOptionParser(): ConsoleOptionParser
{
Expand Down Expand Up @@ -74,9 +70,8 @@ public function getOptionParser(): ConsoleOptionParser
/**
* @param \Cake\Console\Arguments $args Arguments
* @param \Cake\Console\ConsoleIo $io ConsoleIo
* @return void
*/
public function execute(Arguments $args, ConsoleIo $io): void
public function execute(Arguments $args, ConsoleIo $io): int
{
/** @var \Cake\Queue\Model\Table\FailedJobsTable $failedJobsTable */
$failedJobsTable = $this->getTableLocator()->get('Cake/Queue.FailedJobs');
Expand Down Expand Up @@ -110,26 +105,26 @@ public function execute(Arguments $args, ConsoleIo $io): void
if (!$requeueingCount) {
$io->out('0 jobs found.');

return;
return self::CODE_SUCCESS;
}

if (!$args->getOption('force')) {
$confirmed = $io->askChoice("Requeue {$requeueingCount} jobs?", ['y', 'n'], 'n');
$confirmed = $io->askChoice(sprintf('Requeue %s jobs?', $requeueingCount), ['y', 'n'], 'n');

if ($confirmed !== 'y') {
return;
return self::CODE_SUCCESS;
}
}

$io->out("Requeueing {$requeueingCount} jobs.");
$io->out(sprintf('Requeueing %s jobs.', $requeueingCount));

$succeededCount = 0;
$failedCount = 0;

/** @var array<\Cake\Queue\Model\Entity\FailedJob> $jobsToRequeue */
$jobsToRequeue = $jobsToRequeueQuery->all();
foreach ($jobsToRequeue as $failedJob) {
$io->verbose("Requeueing FailedJob with ID {$failedJob->id}.");
$io->verbose(sprintf('Requeueing FailedJob with ID %d.', $failedJob->id));
try {
QueueManager::push(
[$failedJob->class, $failedJob->method],
Expand All @@ -145,19 +140,21 @@ public function execute(Arguments $args, ConsoleIo $io): void

$succeededCount++;
} catch (Exception $e) {
$io->err("Exception occurred while requeueing FailedJob with ID {$failedJob->id}");
$io->err('Exception occurred while requeueing FailedJob with ID ' . $failedJob->id);
$io->err((string)$e);

$failedCount++;
}
}

if ($failedCount) {
$io->err("Failed to requeue {$failedCount} jobs.");
if ($failedCount !== 0) {
$io->err(sprintf('Failed to requeue %d jobs.', $failedCount));
}

if ($succeededCount) {
$io->success("{$succeededCount} jobs requeued.");
if ($succeededCount !== 0) {
$io->success($succeededCount . ' jobs requeued.');
}

return self::CODE_SUCCESS;
}
}
30 changes: 9 additions & 21 deletions src/Command/WorkerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,16 @@
*/
class WorkerCommand extends Command
{
/**
* @var \Cake\Core\ContainerInterface|null
*/
protected ?ContainerInterface $container = null;

/**
* @param \Cake\Core\ContainerInterface|null $container DI container instance
*/
public function __construct(?ContainerInterface $container = null)
{
$this->container = $container;
public function __construct(
protected readonly ?ContainerInterface $container = null,
) {
}

/**
* Get the command name.
*
* @return string
*/
public static function defaultName(): string
{
Expand All @@ -68,8 +61,6 @@ public static function defaultName(): string

/**
* Gets the option parser instance and configures it.
*
* @return \Cake\Console\ConsoleOptionParser
*/
public function getOptionParser(): ConsoleOptionParser
{
Expand Down Expand Up @@ -122,7 +113,6 @@ public function getOptionParser(): ConsoleOptionParser
*
* @param \Cake\Console\Arguments $args Arguments
* @param \Psr\Log\LoggerInterface $logger Logger instance.
* @return \Enqueue\Consumption\ExtensionInterface
*/
protected function getQueueExtension(Arguments $args, LoggerInterface $logger): ExtensionInterface
{
Expand All @@ -138,12 +128,12 @@ protected function getQueueExtension(Arguments $args, LoggerInterface $logger):
$limitAttempsExtension,
];

if (!is_null($args->getOption('max-jobs'))) {
if ($args->getOption('max-jobs') !== null) {
$maxJobs = (int)$args->getOption('max-jobs');
$extensions[] = new LimitConsumedMessagesExtension($maxJobs);
}

if (!is_null($args->getOption('max-runtime'))) {
if ($args->getOption('max-runtime') !== null) {
$endTime = new DateTime(sprintf('+%d seconds', (int)$args->getOption('max-runtime')));
$extensions[] = new LimitConsumptionTimeExtension($endTime);
}
Expand All @@ -159,7 +149,6 @@ protected function getQueueExtension(Arguments $args, LoggerInterface $logger):
* Creates and returns a LoggerInterface object
*
* @param \Cake\Console\Arguments $args Arguments
* @return \Psr\Log\LoggerInterface
*/
protected function getLogger(Arguments $args): LoggerInterface
{
Expand All @@ -177,7 +166,6 @@ protected function getLogger(Arguments $args): LoggerInterface
* @param \Cake\Console\Arguments $args Arguments
* @param \Cake\Console\ConsoleIo $io ConsoleIo
* @param \Psr\Log\LoggerInterface $logger Logger instance
* @return \Interop\Queue\Processor
*/
protected function getProcessor(Arguments $args, ConsoleIo $io, LoggerInterface $logger): InteropProcessor
{
Expand All @@ -187,12 +175,12 @@ protected function getProcessor(Arguments $args, ConsoleIo $io, LoggerInterface
$processorClass = $config['processor'] ?? Processor::class;

if (!class_exists($processorClass)) {
$io->error(sprintf(sprintf('Processor class %s not found', $processorClass)));
$io->error(sprintf('Processor class %s not found', $processorClass));
$this->abort();
}

if (!is_subclass_of($processorClass, InteropProcessor::class)) {
$io->error(sprintf(sprintf('Processor class %s must implement Interop\Queue\Processor', $processorClass)));
$io->error(sprintf('Processor class %s must implement Interop\Queue\Processor', $processorClass));
$this->abort();
}

Expand All @@ -202,7 +190,6 @@ protected function getProcessor(Arguments $args, ConsoleIo $io, LoggerInterface
/**
* @param \Cake\Console\Arguments $args Arguments
* @param \Cake\Console\ConsoleIo $io ConsoleIo
* @return int
*/
public function execute(Arguments $args, ConsoleIo $io): int
{
Expand Down Expand Up @@ -231,10 +218,11 @@ public function execute(Arguments $args, ConsoleIo $io): int
$processor->getEventManager()->on($listener);
}
}

$client = QueueManager::engine($config);
$queue = $args->getOption('queue')
? (string)$args->getOption('queue')
: Configure::read("Queue.{$config}.queue", 'default');
: Configure::read(sprintf('Queue.%s.queue', $config), 'default');
$processorName = $args->getOption('processor') ? (string)$args->getOption('processor') : 'default';

$client->bindTopic($queue, $processor, $processorName);
Expand Down
18 changes: 4 additions & 14 deletions src/Consumption/LimitAttemptsExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,15 @@ class LimitAttemptsExtension implements MessageResultExtensionInterface
public const ATTEMPTS_PROPERTY = 'attempts';

/**
* The maximum number of times a job may be attempted. $maxAttempts defined on a
* Job will override this value.
*
* @var int|null
* @param int|null $maxAttempts The maximum number of times a job may be attempted. $maxAttempts defined on a Job will override this value.
*/
protected ?int $maxAttempts = null;

/**
* @param int|null $maxAttempts The maximum number of times a job may be attempted.
* @return void
*/
public function __construct(?int $maxAttempts = null)
{
$this->maxAttempts = $maxAttempts;
public function __construct(
protected readonly ?int $maxAttempts = null,
) {
}

/**
* @param \Enqueue\Consumption\Context\MessageResult $context The result of the message after it was processed.
* @return void
*/
public function onResult(MessageResult $context): void
{
Expand Down
17 changes: 3 additions & 14 deletions src/Consumption/LimitConsumedMessagesExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,21 @@
*/
class LimitConsumedMessagesExtension implements PreConsumeExtensionInterface, PostConsumeExtensionInterface
{
/**
* @var int
*/
protected int $messageLimit;

/**
* @var int
*/
protected int $messageConsumed = 0;

/**
* @param int $messageLimit The number of messages to process before exiting.
*/
public function __construct(int $messageLimit)
{
$this->messageLimit = $messageLimit;
public function __construct(
protected readonly int $messageLimit,
) {
}

/**
* Executed at every new cycle before calling SubscriptionConsumer::consume method.
* The consumption could be interrupted at this step.
*
* @param \Enqueue\Consumption\Context\PreConsume $context The PreConsume context.
* @return void
*/
public function onPreConsume(PreConsume $context): void
{
Expand All @@ -56,7 +47,6 @@ public function onPreConsume(PreConsume $context): void
* The consumption could be interrupted at this point.
*
* @param \Enqueue\Consumption\Context\PostConsume $context The PostConsume context.
* @return void
*/
public function onPostConsume(PostConsume $context): void
{
Expand All @@ -71,7 +61,6 @@ public function onPostConsume(PostConsume $context): void
* Check if the consumer should be stopped.
*
* @param \Psr\Log\LoggerInterface $logger The logger where messages will be logged.
* @return bool
*/
protected function shouldBeStopped(LoggerInterface $logger): bool
{
Expand Down
Loading