Skip to content

Commit 835ba88

Browse files
authored
feat: add support for chained jobs (#54)
* feat: add support for chained jobs * phpcpd * workflow deptrac
1 parent 9e89b19 commit 835ba88

28 files changed

+2027
-88
lines changed

.github/workflows/deptrac.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,4 @@ on:
2020

2121
jobs:
2222
deptrac:
23-
uses: codeigniter4/.github/.github/workflows/deptrac.yml@main
23+
uses: codeigniter4/.github/.github/workflows/deptrac.yml@CI46

composer.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
"cs": "php-cs-fixer fix --ansi --verbose --dry-run --diff",
6464
"cs-fix": "php-cs-fixer fix --ansi --verbose --diff",
6565
"style": "@cs-fix",
66-
"deduplicate": "phpcpd app/ src/",
66+
"deduplicate": "phpcpd src/ tests/",
6767
"inspect": "deptrac analyze --cache-file=build/deptrac.cache",
6868
"mutate": "infection --threads=2 --skip-initial-tests --coverage=build/phpunit",
6969
"test": "phpunit"

docs/basic-usage.md

+15
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,21 @@ service('queue')->push('emails', 'email', ['message' => 'Email message goes here
160160

161161
We will be pushing `email` job to the `emails` queue.
162162

163+
### Sending chained jobs to the queue
164+
165+
Sending chained jobs is also simple and lets you specify the particular order of the job execution.
166+
167+
```php
168+
service('queue')->chain(function($chain) {
169+
$chain
170+
->push('reports', 'generate-report', ['userId' => 123])
171+
->push('emails', 'email', ['message' => 'Email message goes here', 'userId' => 123]);
172+
});
173+
```
174+
175+
In the example above, we will send jobs to the `reports` and `emails` queue. First, we will generate a report for given user with the `generate-report` job, after this, we will send an email with `email` job.
176+
The `email` job will be executed only if the `generate-report` job was successful.
177+
163178
### Consuming the queue
164179

165180
Since we sent our sample job to queue `emails`, then we need to run the worker with the appropriate queue:

docs/running-queues.md

+34
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,40 @@ Note that there is no guarantee that the job will run exactly in 5 minutes. If m
7979

8080
We can also combine delayed jobs with priorities.
8181

82+
### Chained jobs
83+
84+
We can create sequences of jobs that run in a specific order. Each job in the chain will be executed after the previous job has completed successfully.
85+
86+
```php
87+
service('queue')->chain(function($chain) {
88+
$chain
89+
->push('reports', 'generate-report', ['userId' => 123])
90+
->setPriority('high') // optional
91+
->push('emails', 'email', ['message' => 'Email message goes here', 'userId' => 123])
92+
->setDelay(30); // optional
93+
});
94+
```
95+
96+
As you may notice, we can use the same options as in regular `push()` - we can set priority and delay, which are optional settings.
97+
98+
#### Important Differences from Regular `push()`
99+
100+
When using the `chain()` method, there are a few important differences compared to the regular `push()` method:
101+
102+
1. **Method Order**: Unlike the regular `push()` method where you set the priority and delay before pushing the job, in a chain you must set these properties after calling `push()` for each job:
103+
104+
```php
105+
// Regular push() - priority set before pushing
106+
service('queue')->setPriority('high')->push('queue', 'job', []);
107+
108+
// Chain push() - priority set after pushing
109+
service('queue')->chain(function($chain) {
110+
$chain->push('queue', 'job', [])->setPriority('high');
111+
});
112+
```
113+
114+
2. **Configuration Scope**: Each configuration (priority, delay) only applies to the job that was just added to the chain.
115+
82116
### Running many instances of the same queue
83117

84118
As mentioned above, sometimes we may want to have multiple instances of the same command running at the same time. The queue is safe to use in that scenario with all databases as long as you keep the `skipLocked` to `true` in the config file. Only for SQLite3 driver, this setting is not relevant as it provides atomicity without the need for explicit concurrency control.

phpstan.neon.dist

+4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ parameters:
2323
message: '#Call to an undefined method CodeIgniter\\Queue\\Models\\QueueJobFailedModel::truncate\(\).#'
2424
paths:
2525
- src/Handlers/BaseHandler.php
26+
-
27+
message: '#If condition is always true.#'
28+
paths:
29+
- src/Commands/QueueWork.php
2630
universalObjectCratesClasses:
2731
- CodeIgniter\Entity
2832
- CodeIgniter\Entity\Entity

rector.php

+5
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
use Rector\EarlyReturn\Rector\Return_\PreparedValueToEarlyReturnRector;
2929
use Rector\Php55\Rector\String_\StringClassNameToClassConstantRector;
3030
use Rector\Php73\Rector\FuncCall\StringifyStrNeedlesRector;
31+
use Rector\Php81\Rector\ClassMethod\NewInInitializerRector;
3132
use Rector\PHPUnit\AnnotationsToAttributes\Rector\Class_\AnnotationWithValueToAttributeRector;
3233
use Rector\PHPUnit\AnnotationsToAttributes\Rector\ClassMethod\DataProviderAnnotationToAttributeRector;
3334
use Rector\PHPUnit\CodeQuality\Rector\Class_\YieldDataProviderRector;
@@ -93,6 +94,10 @@
9394

9495
// Supported from PHPUnit 10
9596
DataProviderAnnotationToAttributeRector::class,
97+
98+
NewInInitializerRector::class => [
99+
'src/Payloads/Payload.php',
100+
],
96101
]);
97102

98103
// auto import fully qualified class names

src/Commands/QueueWork.php

+43
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use CodeIgniter\CLI\CLI;
1818
use CodeIgniter\Queue\Config\Queue as QueueConfig;
1919
use CodeIgniter\Queue\Entities\QueueJob;
20+
use CodeIgniter\Queue\Payloads\PayloadMetadata;
2021
use Exception;
2122
use Throwable;
2223

@@ -247,6 +248,11 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i
247248
service('queue')->done($work, $config->keepDoneJobs);
248249

249250
CLI::write('The processing of this job was successful', 'green');
251+
252+
// Check chained jobs
253+
if (isset($payload['metadata']) && $payload['metadata'] !== []) {
254+
$this->processNextJobInChain($payload['metadata']);
255+
}
250256
} catch (Throwable $err) {
251257
if (isset($job) && ++$work->attempts < ($tries ?? $job->getTries())) {
252258
// Schedule for later
@@ -262,6 +268,43 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i
262268
}
263269
}
264270

271+
/**
272+
* Process the next job in the chain
273+
*/
274+
private function processNextJobInChain(array $payloadMetadata): void
275+
{
276+
$payloadMetadata = PayloadMetadata::fromArray($payloadMetadata);
277+
278+
if (! $payloadMetadata->hasChainedJobs()) {
279+
return;
280+
}
281+
282+
$nextPayload = $payloadMetadata->getChainedJobs()->shift();
283+
$priority = $nextPayload->getPriority();
284+
$delay = $nextPayload->getDelay();
285+
286+
if ($priority !== null) {
287+
service('queue')->setPriority($priority);
288+
}
289+
290+
if ($delay !== null) {
291+
service('queue')->setDelay($delay);
292+
}
293+
294+
if ($payloadMetadata->hasChainedJobs()) {
295+
$nextPayload->setChainedJobs($payloadMetadata->getChainedJobs());
296+
}
297+
298+
service('queue')->push(
299+
$nextPayload->getQueue(),
300+
$nextPayload->getJob(),
301+
$nextPayload->getData(),
302+
$nextPayload->getMetadata(),
303+
);
304+
305+
CLI::write(sprintf('Chained job: %s has been placed in the queue: %s', $nextPayload->getJob(), $nextPayload->getQueue()), 'green');
306+
}
307+
265308
private function maxJobsCheck(int $maxJobs, int $countJobs): bool
266309
{
267310
if ($maxJobs > 0 && $countJobs >= $maxJobs) {

src/Handlers/BaseHandler.php

+45-34
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,16 @@
1313

1414
namespace CodeIgniter\Queue\Handlers;
1515

16+
use Closure;
1617
use CodeIgniter\I18n\Time;
1718
use CodeIgniter\Queue\Config\Queue as QueueConfig;
1819
use CodeIgniter\Queue\Entities\QueueJob;
1920
use CodeIgniter\Queue\Entities\QueueJobFailed;
2021
use CodeIgniter\Queue\Exceptions\QueueException;
2122
use CodeIgniter\Queue\Models\QueueJobFailedModel;
23+
use CodeIgniter\Queue\Payloads\ChainBuilder;
24+
use CodeIgniter\Queue\Payloads\PayloadMetadata;
25+
use CodeIgniter\Queue\Traits\HasQueueValidation;
2226
use ReflectionException;
2327
use Throwable;
2428

@@ -27,13 +31,15 @@
2731
*/
2832
abstract class BaseHandler
2933
{
34+
use HasQueueValidation;
35+
3036
protected QueueConfig $config;
3137
protected ?string $priority = null;
3238
protected ?int $delay = null;
3339

3440
abstract public function name(): string;
3541

36-
abstract public function push(string $queue, string $job, array $data): bool;
42+
abstract public function push(string $queue, string $job, array $data, ?PayloadMetadata $metadata = null): bool;
3743

3844
abstract public function pop(string $queue, array $priorities): ?QueueJob;
3945

@@ -45,38 +51,6 @@ abstract public function done(QueueJob $queueJob, bool $keepJob): bool;
4551

4652
abstract public function clear(?string $queue = null): bool;
4753

48-
/**
49-
* Set priority for job queue.
50-
*/
51-
public function setPriority(string $priority): static
52-
{
53-
if (! preg_match('/^[a-z_-]+$/', $priority)) {
54-
throw QueueException::forIncorrectPriorityFormat();
55-
}
56-
57-
if (strlen($priority) > 64) {
58-
throw QueueException::forTooLongPriorityName();
59-
}
60-
61-
$this->priority = $priority;
62-
63-
return $this;
64-
}
65-
66-
/**
67-
* Set delay for job queue (in seconds).
68-
*/
69-
public function setDelay(int $delay): static
70-
{
71-
if ($delay < 0) {
72-
throw QueueException::forIncorrectDelayValue();
73-
}
74-
75-
$this->delay = $delay;
76-
77-
return $this;
78-
}
79-
8054
/**
8155
* Retry failed job.
8256
*
@@ -104,7 +78,7 @@ public function retry(?int $id, ?string $queue): int
10478
}
10579

10680
/**
107-
* Delete failed job by ID.
81+
* Delete a failed job by ID.
10882
*/
10983
public function forget(int $id): bool
11084
{
@@ -150,6 +124,43 @@ public function listFailed(?string $queue): array
150124
->findAll();
151125
}
152126

127+
/**
128+
* Set delay for job queue (in seconds).
129+
*/
130+
public function setDelay(int $delay): static
131+
{
132+
$this->validateDelay($delay);
133+
134+
$this->delay = $delay;
135+
136+
return $this;
137+
}
138+
139+
/**
140+
* Set priority for job queue.
141+
*/
142+
public function setPriority(string $priority): static
143+
{
144+
$this->validatePriority($priority);
145+
146+
$this->priority = $priority;
147+
148+
return $this;
149+
}
150+
151+
/**
152+
* Create a job chain on the specified queue
153+
*
154+
* @param Closure $callback Chain definition callback
155+
*/
156+
public function chain(Closure $callback): bool
157+
{
158+
$chainBuilder = new ChainBuilder($this);
159+
$callback($chainBuilder);
160+
161+
return $chainBuilder->dispatch();
162+
}
163+
153164
/**
154165
* Log failed job.
155166
*

src/Handlers/DatabaseHandler.php

+4-3
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
use CodeIgniter\Queue\Enums\Status;
2020
use CodeIgniter\Queue\Interfaces\QueueInterface;
2121
use CodeIgniter\Queue\Models\QueueJobModel;
22-
use CodeIgniter\Queue\Payload;
22+
use CodeIgniter\Queue\Payloads\Payload;
23+
use CodeIgniter\Queue\Payloads\PayloadMetadata;
2324
use ReflectionException;
2425
use Throwable;
2526

@@ -46,13 +47,13 @@ public function name(): string
4647
*
4748
* @throws ReflectionException
4849
*/
49-
public function push(string $queue, string $job, array $data): bool
50+
public function push(string $queue, string $job, array $data, ?PayloadMetadata $metadata = null): bool
5051
{
5152
$this->validateJobAndPriority($queue, $job);
5253

5354
$queueJob = new QueueJob([
5455
'queue' => $queue,
55-
'payload' => new Payload($job, $data),
56+
'payload' => new Payload($job, $data, $metadata),
5657
'priority' => $this->priority,
5758
'status' => Status::PENDING->value,
5859
'attempts' => 0,

src/Handlers/PredisHandler.php

+4-3
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
use CodeIgniter\Queue\Entities\QueueJob;
2121
use CodeIgniter\Queue\Enums\Status;
2222
use CodeIgniter\Queue\Interfaces\QueueInterface;
23-
use CodeIgniter\Queue\Payload;
23+
use CodeIgniter\Queue\Payloads\Payload;
24+
use CodeIgniter\Queue\Payloads\PayloadMetadata;
2425
use Exception;
2526
use Predis\Client;
2627
use Throwable;
@@ -58,7 +59,7 @@ public function name(): string
5859
/**
5960
* Add job to the queue.
6061
*/
61-
public function push(string $queue, string $job, array $data): bool
62+
public function push(string $queue, string $job, array $data, ?PayloadMetadata $metadata = null): bool
6263
{
6364
$this->validateJobAndPriority($queue, $job);
6465

@@ -69,7 +70,7 @@ public function push(string $queue, string $job, array $data): bool
6970
$queueJob = new QueueJob([
7071
'id' => random_string('numeric', 16),
7172
'queue' => $queue,
72-
'payload' => new Payload($job, $data),
73+
'payload' => new Payload($job, $data, $metadata),
7374
'priority' => $this->priority,
7475
'status' => Status::PENDING->value,
7576
'attempts' => 0,

src/Handlers/RedisHandler.php

+4-3
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
use CodeIgniter\Queue\Entities\QueueJob;
2121
use CodeIgniter\Queue\Enums\Status;
2222
use CodeIgniter\Queue\Interfaces\QueueInterface;
23-
use CodeIgniter\Queue\Payload;
23+
use CodeIgniter\Queue\Payloads\Payload;
24+
use CodeIgniter\Queue\Payloads\PayloadMetadata;
2425
use Redis;
2526
use RedisException;
2627
use Throwable;
@@ -75,7 +76,7 @@ public function name(): string
7576
*
7677
* @throws RedisException
7778
*/
78-
public function push(string $queue, string $job, array $data): bool
79+
public function push(string $queue, string $job, array $data, ?PayloadMetadata $metadata = null): bool
7980
{
8081
$this->validateJobAndPriority($queue, $job);
8182

@@ -86,7 +87,7 @@ public function push(string $queue, string $job, array $data): bool
8687
$queueJob = new QueueJob([
8788
'id' => random_string('numeric', 16),
8889
'queue' => $queue,
89-
'payload' => new Payload($job, $data),
90+
'payload' => new Payload($job, $data, $metadata),
9091
'priority' => $this->priority,
9192
'status' => Status::PENDING->value,
9293
'attempts' => 0,

0 commit comments

Comments
 (0)