Skip to content

Commit 4b2a727

Browse files
authored
Add messenger transport routing per job name (#175)
* Add messenger transport routing per job name * Fixed missing sentence end in doc * Fixed unclosed prototypes in config * Fixed potentially undefined messenger config * Fixed validate rules not set on prototyped config vars
1 parent 536f28c commit 4b2a727

File tree

7 files changed

+124
-9
lines changed

7 files changed

+124
-9
lines changed

docs/docs/bridges/symfony-messenger.rst

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ message will be dispatched and handled by the
1919
`LaunchJobMessageHandler <https://github.com/yokai-php/batch-symfony-messenger/blob/0.x/src/LaunchJobMessageHandler.php>`__
2020
will be called with that message after being routed.
2121

22+
2223
How to configure an async transport for the launcher?
2324
------------------------------------------------------------
2425

@@ -46,6 +47,31 @@ You will end with something like:
4647
| :doc:`Bridge with Symfony Framework </bridges/symfony-framework>`
4748
4849

50+
How to configure different transport for your jobs?
51+
------------------------------------------------------------
52+
53+
On some projects, you will end with different messenger transports,
54+
and will not want to run all jobs on the same transport.
55+
56+
| Because we are using the same message class for all jobs, there is no way to configure this in Symfony.
57+
| Instead, you will have to configure a job name to transport routing, in our side of the configuration.
58+
| It is very likely to the messenger routing configuration, but you will use the job name instead of the message class.
59+
60+
.. code-block:: yaml
61+
62+
# config/packages/yokai_batch.yaml
63+
yokai_batch:
64+
launchers:
65+
messenger:
66+
routing:
67+
export_job_name: async_with_low_priority
68+
import_job_name: async_with_high_priority
69+
70+
.. seealso::
71+
| :doc:`What is a job launcher? </core-concepts/job-launcher>`
72+
| :doc:`Getting started with Symfony Framework </getting-started/with-symfony>`
73+
74+
4975
Dispatch item with messenger writer
5076
------------------------------------------------------------
5177

src/batch-symfony-framework/src/DependencyInjection/Configuration.php

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@
3333
* @phpstan-type LauncherConfig array{
3434
* default: string|null,
3535
* launchers: array<string, string>,
36+
* messenger?: array{
37+
* routing: array<string, string>,
38+
* },
3639
* }
3740
* @phpstan-type ParametersConfig array{
3841
* global: array<string, mixed>,
@@ -130,8 +133,18 @@ private function launcher(): ArrayNodeDefinition
130133
->defaultValue(['simple' => 'simple://simple'])
131134
->useAttributeAsKey('name')
132135
->scalarPrototype()
133-
->validate()
134-
->ifTrue($isInvalidDsn)->thenInvalid('Invalid job launcher DSN.')
136+
->validate()
137+
->ifTrue($isInvalidDsn)->thenInvalid('Invalid job launcher DSN.')
138+
->end()
139+
->end()
140+
->end()
141+
->arrayNode('messenger')
142+
->children()
143+
->arrayNode('routing')
144+
->normalizeKeys(false)
145+
->useAttributeAsKey('name')
146+
->scalarPrototype()->end()
147+
->end()
135148
->end()
136149
->end()
137150
->end()
@@ -164,15 +177,15 @@ private function parameters(): ArrayNodeDefinition
164177
->children()
165178
->arrayNode('global')
166179
->useAttributeAsKey('name')
167-
->variablePrototype()
168-
->end()
180+
->variablePrototype()->end()
169181
->end()
170182
->arrayNode('per_job')
171183
->useAttributeAsKey('name')
172184
->variablePrototype()
173-
->validate()
174-
->ifTrue(fn(mixed $value) => !$isStringAssociativeArray($value))
175-
->thenInvalid('Should be an array<string, mixed>.')
185+
->validate()
186+
->ifTrue(fn(mixed $value) => !$isStringAssociativeArray($value))
187+
->thenInvalid('Should be an array<string, mixed>.')
188+
->end()
176189
->end()
177190
->end()
178191
->end()

src/batch-symfony-framework/src/DependencyInjection/JobLauncherDefinitionFactory.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Yokai\Batch\Bridge\Symfony\Console\CommandRunner;
1212
use Yokai\Batch\Bridge\Symfony\Console\RunCommandJobLauncher;
1313
use Yokai\Batch\Bridge\Symfony\Messenger\DispatchMessageJobLauncher;
14+
use Yokai\Batch\Bridge\Symfony\Messenger\MessengerJobsConfiguration;
1415
use Yokai\Batch\Launcher\JobLauncherInterface;
1516
use Yokai\Batch\Launcher\SimpleJobLauncher;
1617
use Yokai\Batch\Storage\JobExecutionStorageInterface;
@@ -71,6 +72,9 @@ private static function messenger(): Definition
7172
'$jobExecutionFactory' => new Reference('yokai_batch.job_execution_factory'),
7273
'$jobExecutionStorage' => new Reference(JobExecutionStorageInterface::class),
7374
'$messageBus' => new Reference(MessageBusInterface::class),
75+
'$messengerJobsConfiguration' => new Definition(MessengerJobsConfiguration::class, [
76+
'$routing' => '%yokai_batch.launcher.messenger_routing%',
77+
]),
7478
]);
7579
}
7680

src/batch-symfony-framework/src/DependencyInjection/YokaiBatchExtension.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ private function configureLauncher(ContainerBuilder $container, array $config):
144144
));
145145
}
146146

147+
$container->setParameter('yokai_batch.launcher.messenger_routing', $config['messenger']['routing'] ?? []);
148+
147149
$launcherIdPerLauncherName = [];
148150
foreach ($config['launchers'] as $name => $dsn) {
149151
$definitionOrReference = JobLauncherDefinitionFactory::fromDsn($dsn);

src/batch-symfony-messenger/src/DispatchMessageJobLauncher.php

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44

55
namespace Yokai\Batch\Bridge\Symfony\Messenger;
66

7+
use Symfony\Component\Messenger\Envelope;
78
use Symfony\Component\Messenger\Exception\ExceptionInterface;
89
use Symfony\Component\Messenger\MessageBusInterface;
10+
use Symfony\Component\Messenger\Stamp\TransportNamesStamp;
911
use Yokai\Batch\BatchStatus;
1012
use Yokai\Batch\Factory\JobExecutionFactory;
1113
use Yokai\Batch\JobExecution;
@@ -21,6 +23,7 @@ public function __construct(
2123
private JobExecutionFactory $jobExecutionFactory,
2224
private JobExecutionStorageInterface $jobExecutionStorage,
2325
private MessageBusInterface $messageBus,
26+
private MessengerJobsConfiguration $messengerJobsConfiguration,
2427
) {
2528
}
2629

@@ -33,9 +36,17 @@ public function launch(string $name, array $configuration = []): JobExecution
3336
$jobExecution->setStatus(BatchStatus::PENDING);
3437
$this->jobExecutionStorage->store($jobExecution);
3538

39+
$message = new LaunchJobMessage($name, $configuration);
40+
41+
// if it was configured a specific transport name for this job, add a stamp to force it
42+
$transportName = $this->messengerJobsConfiguration->getTransportNameForJobName($name);
43+
if ($transportName !== null) {
44+
$message = (new Envelope($message))
45+
->with(new TransportNamesStamp($transportName));
46+
}
47+
3648
try {
37-
// dispatch message
38-
$this->messageBus->dispatch(new LaunchJobMessage($name, $configuration));
49+
$this->messageBus->dispatch($message);
3950
} catch (ExceptionInterface $exception) {
4051
// if a messenger exception occurs, it will be converted to job failure
4152
$jobExecution->setStatus(BatchStatus::FAILED);
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Yokai\Batch\Bridge\Symfony\Messenger;
6+
7+
/**
8+
* Holds the Symfony messenger configuration.
9+
*/
10+
final class MessengerJobsConfiguration
11+
{
12+
public function __construct(
13+
/** @var array<string, string> */
14+
private array $routing,
15+
) {
16+
}
17+
18+
/**
19+
* Get the configured transport name for a job name.
20+
* Return null if none was provided.
21+
*/
22+
public function getTransportNameForJobName(string $jobName): string|null
23+
{
24+
return $this->routing[$jobName] ?? null;
25+
}
26+
}

src/batch-symfony-messenger/tests/DispatchMessageJobLauncherTest.php

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@
55
namespace Yokai\Batch\Tests\Bridge\Symfony\Messenger;
66

77
use PHPUnit\Framework\TestCase;
8+
use Symfony\Component\Messenger\Envelope;
89
use Symfony\Component\Messenger\Exception\TransportException;
10+
use Symfony\Component\Messenger\Stamp\TransportNamesStamp;
911
use Yokai\Batch\BatchStatus;
1012
use Yokai\Batch\Bridge\Symfony\Messenger\DispatchMessageJobLauncher;
1113
use Yokai\Batch\Bridge\Symfony\Messenger\LaunchJobMessage;
14+
use Yokai\Batch\Bridge\Symfony\Messenger\MessengerJobsConfiguration;
1215
use Yokai\Batch\Factory\JobExecutionFactory;
1316
use Yokai\Batch\Factory\JobExecutionParametersBuilder\NullJobExecutionParametersBuilder;
1417
use Yokai\Batch\Factory\UniqidJobExecutionIdGenerator;
@@ -25,6 +28,7 @@ public function testLaunch(): void
2528
new JobExecutionFactory(new UniqidJobExecutionIdGenerator(), new NullJobExecutionParametersBuilder()),
2629
$storage = new InMemoryJobExecutionStorage(),
2730
$messageBus = new BufferingMessageBus(),
31+
new MessengerJobsConfiguration([]),
2832
);
2933

3034
$jobExecutionFromLauncher = $jobLauncher->launch('testing', ['_id' => '123456789', 'foo' => ['bar']]);
@@ -48,6 +52,7 @@ public function testLaunchWithNoId(): void
4852
),
4953
$storage = new InMemoryJobExecutionStorage(),
5054
$messageBus = new BufferingMessageBus(),
55+
new MessengerJobsConfiguration([]),
5156
);
5257

5358
$jobExecutionFromLauncher = $jobLauncher->launch('testing');
@@ -67,6 +72,7 @@ public function testLaunchAndMessengerFail(): void
6772
new JobExecutionFactory(new UniqidJobExecutionIdGenerator(), new NullJobExecutionParametersBuilder()),
6873
$storage = new InMemoryJobExecutionStorage(),
6974
new FailingMessageBus(new TransportException('This is a test')),
75+
new MessengerJobsConfiguration([]),
7076
);
7177

7278
$jobExecutionFromLauncher = $jobLauncher->launch('testing');
@@ -82,6 +88,33 @@ public function testLaunchAndMessengerFail(): void
8288
self::assertSame('This is a test', $failure->getMessage());
8389
}
8490

91+
public function testLaunchWithRouting(): void
92+
{
93+
$jobLauncher = new DispatchMessageJobLauncher(
94+
new JobExecutionFactory(new UniqidJobExecutionIdGenerator(), new NullJobExecutionParametersBuilder()),
95+
$storage = new InMemoryJobExecutionStorage(),
96+
$messageBus = new BufferingMessageBus(),
97+
new MessengerJobsConfiguration(['testing' => 'custom_transport', 'unused' => 'unused_transport']),
98+
);
99+
100+
$jobExecutionFromLauncher = $jobLauncher->launch('testing', ['_id' => '123456789', 'foo' => ['bar']]);
101+
102+
[$jobExecutionFromStorage] = $storage->getExecutions();
103+
self::assertSame($jobExecutionFromLauncher, $jobExecutionFromStorage);
104+
105+
$messages = $messageBus->getMessages();
106+
self::assertCount(1, $messages);
107+
$message = $messages[0];
108+
self::assertInstanceOf(Envelope::class, $message);
109+
$stamp = $message->all(TransportNamesStamp::class)[0];
110+
self::assertInstanceOf(TransportNamesStamp::class, $stamp);
111+
self::assertSame(['custom_transport'], $stamp->getTransportNames());
112+
$message = $message->getMessage();
113+
self::assertInstanceOf(LaunchJobMessage::class, $message);
114+
self::assertSame('testing', $message->getJobName());
115+
self::assertSame(['_id' => '123456789', 'foo' => ['bar']], $message->getConfiguration());
116+
}
117+
85118
private static function assertJobWasTriggered(BufferingMessageBus $bus, string $jobName, array $config): void
86119
{
87120
$messages = $bus->getMessages();

0 commit comments

Comments
 (0)