diff --git a/.gitignore b/.gitignore index 00dea824..0fda48c9 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ composer.lock phpunit.xml .phpunit.result.cache .php_cs.cache +.phpunit.cache \ No newline at end of file diff --git a/README.md b/README.md index adb7b2e9..1da167b2 100644 --- a/README.md +++ b/README.md @@ -557,6 +557,50 @@ There are two ways of consuming messages. 2. `rabbitmq:consume` command which is provided by this package. This command utilizes `basic_consume` and is more performant than `basic_get` by ~2x, but does not support multiple queues. +### Getting performance and durability +To get the best performance and durability you must use option 2, and you need to extend the base config and use some extra options in the command. + +The command to execute: +```bash + rabbitmq:consume --blocking=1 --auto-reconnect=1 --alive-check=30 --init-queue=1 --verbose-messages=1 --prefetch-count=1 +``` +- `--blocking=1` is used to use blocking waiting mechanism [performance] (STRONGLY RECOMMENDED) +- `--auto-reconnect=1` is used to auto reconnect if something is wrong with the connection [durability] (recommended) +- `--alive-check=30` is used to custom check if connection is stuck [durability] (recommended with `blocking=1` if you have virtualization/proxies) +- `--init-queue=1` is used to auto create queue before consuming [durability] (recommended) +- `--verbose-messages=1` is used to not write anything about processed messages [performance] (not necessary) +- `--prefetch-count=1` is used to limit number of messages prefetched [durability] (not necessary) + + +The config to provide: +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'options' => [ + // ... + 'queue' => [ + 'use_expiration_for_delayed_queues' => true // To create only one later-queue for any TTL + 'declare_full_route' => true // To auto-init full route of message if you need both queue + exchange + 'retries' => [ // To enable retries if the queue fails to push + 'enabled' => true, + 'max' => 5, // number of retries + 'pause_micro_seconds' => 1e6 // pause between retries + ], + + 'channel_rpc_timeout' => 3 // To make custom alive-check work (required with `blocking=1` and --alive-check=N`) + 'keepalive' => true // To keep the connection alive (STRONGLY RECOMMENDED with `blocking=1`) + ] + ], + ], + + // ... +], +``` + ## Testing Setup RabbitMQ using `docker-compose`: diff --git a/src/Console/ConsumeCommand.php b/src/Console/ConsumeCommand.php index 4072132a..087394cc 100644 --- a/src/Console/ConsumeCommand.php +++ b/src/Console/ConsumeCommand.php @@ -3,6 +3,10 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ\Console; use Illuminate\Queue\Console\WorkCommand; +use Illuminate\Queue\Events\JobFailed; +use Illuminate\Queue\Events\JobProcessed; +use Illuminate\Queue\Events\JobProcessing; +use Illuminate\Queue\Events\JobReleasedAfterException; use Illuminate\Support\Str; use VladimirYuldashev\LaravelQueueRabbitMQ\Consumer; @@ -30,10 +34,18 @@ class ConsumeCommand extends WorkCommand {--consumer-tag} {--prefetch-size=0} {--prefetch-count=1000} + {--blocking=0 : Weather to block queue waiting or not} + {--init-queue=0 : Enables init the queue before starting consuming} + {--auto-reconnect=0 : Enables auto-reconnection when something is wrong with the connection} + {--auto-reconnect-pause=0.5 : The pause (in seconds) before reconnecting} + {--alive-check=0 : The pause (in seconds) before reconnecting} + {--verbose-messages=0 : Write messages only when verbose mode is enabled} '; protected $description = 'Consume messages'; + protected $useVerboseForMessages = false; + public function handle(): void { /** @var Consumer $consumer */ @@ -45,6 +57,14 @@ public function handle(): void $consumer->setMaxPriority((int) $this->option('max-priority')); $consumer->setPrefetchSize((int) $this->option('prefetch-size')); $consumer->setPrefetchCount((int) $this->option('prefetch-count')); + $consumer->setBlocking($this->booleanOption('blocking')); + $consumer->setInitQueue($this->booleanOption('init-queue')); + $consumer->setAutoReconnect($this->booleanOption('auto-reconnect')); + $consumer->setAutoReconnectPause((float)$this->option('auto-reconnect-pause')); + $consumer->setAliveCheck((float)$this->option('alive-check')); + + $consumer->setInteractWithIO($this); + $this->useVerboseForMessages = $this->booleanOption('verbose-messages'); parent::handle(); } @@ -63,4 +83,45 @@ protected function consumerTag(): string return Str::substr($consumerTag, 0, 255); } + + protected function booleanOption(string $key): bool + { + return filter_var( + $this->option($key), + FILTER_VALIDATE_BOOLEAN + ); + } + + /** + * Output worker results only in verbose mode + */ + protected function listenForEvents() + { + if ($this->useVerboseForMessages) { + parent::listenForEvents(); + return; + } + + $this->laravel['events']->listen(JobFailed::class, function ($event) { + if ($this->output->isVerbose()) { + $this->writeOutput($event->job, 'failed'); + } + + $this->logFailedJob($event); + }); + + if ($this->output->isVerbose()) { + $this->laravel['events']->listen(JobProcessing::class, function ($event) { + $this->writeOutput($event->job, 'starting'); + }); + + $this->laravel['events']->listen(JobProcessed::class, function ($event) { + $this->writeOutput($event->job, 'success'); + }); + + $this->laravel['events']->listen(JobReleasedAfterException::class, function ($event) { + $this->writeOutput($event->job, 'released_after_exception'); + }); + } + } } diff --git a/src/Consumer.php b/src/Consumer.php index ed3d8099..efe404f3 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -3,12 +3,20 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ; use Exception; +use Illuminate\Console\Concerns\InteractsWithIO; use Illuminate\Container\Container; +use Illuminate\Contracts\Debug\ExceptionHandler; +use Illuminate\Contracts\Events\Dispatcher; +use Illuminate\Contracts\Queue\Factory as QueueManager; +use Illuminate\Queue\Events\JobTimedOut; use Illuminate\Queue\Worker; use Illuminate\Queue\WorkerOptions; use PhpAmqpLib\Channel\AMQPChannel; -use PhpAmqpLib\Exception\AMQPRuntimeException; +use PhpAmqpLib\Exception\AMQPExceptionInterface; +use PhpAmqpLib\Exception\AMQPProtocolChannelException; +use PhpAmqpLib\Exception\AMQPTimeoutException; use PhpAmqpLib\Message\AMQPMessage; +use Symfony\Component\Console\Output\OutputInterface; use Throwable; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue; @@ -29,12 +37,70 @@ class Consumer extends Worker /** @var int */ protected $prefetchCount; - /** @var AMQPChannel */ + /** + * @var bool + */ + protected $blocking = false; + + /** + * @var bool + */ + protected $initQueue = false; + + /** + * @var bool + */ + protected $autoReconnect = false; + + /** + * @var float + */ + protected $autoReconnectPause = 0; + + /** + * @var float + */ + protected $aliveCheck = 0; + + /** + * @var bool + */ + protected $asyncSignalsSupported; + + /** + * @var RabbitMQQueue + */ + protected $connection = null; + + /** + * @var AMQPChannel + */ protected $channel; - /** @var object|null */ + /** + * @var object|null + */ protected $currentJob; + /** + * @var InteractsWithIO + */ + protected $interactsWithIO = null; + + /** + * {@inheritDoc} + */ + public function __construct( + QueueManager $manager, + Dispatcher $events, + ExceptionHandler $exceptions, + callable $isDownForMaintenance, + ?callable $resetScope = null + ) { + parent::__construct($manager, $events, $exceptions, $isDownForMaintenance, $resetScope); + $this->asyncSignalsSupported = $this->supportsAsyncSignals(); + } + public function setContainer(Container $value): void { $this->container = $value; @@ -60,6 +126,39 @@ public function setPrefetchCount(int $value): void $this->prefetchCount = $value; } + public function setBlocking(bool $value): void + { + $this->blocking = $value; + } + + public function setInitQueue(bool $value): void + { + $this->initQueue = $value; + } + + public function setAutoReconnect(bool $value): void + { + $this->autoReconnect = $value; + } + + public function setAutoReconnectPause(float $value): void + { + $this->autoReconnectPause = $value; + } + + public function setAliveCheck(float $value): void + { + $this->aliveCheck = $value; + } + + /** + * @param InteractsWithIO $value + */ + public function setInteractWithIO($value): void + { + $this->interactsWithIO = $value; + } + /** * Listen to the given queue in a loop. * @@ -71,114 +170,237 @@ public function setPrefetchCount(int $value): void */ public function daemon($connectionName, $queue, WorkerOptions $options) { - if ($this->supportsAsyncSignals()) { + if ($this->asyncSignalsSupported) { $this->listenForSignals(); } + $startTime = hrtime(true) / 1e9; + $jobsProcessed = 0; $lastRestart = $this->getTimestampOfLastQueueRestart(); - [$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0]; + while (true) { + try { + $this->printOutputLine('Creating new connection'); + $this->initConnection($connectionName, $queue); + $status = $this->consume( + $startTime, + $jobsProcessed, + $lastRestart, + $connectionName, + $queue, + $options + ); + if ($status !== null) { + return $status; + } - /** @var RabbitMQQueue $connection */ - $connection = $this->manager->connection($connectionName); + break; + } catch (AMQPExceptionInterface $exception) { + $this->exceptions->report($exception); - $this->channel = $connection->getChannel(); + if (! $this->isAmqpConnectionException($exception)) { + $this->kill(self::EXIT_ERROR, $options); - $this->channel->basic_qos( - $this->prefetchSize, - $this->prefetchCount, - false - ); + return 0; + } elseif (! $this->autoReconnect) { + $this->kill(self::EXIT_ERROR, $options); + + return 0; + } - $jobClass = $connection->getJobClass(); + $this->sleep($this->autoReconnectPause); + } + } + + return 0; + } + + /** + * @return int + * + * @throws Throwable + */ + protected function consume(float $startTime, int &$jobsProcessed, ?int $lastRestart, string $connectionName, string $queue, WorkerOptions $options) + { + $jobClass = $this->connection->getJobClass(); $arguments = []; if ($this->maxPriority) { $arguments['priority'] = ['I', $this->maxPriority]; } - $this->channel->basic_consume( - $queue, - $this->consumerTag, - false, - false, - false, - false, - function (AMQPMessage $message) use ($connection, $options, $connectionName, $queue, $jobClass, &$jobsProcessed): void { - $job = new $jobClass( - $this->container, - $connection, - $message, - $connectionName, - $queue - ); - - $this->currentJob = $job; + while (true) { + if ($this->blocking && ! $this->daemonShouldRun($options, $connectionName, $queue)) { + if ($this->asyncSignalsSupported) { + pcntl_signal_dispatch(); + } - if ($this->supportsAsyncSignals()) { - $this->registerTimeoutHandler($job, $options); + $status = $this->pauseWorker($options, $lastRestart); + if ($status !== null) { + return $this->stop($status, $options); } - $jobsProcessed++; + continue; + } - $this->runJob($job, $connectionName, $options); + $this->reinitChannelIfNeed($queue); + $this->channel->basic_consume( + $queue, + $this->consumerTag, + false, + false, + false, + false, + function (AMQPMessage $message) use ($options, $connectionName, $queue, $jobClass, &$jobsProcessed): void { + $this->printOutputLine('New message is received'); + $job = new $jobClass( + $this->container, + $this->connection, + $message, + $connectionName, + $queue + ); + + $this->currentJob = $job; + + if ($this->asyncSignalsSupported) { + $this->registerTimeoutHandler($job, $options); + } + + $jobsProcessed++; + + $this->runJob($job, $connectionName, $options); + + if ($this->asyncSignalsSupported) { + $this->resetTimeoutHandler(); + } + + if ($options->rest > 0) { + $this->sleep($options->rest); + } + + $this->printOutputLine('New message is processed'); + }, + null, + $arguments + ); - if ($this->supportsAsyncSignals()) { - $this->resetTimeoutHandler(); + while ($this->channel->is_consuming()) { + // Before reserving any jobs, we will make sure this queue is not paused and + // if it is we will just pause this worker for a given amount of time and + // make sure we do not need to kill this worker process off completely. + if (! $this->blocking && ! $this->daemonShouldRun($options, $connectionName, $queue)) { + $status = $this->pauseWorker($options, $lastRestart); + if ($status !== null) { + return $this->stop($status, $options); + } + + continue; } - if ($options->rest > 0) { - $this->sleep($options->rest); + // If the daemon should run (not in maintenance mode, etc.), then we can wait for a job. + try { + $this->channel->wait(null, ! $this->blocking, $this->blocking ? $this->aliveCheck : $options->timeout); + } catch (AMQPTimeoutException $exception) { + if ($this->blocking && $this->aliveCheck > 0) { + $this->checkAlive(); + } else { + throw $exception; + } + } catch (AMQPExceptionInterface $exception) { + throw $exception; + } catch (Throwable $exception) { + $this->exceptions->report($exception); + + $this->stopWorkerIfLostConnection($exception); } - }, - null, - $arguments - ); - while ($this->channel->is_consuming()) { - // Before reserving any jobs, we will make sure this queue is not paused and - // if it is we will just pause this worker for a given amount of time and - // make sure we do not need to kill this worker process off completely. - if (! $this->daemonShouldRun($options, $connectionName, $queue)) { - $this->pauseWorker($options, $lastRestart); + // If no job is got off the queue, we will need to sleep the worker. + if (! $this->blocking && ! $this->currentJob) { + $this->sleep($options->sleep); + } - continue; + // Finally, we will check to see if we have exceeded our memory limits or if + // the queue should restart based on other indications. If so, we'll stop + // this worker and let whatever is "monitoring" it restart the process. + $status = $this->stopIfNecessary( + $options, + $lastRestart, + $startTime, + $jobsProcessed, + $this->currentJob + ); + if ($status !== null) { + return $this->stop($status, $options); + } + + $this->currentJob = null; } - // If the daemon should run (not in maintenance mode, etc.), then we can wait for a job. - try { - $this->channel->wait(null, true, (int) $options->timeout); - } catch (AMQPRuntimeException $exception) { - $this->exceptions->report($exception); + if (! $this->blocking) { + break; + } + } - $this->kill(self::EXIT_ERROR, $options); - } catch (Exception|Throwable $exception) { - $this->exceptions->report($exception); + return null; + } - $this->stopWorkerIfLostConnection($exception); + /** + * @throws \PhpAmqpLib\Exception\AMQPProtocolChannelException + */ + protected function initConnection(string $connectionName, string $queue): void + { + if ($this->connection !== null) { + // Reconnecting + $this->connection->reconnect(); + } else { + /* @var RabbitMQQueue $connection */ + $this->connection = $this->manager->connection($connectionName); + if (! $this->connection instanceof RabbitMQQueue) { + throw new Exception('Connection should implement '.RabbitMQQueue::class); } - // If no job is got off the queue, we will need to sleep the worker. - if ($this->currentJob === null) { - $this->sleep($options->sleep); - } + // Force disable retrying when we are in the consumer context (to avoid bad state of a job) + $this->connection->setDisableRetries(true); + } - // Finally, we will check to see if we have exceeded our memory limits or if - // the queue should restart based on other indications. If so, we'll stop - // this worker and let whatever is "monitoring" it restart the process. - $status = $this->stopIfNecessary( - $options, - $lastRestart, - $startTime, - $jobsProcessed, - $this->currentJob - ); + $this->initChannel($queue); + } - if (! is_null($status)) { - return $this->stop($status, $options); - } + /** + * @return void + * + * @throws \PhpAmqpLib\Exception\AMQPProtocolChannelException + */ + protected function initChannel(string $queue) + { + if ($this->initQueue) { + $this->declareQueue($this->connection, $queue); + } - $this->currentJob = null; + $this->channel = $this->connection->getChannel(); + $this->channel->basic_qos( + $this->prefetchSize, + $this->prefetchCount, + null + ); + } + + /** + * @return bool + * + * @throws \PhpAmqpLib\Exception\AMQPProtocolChannelException + */ + protected function reinitChannelIfNeed(string $queue) + { + // Check that channel is active + // Connection can close channel because of reconnection mechanism itself + if ($this->channel->is_open()) { + return false; } + + $this->initChannel($queue); + + return true; } /** @@ -193,18 +415,226 @@ protected function daemonShouldRun(WorkerOptions $options, $connectionName, $que } /** - * Stop listening and bail out of the script. - * - * @param int $status - * @param WorkerOptions|null $options - * @return int + * {@inheritdoc} + */ + protected function runJob($job, $connectionName, WorkerOptions $options) + { + if (! $this->blocking) { + return parent::runJob($job, $connectionName, $options); + } + + try { + return $this->process($connectionName, $job, $options); + } catch (Throwable $e) { + // Throw exception to call reconnect + if ($this->isAmqpConnectionException($e)) { + throw $e; + } + + $this->exceptions->report($e); + + $this->stopWorkerIfLostConnection($e); + } + } + + /** + * {@inheritDoc} + */ + protected function listenForSignals() + { + if (! $this->blocking) { + parent::listenForSignals(); + + return; + } + + // Support pause/exit for blocking mode + pcntl_async_signals(true); + pcntl_signal(SIGHUP, [$this, 'quitSignalHandler']); + pcntl_signal(SIGTERM, [$this, 'quitSignalHandler']); + pcntl_signal(SIGQUIT, [$this, 'quitSignalHandler']); + + pcntl_signal(SIGUSR2, function () { + $this->printOutputLine('SIGUSR2('.SIGUSR2.') is received. Pause this worker', 'warning'); + $this->paused = true; + $this->channel->basic_cancel($this->consumerTag); + }); + + pcntl_signal(SIGCONT, function () { + $this->printOutputLine('SIGCONT('.SIGCONT.') is received. Unpause this worker', 'warning'); + $this->paused = false; + }); + } + + /** + * @param null $sigInfo + */ + public function quitSignalHandler($signal, $sigInfo = null): void + { + $signalName = null; + switch ($signal) { + case SIGHUP: + $signalName = 'SIGHUP'; + break; + case SIGTERM: + $signalName = 'SIGTERM'; + break; + case SIGQUIT: + $signalName = 'SIGQUIT'; + break; + } + + $this->printOutputLine("$signalName($signal) is received. Activate the `shouldQuit` option", 'warning'); + $this->shouldQuit = true; + $this->channel->basic_cancel($this->consumerTag); + } + + /** + * @throws AMQPProtocolChannelException + */ + protected function declareQueue(RabbitMQQueue $queue, ?string $queueName): void + { + // When the queue already exists, just return. + if ($queue->isQueueExists($queueName)) { + return; + } + + // Create a queue + $queue->declareQueueByConfig($queueName); + } + + /** + * {@inheritDoc} */ public function stop($status = 0, $options = null) { + if (is_array($status)) { + [$status, $reason] = $status; + } else { + $reason = null; + } + + $reason = $reason ?: 'no reason'; + $this->printOutputLine('Stopping this worker with status '.$status.' because of '.$reason, 'error'); + // Tell the server you are going to stop consuming. - // It will finish up the last message and not send you any more. + // It will finish up the last message and not send you anymore. $this->channel->basic_cancel($this->consumerTag, false, true); return parent::stop($status, $options); } + + /** + * {@inheritDoc} + */ + public function kill($status = 0, $options = null) + { + $this->printOutputLine('Killing this worker with status '.$status, 'error'); + parent::kill($status, $options); + } + + /** + * Same as parent, but with logs. + * {@inheritDoc} + */ + public function stopIfNecessary(WorkerOptions $options, $lastRestart, $startTime = 0, $jobsProcessed = 0, $job = null) + { + return match (true) { + $this->shouldQuit => [static::EXIT_SUCCESS, '`shouldQuit` is active'], + $this->memoryExceeded($options->memory) => [static::EXIT_MEMORY_LIMIT, 'exceeded memory-limit'], + $this->queueShouldRestart($lastRestart) => [static::EXIT_SUCCESS, 'should-restart'], + $options->stopWhenEmpty && is_null($job) => [static::EXIT_SUCCESS, '`stopWhenEmpty` option is active'], + $options->maxTime && hrtime(true) / 1e9 - $startTime >= $options->maxTime => [static::EXIT_SUCCESS, 'reached max working time'], + $options->maxJobs && $jobsProcessed >= $options->maxJobs => [static::EXIT_SUCCESS, 'reached max processed jobs'], + default => null + }; + } + + /** + * Same as parent, but with logs. + * {@inheritDoc} + */ + protected function registerTimeoutHandler($job, WorkerOptions $options) + { + // We will register a signal handler for the alarm signal so that we can kill this + // process if it is running too long because it has frozen. This uses the async + // signals supported in recent versions of PHP to accomplish it conveniently. + pcntl_signal(SIGALRM, function () use ($job, $options) { + $this->printOutputLine('SIGALRM('.SIGALRM.') is received (timeout). Cancelling a job and killing this worker', 'error'); + + if ($job) { + $this->markJobAsFailedIfWillExceedMaxAttempts( + $job->getConnectionName(), $job, (int) $options->maxTries, $e = $this->timeoutExceededException($job) + ); + + $this->markJobAsFailedIfWillExceedMaxExceptions( + $job->getConnectionName(), $job, $e + ); + + $this->markJobAsFailedIfItShouldFailOnTimeout( + $job->getConnectionName(), $job, $e + ); + + $this->events->dispatch(new JobTimedOut( + $job->getConnectionName(), $job + )); + } + + $this->kill(static::EXIT_ERROR, $options); + }, true); + + pcntl_alarm( + max($this->timeoutForJob($job, $options), 0) + ); + } + + /** + * @return bool + */ + protected function printOutputLine(string $message, string $type = 'comment') + { + if (! $this->interactsWithIO) { + return false; + } + + $message = sprintf( + '[%s] %s', + date('Y-m-d H:i:s.u'), + $message + ); + switch ($type) { + case 'comment': + $this->interactsWithIO->line($message, null, OutputInterface::VERBOSITY_VERBOSE); + break; + case 'warning': + $this->interactsWithIO->warn($message, OutputInterface::VERBOSITY_NORMAL); + break; + default: + $this->interactsWithIO->error($message, OutputInterface::VERBOSITY_NORMAL); + break; + } + + return true; + } + + protected function isAmqpConnectionException(Throwable $exception): bool + { + return $exception instanceof AMQPExceptionInterface; + } + + protected function checkAlive(): void + { + try { + // We need to call any command that has a response, and we can wait for it. + // If there is no response, we consider the current connection dead + // WARNING: It works ONLY if `channel_rpc_timeout` is greater than 0 + $this->channel->basic_qos( + $this->prefetchSize, + $this->prefetchCount, + null + ); + } catch (AMQPTimeoutException $exception) { + throw new AMQPTimeoutException('Custom alive check failed', $exception->getTimeout(), $exception->getCode(), $exception); + } + } } diff --git a/src/Queue/Connection/ConfigFactory.php b/src/Queue/Connection/ConfigFactory.php index f3ecd409..87bf97a2 100644 --- a/src/Queue/Connection/ConfigFactory.php +++ b/src/Queue/Connection/ConfigFactory.php @@ -37,6 +37,8 @@ public static function make(array $config = []): AMQPConnectionConfig self::getHostFromConfig($connectionConfig, $config); self::getHeartbeatFromConfig($connectionConfig, $config); + self::getKeepAliveFromConfig($connectionConfig, $config); + self::getChannelRpcTimeoutConfig($connectionConfig, $config); self::getNetworkProtocolFromConfig($connectionConfig, $config); }); } @@ -93,6 +95,22 @@ protected static function getHeartbeatFromConfig(AMQPConnectionConfig $connectio } } + protected static function getKeepAliveFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + $keepalive = Arr::get($config, self::CONFIG_OPTIONS.'.keepalive'); + if (is_bool($keepalive)) { + $connectionConfig->setKeepalive($keepalive); + } + } + + protected static function getChannelRpcTimeoutConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + $timeout = Arr::get($config, self::CONFIG_OPTIONS.'.channel_rpc_timeout'); + if (is_numeric($timeout)) { + $connectionConfig->setChannelRPCTimeout((float)$timeout); + } + } + protected static function getNetworkProtocolFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void { if ($networkProtocol = Arr::get($config, 'network_protocol')) { diff --git a/src/Queue/QueueConfig.php b/src/Queue/QueueConfig.php index e7ec27c4..412270a2 100644 --- a/src/Queue/QueueConfig.php +++ b/src/Queue/QueueConfig.php @@ -14,6 +14,14 @@ class QueueConfig protected bool $prioritizeDelayed = false; + protected bool $cacheDeclared = true; + + protected bool $queueAutoDelete = false; + + protected bool $queueDurable = true; + + protected bool $useExpirationForDelayedQueues = false; + protected int $queueMaxPriority = 2; protected string $exchange = ''; @@ -22,6 +30,8 @@ class QueueConfig protected string $exchangeRoutingKey = '%s'; + protected bool $declareFullRoute = false; + protected bool $rerouteFailed = false; protected string $failedExchange = ''; @@ -30,6 +40,14 @@ class QueueConfig protected bool $quorum = false; + protected ?string $logChannelName = null; + + protected array $retryOptions = [ + 'enable' => false, + 'max' => 5, + 'pause_micro_seconds' => 1e6, + ]; + protected array $options = []; /** @@ -275,4 +293,88 @@ protected function toBoolean($value): bool { return filter_var($value, FILTER_VALIDATE_BOOLEAN); } + + public function isCacheDeclared(): bool + { + return $this->cacheDeclared; + } + + public function setCacheDeclared(bool $cacheDeclared): QueueConfig + { + $this->cacheDeclared = $cacheDeclared; + + return $this; + } + + public function isQueueDurable(): bool + { + return $this->queueDurable; + } + + public function setQueueDurable(bool $queueDurable): QueueConfig + { + $this->queueDurable = $queueDurable; + + return $this; + } + + public function isQueueAutoDelete(): bool + { + return $this->queueAutoDelete; + } + + public function setQueueAutoDelete(bool $queueAutoDelete): QueueConfig + { + $this->queueAutoDelete = $queueAutoDelete; + + return $this; + } + + public function isUseExpirationForDelayedQueues(): bool + { + return $this->useExpirationForDelayedQueues; + } + + public function setUseExpirationForDelayedQueues(bool $useExpirationForDelayedQueues): QueueConfig + { + $this->useExpirationForDelayedQueues = $useExpirationForDelayedQueues; + + return $this; + } + + public function getRetryOptions(): array + { + return $this->retryOptions; + } + + public function setRetryOptions(array $retryOptions): QueueConfig + { + $this->retryOptions = $retryOptions; + + return $this; + } + + public function getLogChannelName(): ?string + { + return $this->logChannelName; + } + + public function setLogChannelName(?string $logChannelName): QueueConfig + { + $this->logChannelName = $logChannelName; + + return $this; + } + + public function isDeclareFullRoute(): bool + { + return $this->declareFullRoute; + } + + public function setDeclareFullRoute(bool $declareFullRoute): QueueConfig + { + $this->declareFullRoute = $declareFullRoute; + + return $this; + } } diff --git a/src/Queue/QueueConfigFactory.php b/src/Queue/QueueConfigFactory.php index 6f2befc5..f09a9a75 100644 --- a/src/Queue/QueueConfigFactory.php +++ b/src/Queue/QueueConfigFactory.php @@ -68,6 +68,44 @@ protected static function getOptionsFromConfig(QueueConfig $queueConfig, array $ $queueConfig->setQuorum($quorum); } + // Feature: Retries with Logs + if ($retriesOption = Arr::pull($queueOptions, 'retries')) { + $queueConfig->setRetryOptions($retriesOption); + } + if (array_key_exists('log_channel', $queueOptions)) { + $queueConfig->setLogChannelName($queueOptions['log_channel']); + unset($queueOptions['log_channel']); + } + + // Feature: Caching + if (array_key_exists('cache_declared', $queueOptions)) { + $queueConfig->setCacheDeclared($queueOptions['cache_declared']); + unset($queueOptions['cache_declared']); + } + + // Feature: Queue flags + $queueFlags = Arr::pull($queueOptions, 'flags'); + if ($queueFlags) { + if (array_key_exists('durable', $queueFlags)) { + $queueConfig->setQueueDurable($queueFlags['durable']); + } + if (array_key_exists('auto_delete', $queueFlags)) { + $queueConfig->setQueueAutoDelete($queueFlags['auto_delete']); + } + } + + // Feature: Another strategy for later tasks + if (array_key_exists('use_expiration_for_delayed_queues', $queueOptions)) { + $queueConfig->setUseExpirationForDelayedQueues($queueOptions['use_expiration_for_delayed_queues']); + unset($queueOptions['use_expiration_for_delayed_queues']); + } + + // Feature: Declare full route + if (array_key_exists('declare_full_route', $queueOptions)) { + $queueConfig->setDeclareFullRoute($queueOptions['declare_full_route']); + unset($queueOptions['declare_full_route']); + } + // All extra options not defined $queueConfig->setOptions($queueOptions); } diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index fadedce5..a41c96df 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -11,17 +11,20 @@ use Illuminate\Queue\Queue; use Illuminate\Support\Arr; use Illuminate\Support\Facades\Crypt; +use Illuminate\Support\Facades\Log; use Illuminate\Support\Str; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; use PhpAmqpLib\Exception\AMQPChannelClosedException; use PhpAmqpLib\Exception\AMQPConnectionBlockedException; use PhpAmqpLib\Exception\AMQPConnectionClosedException; +use PhpAmqpLib\Exception\AMQPExceptionInterface; use PhpAmqpLib\Exception\AMQPProtocolChannelException; use PhpAmqpLib\Exception\AMQPRuntimeException; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; +use Psr\Log\LoggerInterface; use RuntimeException; use Throwable; use VladimirYuldashev\LaravelQueueRabbitMQ\Contracts\RabbitMQQueueContract; @@ -54,6 +57,11 @@ class RabbitMQQueue extends Queue implements QueueContract, RabbitMQQueueContrac */ protected array $boundQueues = []; + /** + * Disable retries if needs + */ + protected bool $disableRetries = false; + /** * Current job being processed. */ @@ -64,6 +72,11 @@ class RabbitMQQueue extends Queue implements QueueContract, RabbitMQQueueContrac */ protected QueueConfig $config; + /** + * @var \Psr\Log\LoggerInterface + */ + protected LoggerInterface $logChannel; + /** * RabbitMQQueue constructor. */ @@ -71,6 +84,17 @@ public function __construct(QueueConfig $config) { $this->config = $config; $this->dispatchAfterCommit = $config->isDispatchAfterCommit(); + $this->logChannel = Log::channel($config->getLogChannelName()); + } + + /** + * @param bool $value + * + * @return void + */ + public function setDisableRetries(bool $value): void + { + $this->disableRetries = $value; } /** @@ -80,18 +104,20 @@ public function __construct(QueueConfig $config) */ public function size($queue = null): int { - $queue = $this->getQueue($queue); + return $this->retryOnError(function () use ($queue) { + $queue = $this->getQueue($queue); - if (! $this->isQueueExists($queue)) { - return 0; - } + if (!$this->isQueueExists($queue)) { + return 0; + } - // create a temporary channel, so the main channel will not be closed on exception - $channel = $this->createChannel(); - [, $size] = $channel->queue_declare($queue, true); - $channel->close(); + // create a temporary channel, so the main channel will not be closed on exception + $channel = $this->createChannel(); + [, $size] = $channel->queue_declare($queue, true); + $channel->close(); - return $size; + return $size; + }); } /** @@ -119,15 +145,17 @@ function ($payload, $queue) { */ public function pushRaw($payload, $queue = null, array $options = []): int|string|null { - [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); + return $this->retryOnError(function () use ($payload, $queue, $options) { + [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); - $this->declareDestination($destination, $exchange, $exchangeType); + $this->declareDestination($destination, $exchange, $exchangeType); - [$message, $correlationId] = $this->createMessage($payload, $attempts); + [$message, $correlationId] = $this->createMessage($payload, $attempts); - $this->publishBasic($message, $exchange, $destination, true); + $this->publishBasic($message, $exchange, $destination, true); - return $correlationId; + return $correlationId; + }); } /** @@ -163,20 +191,21 @@ public function laterRaw($delay, string $payload, $queue = null, int $attempts = return $this->pushRaw($payload, $queue, $options); } - // Create a main queue to handle delayed messages - [$mainDestination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); - $this->declareDestination($mainDestination, $exchange, $exchangeType); + return $this->retryOnError(function () use ($payload, $queue, $delay, $options, $ttl) { + // Create a main queue to handle delayed messages + [$mainDestination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); + $this->declareDestination($mainDestination, $exchange, $exchangeType); - $destination = $this->getQueue($queue).'.delay.'.$ttl; + $destination = $this->getDelayedQueueName($queue, $ttl); + $this->declareQueue($destination, true, false, $this->getDelayQueueArguments($this->getQueue($queue), $ttl)); - $this->declareQueue($destination, true, false, $this->getDelayQueueArguments($this->getQueue($queue), $ttl)); + [$message, $correlationId] = $this->createMessage($payload, $attempts, $ttl); - [$message, $correlationId] = $this->createMessage($payload, $attempts); + // Publish directly on the delayQueue, no need to publish through an exchange. + $this->publishBasic($message, null, $destination, true); - // Publish directly on the delayQueue, no need to publish through an exchange. - $this->publishBasic($message, null, $destination, true); - - return $correlationId; + return $correlationId; + }); } /** @@ -194,11 +223,21 @@ public function bulk($jobs, $data = '', $queue = null): void */ protected function publishBatch($jobs, $data = '', $queue = null): void { - foreach ($jobs as $job) { - $this->bulkRaw($this->createPayload($job, $queue, $data), $queue, ['job' => $job]); + $payloads = []; + foreach ((array)$jobs as $job) { + $payloads[] = [ + $this->createPayload($job, $queue, $data), + $job, + ]; } - $this->batchPublish(); + $this->retryOnError(function () use ($payloads, $jobs, $data, $queue) { + foreach ($payloads as $payloadAndJob) { + $this->bulkRaw($payloadAndJob[0], $queue, ['job' => $payloadAndJob[1]]); + } + + $this->batchPublish(); + }); } /** @@ -217,6 +256,35 @@ public function bulkRaw(string $payload, ?string $queue = null, array $options = return $correlationId; } + /** + * @param array $payloads + * @param null $queue + * @param array $options + * + * @return array + * @throws AMQPProtocolChannelException + */ + public function pushBulkRaw(array $payloads, $queue = null, array $options = []) + { + return $this->retryOnError(function () use ($payloads, $queue, $options) { + [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); + + $this->declareDestination($destination, $exchange, $exchangeType); + + $ids = []; + $channel = $this->getChannel(); + foreach ($payloads as $payload) { + [$message, $correlationId] = $this->createMessage($payload, $attempts); + $channel->batch_basic_publish($message, $exchange, $destination); + $ids[] = $correlationId; + } + + $this->batchPublish(); + + return $ids; + }); + } + /** * {@inheritdoc} * @@ -245,7 +313,7 @@ public function pop($queue = null) if ($exception->amqp_reply_code === 404) { // Because of the channel exception the channel was closed and removed. // We have to open a new channel. Because else the worker(s) are stuck in a loop, without processing. - $this->getChannel(true); + $this->initChannel(); return null; } @@ -331,7 +399,9 @@ public function isExchangeExists(string $exchange): bool $channel->exchange_declare($exchange, '', true); $channel->close(); - $this->exchanges[] = $exchange; + if ($this->getConfig()->isCacheDeclared()) { + $this->cacheDeclaredExchange($exchange); + } return true; } catch (AMQPProtocolChannelException $exception) { @@ -381,9 +451,7 @@ public function deleteExchange(string $name, bool $unused = false): void return; } - $idx = array_search($name, $this->exchanges); - unset($this->exchanges[$idx]); - + unset($this->exchanges[$name]); $this->getChannel()->exchange_delete( $name, $unused @@ -411,7 +479,10 @@ public function isQueueExists(?string $name = null): bool $channel->queue_declare($queueName, true); $channel->close(); - $this->queues[] = $queueName; + $cfg = $this->getConfig(); + if ($cfg->isCacheDeclared() && ! $cfg->isQueueAutoDelete()) { + $this->cacheDeclaredQueue($queueName); + } return true; } catch (AMQPProtocolChannelException $exception) { @@ -445,6 +516,10 @@ public function declareQueue( false, new AMQPTable($arguments) ); + + if (! $autoDelete && ! isset($arguments['x-expires']) && $this->getConfig()->isCacheDeclared()) { + $this->cacheDeclaredQueue($name); + } } /** @@ -459,9 +534,7 @@ public function deleteQueue(string $name, bool $if_unused = false, bool $if_empt return; } - $idx = array_search($name, $this->queues); - unset($this->queues[$idx]); - + unset($this->queues[$name]); $this->getChannel()->queue_delete($name, $if_unused, $if_empty); } @@ -470,15 +543,15 @@ public function deleteQueue(string $name, bool $if_unused = false, bool $if_empt */ public function bindQueue(string $queue, string $exchange, string $routingKey = ''): void { - if (in_array( - implode('', compact('queue', 'exchange', 'routingKey')), - $this->boundQueues, - true - )) { + $bindKey = $queue . $exchange . $routingKey; + if (isset($this->boundQueues[$bindKey])) { return; } $this->getChannel()->queue_bind($queue, $exchange, $routingKey); + if ($this->getConfig()->isCacheDeclared()) { + $this->boundQueues[$bindKey] = 1; + } } /** @@ -508,10 +581,26 @@ public function reject(RabbitMQJob $job, bool $requeue = false): void $this->getChannel()->basic_reject($job->getRabbitMQMessage()->getDeliveryTag(), $requeue); } + /** + * @return \PhpAmqpLib\Channel\AbstractChannel|AMQPChannel + */ + protected function initChannel(): AMQPChannel|\PhpAmqpLib\Channel\AbstractChannel + { + if ($this->channel) { + try { + $this->channel->close(); + } catch (Throwable $e) { + /* Ignore closing errors */ + } + } + + return $this->channel = $this->getConnection()->channel(); + } + /** * Create a AMQP message. */ - protected function createMessage($payload, int $attempts = 0): array + protected function createMessage($payload, int $attempts = 0, int $expiration = 0): array { $properties = [ 'content_type' => 'application/json', @@ -547,6 +636,10 @@ protected function createMessage($payload, int $attempts = 0): array ], ])); + if ($expiration > 0 && $this->getConfig()->isUseExpirationForDelayedQueues()) { + $message->set('expiration', $expiration); + } + return [ $message, $correlationId, @@ -594,6 +687,29 @@ public function close(): void } } + /** + * @param string $destination + * @return void + */ + public function declareQueueByConfig(string $destination): void + { + $this->declareQueue( + $destination, + $this->getConfig()->isQueueDurable(), + $this->getConfig()->isQueueAutoDelete(), + $this->getQueueArguments($destination) + ); + } + + /** + * @param Throwable $exception + * @return bool + */ + protected function isAmqpConnectionException(Throwable $exception): bool + { + return $exception instanceof AMQPExceptionInterface; + } + /** * Get the Queue arguments. */ @@ -626,12 +742,17 @@ protected function getQueueArguments(string $destination): array */ protected function getDelayQueueArguments(string $destination, int $ttl): array { - return [ + $result = [ 'x-dead-letter-exchange' => $this->getExchange(), 'x-dead-letter-routing-key' => $this->getRoutingKey($destination), - 'x-message-ttl' => $ttl, - 'x-expires' => $ttl * 2, ]; + + if (! $this->getConfig()->isUseExpirationForDelayedQueues()) { + $result['x-message-ttl'] = $ttl; + $result['x-expires'] = $ttl * 2; + } + + return $result; } /** @@ -683,7 +804,7 @@ protected function getFailedRoutingKey(string $destination): string */ protected function isExchangeDeclared(string $name): bool { - return in_array($name, $this->exchanges, true); + return isset($this->exchanges[$name]); } /** @@ -691,7 +812,7 @@ protected function isExchangeDeclared(string $name): bool */ protected function isQueueDeclared(string $name): bool { - return in_array($name, $this->queues, true); + return isset($this->queues[$name]); } /** @@ -701,23 +822,30 @@ protected function isQueueDeclared(string $name): bool */ protected function declareDestination(string $destination, ?string $exchange = null, string $exchangeType = AMQPExchangeType::DIRECT): void { - // When an exchange is provided and no exchange is present in RabbitMQ, create an exchange. - if ($exchange && ! $this->isExchangeExists($exchange)) { - $this->declareExchange($exchange, $exchangeType); - } + $declareAll = $this->getConfig()->isDeclareFullRoute(); - // When an exchange is provided, just return. + // When an exchange is provided and no exchange is present in RabbitMQ, create an exchange. if ($exchange) { - return; + if (! $this->isExchangeExists($exchange)) { + $this->declareExchange($exchange, $exchangeType); + } + + // When an exchange is provided, just return. + if (! $declareAll) { + return; + } } // When the queue already exists, just return. - if ($this->isQueueExists($destination)) { - return; + if (!$this->isQueueExists($destination)) { + // Create a queue for amq.direct publishing. + $this->declareQueueByConfig($destination); } - // Create a queue for amq.direct publishing. - $this->declareQueue($destination, true, false, $this->getQueueArguments($destination)); + if (! $declareAll) { + return; + } + $this->bindQueue($destination, $exchange, $this->getRoutingKey($destination)); } /** @@ -755,10 +883,10 @@ protected function batchPublish(): void $this->getChannel()->publish_batch(); } - public function getChannel($forceNew = false): AMQPChannel + public function getChannel(): AMQPChannel { - if (! $this->channel || $forceNew) { - $this->channel = $this->createChannel(); + if (! $this->channel) { + return $this->initChannel(); } return $this->channel; @@ -772,11 +900,94 @@ protected function createChannel(): AMQPChannel /** * @throws Exception */ - protected function reconnect(): void + public function reconnect(): void { // Reconnects using the original connection settings. $this->getConnection()->reconnect(); // Create a new main channel because all old channels are removed. - $this->getChannel(true); + $this->initChannel(); + + $this->queues = []; + $this->boundQueues = []; + $this->exchanges = []; + } + + protected function cacheDeclaredQueue(string $name): void + { + $this->queues[$name] = 1; + } + + protected function cacheDeclaredExchange(string $name): void + { + $this->exchanges[$name] = 1; + } + + /** + * @param callable $call + * @return mixed + * @throws Throwable + */ + protected function retryOnError(callable $call) + { + $currentRetry = 0; + $retryOptions = $this->getConfig()->getRetryOptions(); + + $enabled = $this->disableRetries ? false : ($retryOptions['enabled'] ?? false); + $max = $retryOptions['max'] ?? 5; + $pauseMs = $retryOptions['pause_micro_seconds'] ?? 1e6; + $pauseS = round($pauseMs / 1e6, 2); + + while (true) { + try { + if ($currentRetry > 0) { + $this->reconnect(); + } + + return $call(); + } catch (Throwable $exception) { + if (! $enabled || ! $this->isAmqpConnectionException($exception)) { + throw $exception; + } + + $currentRetry++; + if ($max > 0 && $currentRetry > $max) { + throw $exception; + } + + $this->logChannel->warning("Got AMQP error, retrying after $pauseS seconds"); + usleep($pauseMs); + } + } + } + + /** + * @param string|null $queue + * @param int $ttl + * @return string + */ + protected function getDelayedQueueName(?string $queue, int $ttl): string + { + $destination = $this->getQueue($queue); + if ($this->getConfig()->isUseExpirationForDelayedQueues()) { + $destination = $destination.'_deferred'; + } else { + $destination = $destination.'.delay.'.$ttl; + } + + return $destination; + } + + /** + * @param \DateTimeInterface|\DateInterval|int|float $delay + * @return float|int + */ + protected function secondsUntil($delay) + { + // Support ms + if (is_float($delay)) { + return $delay; + } + + return parent::secondsUntil($delay); } } diff --git a/tests/Feature/Commands/ConsumeCommandTest.php b/tests/Feature/Commands/ConsumeCommandTest.php new file mode 100644 index 00000000..bac98017 --- /dev/null +++ b/tests/Feature/Commands/ConsumeCommandTest.php @@ -0,0 +1,150 @@ +app['config']->set("queue.connections.$queueNameAndConnection", $this->app['config']->get('queue.connections.rabbitmq')); + $this->app['config']->set("queue.connections.$queueNameAndConnection.queue", $queueNameAndConnection); + + $service = $this->createMock(TestJob::class); + $serviceName = Str::random(); + $this->app->singleton($serviceName, fn() => $service); + $service + ->expects($this->once()) + ->method('handle'); + + Queue::push(new TestJobCallService($serviceName, 'handle'), '', $queueNameAndConnection); + $blocking = (int)$blocking; + $this->artisan("rabbitmq:consume $queueNameAndConnection --max-jobs=1 --blocking=$blocking"); + } + + #[TestWith([false])] + #[TestWith([true])] + public function test_consume_with_retry(bool $blocking): void + { + $queueNameAndConnection = Str::random(); + $this->app['config']->set("queue.connections.$queueNameAndConnection", $this->app['config']->get('queue.connections.rabbitmq')); + $this->app['config']->set("queue.connections.$queueNameAndConnection.queue", $queueNameAndConnection); + + /** @var RabbitMQQueue $connection */ + $connection = Queue::connection($queueNameAndConnection); + + // First job does nothing + $service1 = $this->createMock(TestJob::class); + $serviceName1 = Str::random(); + $this->app->singleton($serviceName1, fn() => $service1); + $service1 + ->expects($this->once()) + ->method('handle'); + Queue::push(new TestJobCallService($serviceName1, 'handle'), '', $queueNameAndConnection); + + // Second job must fail first time (by breaking channel) and be success second time + $service2 = (object)[]; + $numberOfCalls = 0; + $service2->callback = function () use (&$numberOfCalls, $connection) { + $numberOfCalls++; + if ($numberOfCalls === 1) { + $connection->getChannel()->close(); + } + }; + $serviceName2 = Str::random(); + $this->app->singleton($serviceName2, fn() => $service2); + + Queue::push(new TestJobCallService($serviceName2, 'callback'), '', $queueNameAndConnection); + + $blocking = (int)$blocking; + $this->artisan("rabbitmq:consume $queueNameAndConnection --max-jobs=2 --blocking=$blocking --auto-reconnect=1 --verbose-messages=1 --init-queue=1"); + $this->assertEquals(2, $numberOfCalls); + } + + public function test_consume_blocking_alive_check(): void + { + $this->markTestSkippedUnless(extension_loaded('sockets'), 'Sockets extension is required'); + + $handler = $this->createMock(ExceptionHandler::class); + $handler + ->expects($this->once()) + ->method('report') + ->willReturnCallback(function (AMQPTimeoutException $exception) { + $this->assertEquals('Custom alive check failed', $exception->getMessage()); + }); + $this->app->instance(ExceptionHandler::class, $handler); + $queueNameAndConnection = Str::random(); + $this->app['config']->set("queue.connections.$queueNameAndConnection", $this->app['config']->get('queue.connections.rabbitmq')); + $this->app['config']->set("queue.connections.$queueNameAndConnection.queue", $queueNameAndConnection); + $this->app['config']->set("queue.connections.$queueNameAndConnection.options.channel_rpc_timeout", 1); + $this->app['config']->set("queue.connections.$queueNameAndConnection.options.keepalive", true); + + /** @var RabbitMQQueue $connection */ + $connection = Queue::connection($queueNameAndConnection); + + $port = random_int(10000, 50000); + $pathToSocksFile = __DIR__ . '/../../Script/socket_fake.php'; + $descriptors = [ + ['pipe', 'r'], // stdin + ['pipe', 'w'], // stdout + ['pipe', 'w'], // stderr + ]; + $proc = proc_open("php $pathToSocksFile $port", $descriptors, $pipes); + sleep(1); + $status = proc_get_status($proc); + if (!$status['running']) { + throw new \Exception('Cant make fake socket' . fread($pipes[1], 8192)); + } + + try { + $service = (object)[]; + $numberOfCalls1 = 0; + // Listen and do nothing + $service->callback1 = function () use (&$numberOfCalls1, $connection, $port) { + $numberOfCalls1++; + + // Emulate problems with connection + $input = $this->callProperty($connection->getConnection(), 'input'); + $className = get_class($input); + $reflection = new \ReflectionClass($className); + + $newStream = new StreamIO('127.0.0.1', $port, 1, 1); + $newStream->connect(); + + $property = $reflection->getProperty('io'); + $property->setAccessible(true); + $property->setValue($input, $newStream); + }; + $numberOfCalls2 = 0; + $service->callback2 = function () use (&$numberOfCalls2) { + // do nothing + $numberOfCalls2++; + }; + $serviceName = Str::random(); + $this->app->singleton($serviceName, fn() => $service); + $connection->push(new TestJobCallService($serviceName, 'callback1'), '', $queueNameAndConnection); + $connection->push(new TestJobCallService($serviceName, 'callback2'), '', $queueNameAndConnection); + + $this->artisan("rabbitmq:consume $queueNameAndConnection --max-jobs=2 --blocking=1 --auto-reconnect=1 --alive-check=1 --init-queue=1"); + $this->assertEquals(1, $numberOfCalls1); + $this->assertEquals(1, $numberOfCalls2); + $this->assertEquals(0, $connection->size($queueNameAndConnection)); + } finally { + proc_terminate($proc); + proc_close($proc); + } + } +} diff --git a/tests/Feature/TestCase.php b/tests/Feature/TestCase.php index 7d0fab9a..301acc50 100644 --- a/tests/Feature/TestCase.php +++ b/tests/Feature/TestCase.php @@ -5,9 +5,12 @@ use Illuminate\Database\DatabaseTransactionsManager; use Illuminate\Support\Facades\Queue; use Illuminate\Support\Str; +use PhpAmqpLib\Exception\AMQPExceptionInterface; use PhpAmqpLib\Exception\AMQPProtocolChannelException; +use PHPUnit\Framework\Attributes\TestWith; use RuntimeException; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob; +use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue; use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestEncryptedJob; use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestJob; use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\TestCase as BaseTestCase; @@ -68,6 +71,33 @@ public function test_push_raw(): void $this->assertSame(0, Queue::size()); } + public function test_push_bulk_raw(): void + { + $payload1 = Str::random(); + $payload2 = Str::random(); + + Queue::pushBulkRaw([$payload1, $payload2]); + + sleep(1); + + $this->assertSame(2, Queue::size()); + $this->assertNotNull($job1 = Queue::pop()); + $this->assertNotNull($job2 = Queue::pop()); + $this->assertSame(1, $job1->attempts()); + $this->assertSame(1, $job2->attempts()); + $this->assertInstanceOf(RabbitMQJob::class, $job1); + $this->assertInstanceOf(RabbitMQJob::class, $job2); + $this->assertSame($payload1, $job1->getRawBody()); + $this->assertSame($payload2, $job2->getRawBody()); + + $this->assertNull($job1->getJobId()); + $this->assertNull($job2->getJobId()); + + $job1->delete(); + $job2->delete(); + $this->assertSame(0, Queue::size()); + } + public function test_push(): void { Queue::push(new TestJob); @@ -152,19 +182,38 @@ public function test_later_raw(): void $this->assertSame(0, Queue::size()); } - public function test_later(): void + #[TestWith([false])] + #[TestWith([true])] + public function test_later(bool $useExpirationOnMessage): void { - Queue::later(3, new TestJob); + $queueName = Str::random(); + // Make another connection + $this->app['config']->set('queue.connections.rabbitmq2', $this->app['config']->get('queue.connections.rabbitmq')); + $this->app['config']->set('queue.connections.rabbitmq2.queue', $queueName); + $this->app['config']->set('queue.connections.rabbitmq2.options.queue.use_expiration_for_delayed_queues', $useExpirationOnMessage); + // Disable caching + $this->app['config']->set('queue.connections.rabbitmq2.options.queue.cache_declared', false); + + if ($useExpirationOnMessage) { + $laterQueueName = "{$queueName}_deferred"; + } else { + $laterQueueName = "{$queueName}.delay.3000"; + } + /** @var RabbitMQQueue $connection */ + $connection = Queue::connection('rabbitmq2'); + $this->assertFalse($connection->isQueueExists($laterQueueName)); + $connection->later(3, new TestJob); + $this->assertTrue($connection->isQueueExists($laterQueueName)); sleep(1); - $this->assertSame(0, Queue::size()); - $this->assertNull(Queue::pop()); + $this->assertSame(0, $connection->size()); + $this->assertNull($connection->pop()); sleep(3); - $this->assertSame(1, Queue::size()); - $this->assertNotNull($job = Queue::pop()); + $this->assertSame(1, $connection->size()); + $this->assertNotNull($job = $connection->pop()); $this->assertInstanceOf(RabbitMQJob::class, $job); @@ -176,7 +225,7 @@ public function test_later(): void $this->assertNotNull($job->getJobId()); $job->delete(); - $this->assertSame(0, Queue::size()); + $this->assertSame(0, $connection->size()); } public function test_bulk(): void @@ -431,19 +480,57 @@ public function test_delete(): void $this->assertNull(Queue::pop()); } - public function test_failed(): void + #[TestWith([false])] + #[TestWith([true])] + public function test_push_retry(bool $enableRetries): void { - Queue::push(new TestJob); + // Make another connection + $this->app['config']->set('queue.connections.rabbitmq2', $this->app['config']->get('queue.connections.rabbitmq')); + $this->app['config']->set('queue.connections.rabbitmq2.options.queue.retries', [ + 'enabled' => $enableRetries, + 'max' => 1, + 'pause_micro_seconds' => 1 + ]); + + /** @var RabbitMQQueue $connection */ + $connection = Queue::connection('rabbitmq2'); + $connection->declareQueue('default'); + + // Now let's close connection, it will trigger retry + $connection->getChannel()->close(); + if ($enableRetries) { + $connection->push(new TestJob); + $this->assertSame(1, $connection->size()); + } else { + // Push will trigger exception + try { + $connection->push(new TestJob); + } catch (AMQPExceptionInterface $exception) { + $this->assertSame('Channel connection is closed.', $exception->getMessage()); + } + } + } - $job = Queue::pop(); + public function test_full_route_declare(): void + { + // Make another connection + $this->app['config']->set('queue.connections.rabbitmq2', $this->app['config']->get('queue.connections.rabbitmq')); + $this->app['config']->set('queue.connections.rabbitmq2.options.queue.declare_full_route', true); - $job->fail(new RuntimeException($job->resolveName().' has an exception.')); + $queue = Str::random(); + $exchange = Str::random(); - sleep(1); + /** @var RabbitMQQueue $connection */ + $connection = Queue::connection('rabbitmq2'); + $this->assertFalse($connection->isQueueExists($queue)); + $this->assertFalse($connection->isExchangeExists($exchange)); - $this->assertSame(true, $job->hasFailed()); - $this->assertSame(true, $job->isDeleted()); - $this->assertSame(0, Queue::size()); - $this->assertNull(Queue::pop()); + $connection->pushRaw('data', $queue, [ + 'exchange' => $exchange, + ]); + + $this->assertTrue($connection->isQueueExists($queue)); + $this->assertTrue($connection->isExchangeExists($exchange)); + $this->assertSame(1, $connection->size($queue)); } } diff --git a/tests/Functional/RabbitMQQueueTest.php b/tests/Functional/RabbitMQQueueTest.php index 21a2e1d4..8d7da4cd 100644 --- a/tests/Functional/RabbitMQQueueTest.php +++ b/tests/Functional/RabbitMQQueueTest.php @@ -4,6 +4,8 @@ use Illuminate\Support\Str; use PhpAmqpLib\Exchange\AMQPExchangeType; +use PhpAmqpLib\Wire\AMQPTable; +use PHPUnit\Framework\Attributes\TestWith; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue; use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Functional\TestCase as BaseTestCase; @@ -202,8 +204,11 @@ public function test_config_quorum(): void $this->assertTrue($this->callProperty($queue, 'config')->isQuorum()); } - public function test_declare_delete_exchange(): void + #[TestWith([false])] + #[TestWith([true])] + public function test_declare_delete_exchange(bool $cache): void { + $this->app['config']->set('queue.connections.rabbitmq.options.queue.cache_declared', $cache); $queue = $this->connection(); $name = Str::random(); @@ -212,13 +217,18 @@ public function test_declare_delete_exchange(): void $queue->declareExchange($name); $this->assertTrue($queue->isExchangeExists($name)); + $this->assertEquals($cache, $this->callMethod($queue, 'isExchangeDeclared', [$name])); $queue->deleteExchange($name); + $this->assertFalse($this->callMethod($queue, 'isExchangeDeclared', [$name])); $this->assertFalse($queue->isExchangeExists($name)); } - public function test_declare_delete_queue(): void + #[TestWith([false])] + #[TestWith([true])] + public function test_declare_delete_queue(bool $cache): void { + $this->app['config']->set('queue.connections.rabbitmq.options.queue.cache_declared', $cache); $queue = $this->connection(); $name = Str::random(); @@ -227,11 +237,39 @@ public function test_declare_delete_queue(): void $queue->declareQueue($name); $this->assertTrue($queue->isQueueExists($name)); + $this->assertEquals($cache, $this->callMethod($queue, 'isQueueDeclared', [$name])); $queue->deleteQueue($name); + $this->assertFalse($this->callMethod($queue, 'isQueueDeclared', [$name])); $this->assertFalse($queue->isQueueExists($name)); } + public function test_declare_queue_by_config(): void + { + $durable = false; + $autoDelete = true; + $this->app['config']->set('queue.connections.rabbitmq.options.queue.flags', [ + 'durable' => $durable, + 'auto_delete' => $autoDelete, + ]); + + $queue = $this->connection(); + $name = Str::random(); + $this->assertFalse($queue->isQueueExists($name)); + + $queue->declareQueueByConfig($name); + // No exception expected + $res = $queue->getChannel()->queue_declare( + $name, + false, + $durable, + false, + $autoDelete, + false + ); + $this->assertIsArray($res); + } + public function test_queue_arguments(): void { $name = Str::random(); @@ -274,6 +312,7 @@ public function test_queue_arguments(): void public function test_delay_queue_arguments(): void { + $this->app['config']->set('queue.connections.rabbitmq.options.queue.use_expiration_for_delayed_queues', true); $name = Str::random(); $ttl = 12000; @@ -281,9 +320,7 @@ public function test_delay_queue_arguments(): void $actual = $this->callMethod($queue, 'getDelayQueueArguments', [$name, $ttl]); $expected = [ 'x-dead-letter-exchange' => '', - 'x-dead-letter-routing-key' => $name, - 'x-message-ttl' => $ttl, - 'x-expires' => $ttl * 2, + 'x-dead-letter-routing-key' => $name ]; $this->assertEquals(array_keys($expected), array_keys($actual)); $this->assertEquals(array_values($expected), array_values($actual)); diff --git a/tests/Mocks/TestJobCallService.php b/tests/Mocks/TestJobCallService.php new file mode 100644 index 00000000..05d3998b --- /dev/null +++ b/tests/Mocks/TestJobCallService.php @@ -0,0 +1,32 @@ +service = $service; + $this->method = $method; + } + + public function handle(): void + { + $service = app($this->service); + if (method_exists($service, $this->method)) { + $service->{$this->method}(); + } else { + $callback = $service->{$this->method}; + $callback(); + } + } +} diff --git a/tests/Script/socket_fake.php b/tests/Script/socket_fake.php new file mode 100644 index 00000000..5bf34fb0 --- /dev/null +++ b/tests/Script/socket_fake.php @@ -0,0 +1,23 @@ +