Skip to content

Commit e11e2d7

Browse files
author
Oleg Namaka
committed
Port the solution in 5.4
1 parent 510f0b1 commit e11e2d7

12 files changed

+521
-26
lines changed

composer.json

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,8 @@
167167
"allow-plugins": {
168168
"php-http/discovery": false,
169169
"symfony/runtime": true
170-
}
170+
},
171+
"use-parent-dir": true
171172
},
172173
"autoload": {
173174
"psr-4": {
@@ -204,10 +205,6 @@
204205
"symfony/contracts": "2.5.x-dev"
205206
}
206207
}
207-
},
208-
{
209-
"type": "path",
210-
"url": "src/Symfony/Component/Runtime"
211208
}
212209
],
213210
"minimum-stability": "dev"

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,32 @@ public function testItReturnsTheDecodedMessageToTheHandler()
4646
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
4747
}
4848

49+
public function testItReturnsTheDecodedMessageToTheHandlerInBlockingMode()
50+
{
51+
$connection = $this->getMockBuilder(Connection::class)
52+
->disableOriginalConstructor()
53+
->onlyMethods(['getQueueNames', 'pull'])
54+
->getMock();
55+
$serializer = new Serializer(
56+
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
57+
);
58+
59+
$amqpEnvelope = $this->createAMQPEnvelope();
60+
61+
$amqpQueue = $this->createMock(\AMQPQueue::class);
62+
$amqpQueue->method('getName')->willReturn('queueName');
63+
64+
$connection->method('getQueueNames')->willReturn(['queueName']);
65+
$connection->method('pull')->willReturnCallback(function (string $queueName, callable $callback) use ($amqpQueue, $amqpEnvelope) {
66+
\call_user_func($callback, $amqpEnvelope, $amqpQueue);
67+
});
68+
69+
$receiver = new AmqpReceiver($connection, $serializer);
70+
$receiver->pull(function (Envelope $envelope) {
71+
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
72+
});
73+
}
74+
4975
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
5076
{
5177
$this->expectException(TransportException::class);

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpTransportTest.php

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,36 @@ public function testReceivesMessages()
5252
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
5353
}
5454

55+
public function testReceivesMessagesInBlockingMode()
56+
{
57+
$transport = $this->getTransport(
58+
$serializer = $this->createMock(SerializerInterface::class),
59+
$connection = $this->getMockBuilder(Connection::class)
60+
->disableOriginalConstructor()
61+
->onlyMethods(['getQueueNames', 'pull'])
62+
->getMock(),
63+
);
64+
65+
$decodedMessage = new DummyMessage('Decoded.');
66+
67+
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
68+
$amqpEnvelope->method('getBody')->willReturn('body');
69+
$amqpEnvelope->method('getHeaders')->willReturn(['my' => 'header']);
70+
71+
$amqpQueue = $this->createMock(\AMQPQueue::class);
72+
$amqpQueue->method('getName')->willReturn('queueName');
73+
74+
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
75+
$connection->method('getQueueNames')->willReturn(['queueName']);
76+
$connection->method('pull')->willReturnCallback(function (string $queueName, callable $callback) use ($amqpQueue, $amqpEnvelope) {
77+
\call_user_func($callback, $amqpEnvelope, $amqpQueue);
78+
});
79+
80+
$transport->pull(function (Envelope $envelope) use ($decodedMessage) {
81+
$this->assertSame($decodedMessage, $envelope->getMessage());
82+
});
83+
}
84+
5585
private function getTransport(?SerializerInterface $serializer = null, ?Connection $connection = null): AmqpTransport
5686
{
5787
$serializer = $serializer ?? $this->createMock(SerializerInterface::class);

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
1717
use Symfony\Component\Messenger\Exception\TransportException;
1818
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
19+
use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface;
1920
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
2021
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2122
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -25,7 +26,7 @@
2526
*
2627
* @author Samuel Roze <[email protected]>
2728
*/
28-
class AmqpReceiver implements QueueReceiverInterface, MessageCountAwareInterface
29+
class AmqpReceiver implements QueueReceiverInterface, QueueBlockingReceiverInterface, MessageCountAwareInterface
2930
{
3031
private $serializer;
3132
private $connection;
@@ -44,9 +45,46 @@ public function get(): iterable
4445
yield from $this->getFromQueues($this->connection->getQueueNames());
4546
}
4647

47-
/**
48-
* {@inheritdoc}
49-
*/
48+
public function pull(callable $callback): void
49+
{
50+
$this->pullFromQueues($this->connection->getQueueNames(), $callback);
51+
}
52+
53+
public function pullFromQueues(array $queueNames, callable $callback): void
54+
{
55+
if (0 === \count($queueNames)) {
56+
return;
57+
}
58+
59+
// Pop last queue to send callback
60+
$firstQueue = array_pop($queueNames);
61+
62+
foreach ($queueNames as $queueName) {
63+
$this->pullEnvelope($queueName, null);
64+
}
65+
66+
$this->pullEnvelope($firstQueue, $callback);
67+
}
68+
69+
private function pullEnvelope(string $queueName, ?callable $callback): void
70+
{
71+
if (null !== $callback) {
72+
$callback = function (\AMQPEnvelope $amqpEnvelope, \AMQPQueue $queue) use ($callback) {
73+
$queueName = $queue->getName();
74+
$body = $amqpEnvelope->getBody();
75+
$envelope = $this->decodeAmqpEnvelope($amqpEnvelope, $body, $queueName);
76+
77+
return $callback($envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName)));
78+
};
79+
}
80+
81+
try {
82+
$this->connection->pull($queueName, $callback);
83+
} catch (\AMQPException $exception) {
84+
throw new TransportException($exception->getMessage(), 0, $exception);
85+
}
86+
}
87+
5088
public function getFromQueues(array $queueNames): iterable
5189
{
5290
foreach ($queueNames as $queueName) {
@@ -67,9 +105,15 @@ private function getEnvelope(string $queueName): iterable
67105
}
68106

69107
$body = $amqpEnvelope->getBody();
108+
$envelope = $this->decodeAmqpEnvelope($amqpEnvelope, $body, $queueName);
109+
110+
yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName));
111+
}
70112

113+
private function decodeAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, $body, string $queueName): Envelope
114+
{
71115
try {
72-
$envelope = $this->serializer->decode([
116+
return $this->serializer->decode([
73117
'body' => false === $body ? '' : $body, // workaround https://github.com/pdezwart/php-amqp/issues/351
74118
'headers' => $amqpEnvelope->getHeaders(),
75119
]);
@@ -79,8 +123,6 @@ private function getEnvelope(string $queueName): iterable
79123

80124
throw $exception;
81125
}
82-
83-
yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName));
84126
}
85127

86128
/**

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpTransport.php

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
16+
use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface;
1617
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
1718
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1819
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -22,7 +23,7 @@
2223
/**
2324
* @author Nicolas Grekas <[email protected]>
2425
*/
25-
class AmqpTransport implements QueueReceiverInterface, TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
26+
class AmqpTransport implements QueueReceiverInterface, QueueBlockingReceiverInterface, TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
2627
{
2728
private $serializer;
2829
private $connection;
@@ -43,6 +44,22 @@ public function get(): iterable
4344
return ($this->receiver ?? $this->getReceiver())->get();
4445
}
4546

47+
/**
48+
* {@inheritdoc}
49+
*/
50+
public function pull(callable $callback): void
51+
{
52+
$this->getReceiver()->pull($callback);
53+
}
54+
55+
/**
56+
* {@inheritdoc}
57+
*/
58+
public function pullFromQueues(array $queueNames, callable $callback): void
59+
{
60+
$this->getReceiver()->pullFromQueues($queueNames, $callback);
61+
}
62+
4663
/**
4764
* {@inheritdoc}
4865
*/

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ private static function normalizeQueueArguments(array $arguments): array
281281
}
282282

283283
if (!is_numeric($arguments[$key])) {
284-
throw new InvalidArgumentException(sprintf('Integer expected for queue argument "%s", "%s" given.', $key, get_debug_type($arguments[$key])));
284+
throw new InvalidArgumentException(sprintf('Integer expected for queue argument "%s", "%s" given.', $key, gettype($arguments[$key])));
285285
}
286286

287287
$arguments[$key] = (int) $arguments[$key];
@@ -457,6 +457,27 @@ public function get(string $queueName): ?\AMQPEnvelope
457457
return null;
458458
}
459459

460+
/**
461+
* Consume a message from the specified queue in blocking mode.
462+
*
463+
* @param ?callable(\AMQPEnvelope,\AMQPQueue):?false $callback If callback return false, then processing thread will be
464+
* returned from AMQPQueue::consume() to PHP script. If null is passed, then the messages delivered to this client
465+
* will be made available to the first real callback registered. That allows one to have a single callback
466+
* consuming from multiple queues.
467+
*
468+
* @throws \AMQPException
469+
*/
470+
public function pull(string $queueName, ?callable $callback): void
471+
{
472+
$this->clearWhenDisconnected();
473+
474+
if ($this->autoSetupExchange) {
475+
$this->setupExchangeAndQueues();
476+
}
477+
478+
$this->queue($queueName)->consume($callback);
479+
}
480+
460481
public function ack(\AMQPEnvelope $message, string $queueName): bool
461482
{
462483
return $this->queue($queueName)->ack($message->getDeliveryTag()) ?? true;

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ protected function configure(): void
8181
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'),
8282
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
8383
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
84+
new InputOption('blocking-mode', null, InputOption::VALUE_NONE, 'Consume messages in blocking mode. If option is specified only one receiver is supported'),
8485
])
8586
->setDescription(self::$defaultDescription)
8687
->setHelp(<<<'EOF'
@@ -122,6 +123,12 @@ protected function configure(): void
122123
Use the --no-reset option to prevent services resetting after each message (may lead to leaking services' state between messages):
123124
124125
<info>php %command.full_name% <receiver-name> --no-reset</info>
126+
127+
Use the --blocking-mode option to force receiver to work in blocking mode
128+
("consume" method will be used instead of "get" in RabbitMQ for example).
129+
Only supported by some receivers, and you should pass only one receiver:
130+
131+
<info>php %command.full_name% <receiver-name> --blocking-mode</info>
125132
EOF
126133
)
127134
;
@@ -227,6 +234,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
227234
$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger);
228235
$options = [
229236
'sleep' => $input->getOption('sleep') * 1000000,
237+
'blocking-mode' => (bool) $input->getOption('blocking-mode'),
230238
];
231239
if ($queues = $input->getOption('queues')) {
232240
$options['queues'] = $queues;

src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
use Symfony\Component\Messenger\RoutableMessageBus;
2929
use Symfony\Component\Messenger\Stamp\BusNameStamp;
3030
use Symfony\Component\Messenger\Tests\Fixtures\ResettableDummyReceiver;
31+
use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface;
3132
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
3233

3334
class ConsumeMessagesCommandTest extends TestCase
@@ -72,6 +73,42 @@ public function testBasicRun()
7273
$this->assertStringContainsString('[OK] Consuming messages from transport "dummy-receiver"', $tester->getDisplay());
7374
}
7475

76+
public function testRunWithBlockingModeOption()
77+
{
78+
$envelope = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
79+
80+
$receiver = $this->createMock(QueueBlockingReceiverInterface::class);
81+
$receiver->expects($this->once())->method('pullFromQueues')->willReturnCallback(function (array $queueNames, callable $callback) use ($envelope) {
82+
\call_user_func($callback, $envelope);
83+
});
84+
85+
$receiverLocator = $this->createMock(ContainerInterface::class);
86+
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
87+
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver);
88+
89+
$bus = $this->createMock(MessageBusInterface::class);
90+
$bus->expects($this->once())->method('dispatch');
91+
92+
$busLocator = $this->createMock(ContainerInterface::class);
93+
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
94+
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
95+
96+
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator, new EventDispatcher());
97+
98+
$application = new Application();
99+
$application->add($command);
100+
$tester = new CommandTester($application->get('messenger:consume'));
101+
$tester->execute([
102+
'receivers' => ['dummy-receiver'],
103+
'--limit' => 1,
104+
'--blocking-mode' => true,
105+
'--queues' => ['foo'],
106+
]);
107+
108+
$tester->assertCommandIsSuccessful();
109+
$this->assertStringContainsString('[OK] Consuming messages from transport "dummy-receiver"', $tester->getDisplay());
110+
}
111+
75112
public function testRunWithBusOption()
76113
{
77114
$envelope = new Envelope(new \stdClass());

0 commit comments

Comments
 (0)