Skip to content

Commit 376ce2a

Browse files
committed
add an option to delay job execution when adding to the queue
1 parent 16fd35b commit 376ce2a

10 files changed

+223
-9
lines changed

docs/running-queues.md

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ This will cause command to check for the new jobs every 10 seconds if the queue
1212

1313
### With CRON
1414

15-
Using queues with CRON is more challenging, but definitely doable. You can use command like this:
15+
Using queues with CRON is more challenging but definitely doable. You can use command like this:
1616

1717
php spark queue:work emails -max-jobs 20 --stop-when-empty
1818

@@ -63,6 +63,22 @@ But we can also run the worker like this:
6363

6464
This way, worker will consume jobs with the `low` priority and then with `high`. The order set in the config file is override.
6565

66+
### Delaying jobs
67+
68+
Normally, when we add jobs to a queue, they are run in the order in which we added them to the queue (FIFO - first in, first out).
69+
Of course, there are also priorities, which we described in the previous section. But what about the scenario where we want to run a job, but not earlier than in 5 minutes?
70+
71+
This is where job delay comes into play. We measure the delay in seconds.
72+
73+
```php
74+
// This job will be run not sooner than in 5 minutes
75+
service('queue')->setDelay(5 * MINUTE)->push('emails', 'email', ['message' => 'Email sent no sooner than 5 minutes from now']);
76+
```
77+
78+
Note that there is no guarantee that the job will run exactly in 5 minutes. If many new jobs are added to the queue (without a delay), it may take a long time before the delayed job is actually executed.
79+
80+
We can also combine delayed jobs with priorities.
81+
6682
### Running many instances of the same queue
6783

6884
As mentioned above, sometimes we may want to have multiple instances of the same command running at the same time. The queue is safe to use in that scenario with all databases as long as you keep the `skipLocked` to `true` in the config file. Only for SQLite3 driver, this setting is not relevant as it provides atomicity without the need for explicit concurrency control.

src/Exceptions/QueueException.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,9 @@ public static function forIncorrectQueuePriority(string $priority, string $queue
5151
{
5252
return new self(lang('Queue.incorrectQueuePriority', [$priority, $queue]));
5353
}
54+
55+
public static function forIncorrectDelayValue(): static
56+
{
57+
return new self(lang('Queue.incorrectDelayValue'));
58+
}
5459
}

src/Handlers/BaseHandler.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ abstract class BaseHandler
2929
{
3030
protected QueueConfig $config;
3131
protected ?string $priority = null;
32+
protected ?int $delay = null;
3233

3334
abstract public function name(): string;
3435

@@ -62,6 +63,20 @@ public function setPriority(string $priority): static
6263
return $this;
6364
}
6465

66+
/**
67+
* Set delay for job queue (in seconds).
68+
*/
69+
public function setDelay(int $delay): static
70+
{
71+
if ($delay < 0) {
72+
throw QueueException::forIncorrectDelayValue();
73+
}
74+
75+
$this->delay = $delay;
76+
77+
return $this;
78+
}
79+
6580
/**
6681
* Retry failed job.
6782
*

src/Handlers/DatabaseHandler.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,10 @@ public function push(string $queue, string $job, array $data): bool
5656
'priority' => $this->priority,
5757
'status' => Status::PENDING->value,
5858
'attempts' => 0,
59-
'available_at' => Time::now(),
59+
'available_at' => Time::now()->addSeconds($this->delay ?? 0),
6060
]);
6161

62-
$this->priority = null;
62+
$this->priority = $this->delay = null;
6363

6464
return $this->jobModel->insert($queueJob, false);
6565
}

src/Handlers/PredisHandler.php

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,19 +64,21 @@ public function push(string $queue, string $job, array $data): bool
6464

6565
helper('text');
6666

67+
$availableAt = Time::now()->addSeconds($this->delay ?? 0);
68+
6769
$queueJob = new QueueJob([
6870
'id' => random_string('numeric', 16),
6971
'queue' => $queue,
7072
'payload' => new Payload($job, $data),
7173
'priority' => $this->priority,
7274
'status' => Status::PENDING->value,
7375
'attempts' => 0,
74-
'available_at' => Time::now(),
76+
'available_at' => $availableAt,
7577
]);
7678

77-
$result = $this->predis->zadd("queues:{$queue}:{$this->priority}", [json_encode($queueJob) => Time::now()->timestamp]);
79+
$result = $this->predis->zadd("queues:{$queue}:{$this->priority}", [json_encode($queueJob) => $availableAt->timestamp]);
7880

79-
$this->priority = null;
81+
$this->priority = $this->delay = null;
8082

8183
return $result > 0;
8284
}

src/Handlers/RedisHandler.php

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,19 +81,21 @@ public function push(string $queue, string $job, array $data): bool
8181

8282
helper('text');
8383

84+
$availableAt = Time::now()->addSeconds($this->delay ?? 0);
85+
8486
$queueJob = new QueueJob([
8587
'id' => random_string('numeric', 16),
8688
'queue' => $queue,
8789
'payload' => new Payload($job, $data),
8890
'priority' => $this->priority,
8991
'status' => Status::PENDING->value,
9092
'attempts' => 0,
91-
'available_at' => Time::now(),
93+
'available_at' => $availableAt,
9294
]);
9395

94-
$result = (int) $this->redis->zAdd("queues:{$queue}:{$this->priority}", Time::now()->timestamp, json_encode($queueJob));
96+
$result = (int) $this->redis->zAdd("queues:{$queue}:{$this->priority}", $availableAt->timestamp, json_encode($queueJob));
9597

96-
$this->priority = null;
98+
$this->priority = $this->delay = null;
9799

98100
return $result > 0;
99101
}

src/Language/en/Queue.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,5 @@
2424
'incorrectPriorityFormat' => 'The priority name should consists only lowercase letters.',
2525
'tooLongPriorityName' => 'The priority name is too long. It should be no longer than 64 letters.',
2626
'incorrectQueuePriority' => 'This queue has incorrectly defined priority: "{0}" for the queue: "{1}".',
27+
'incorrectDelayValue' => 'The number of seconds of delay must be a positive integer.',
2728
];

tests/DatabaseHandlerTest.php

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ public function testPushWithPriority(): void
112112
]);
113113
}
114114

115+
/**
116+
* @throws ReflectionException
117+
*/
115118
public function testPushAndPopWithPriority(): void
116119
{
117120
Time::setTestNow('2023-12-29 14:15:16');
@@ -148,6 +151,67 @@ public function testPushAndPopWithPriority(): void
148151
$this->assertSame($payload, $result->payload);
149152
}
150153

154+
/**
155+
* @throws ReflectionException
156+
*/
157+
public function testPushWithDelay(): void
158+
{
159+
Time::setTestNow('2023-12-29 14:15:16');
160+
161+
$handler = new DatabaseHandler($this->config);
162+
$result = $handler->setDelay(MINUTE)->push('queue-delay', 'success', ['key' => 'value']);
163+
164+
$this->assertTrue($result);
165+
$this->seeInDatabase('queue_jobs', [
166+
'queue' => 'queue-delay',
167+
'payload' => json_encode(['job' => 'success', 'data' => ['key' => 'value']]),
168+
'available_at' => 1703859376,
169+
]);
170+
}
171+
172+
/**
173+
* @throws ReflectionException
174+
*/
175+
public function testPushAndPopWithDelay(): void
176+
{
177+
Time::setTestNow('2023-12-29 14:15:16');
178+
179+
$handler = new DatabaseHandler($this->config);
180+
$result = $handler->setDelay(MINUTE)->push('queue-delay', 'success', ['key1' => 'value1']);
181+
182+
$this->assertTrue($result);
183+
$this->seeInDatabase('queue_jobs', [
184+
'queue' => 'queue-delay',
185+
'payload' => json_encode(['job' => 'success', 'data' => ['key1' => 'value1']]),
186+
'available_at' => 1703859376,
187+
]);
188+
189+
$result = $handler->push('queue-delay', 'success', ['key2' => 'value2']);
190+
191+
$this->assertTrue($result);
192+
$this->seeInDatabase('queue_jobs', [
193+
'queue' => 'queue-delay',
194+
'payload' => json_encode(['job' => 'success', 'data' => ['key2' => 'value2']]),
195+
'available_at' => 1703859316,
196+
]);
197+
198+
$result = $handler->pop('queue-delay', ['default']);
199+
$this->assertInstanceOf(QueueJob::class, $result);
200+
$payload = ['job' => 'success', 'data' => ['key2' => 'value2']];
201+
$this->assertSame($payload, $result->payload);
202+
203+
$result = $handler->pop('queue-delay', ['default']);
204+
$this->assertNull($result);
205+
206+
// add 1 minute
207+
Time::setTestNow('2023-12-29 14:16:16');
208+
209+
$result = $handler->pop('queue-delay', ['default']);
210+
$this->assertInstanceOf(QueueJob::class, $result);
211+
$payload = ['job' => 'success', 'data' => ['key1' => 'value1']];
212+
$this->assertSame($payload, $result->payload);
213+
}
214+
151215
/**
152216
* @throws ReflectionException
153217
*/

tests/PredisHandlerTest.php

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,60 @@ public function testPushWithPriority(): void
102102
$this->assertSame(['key' => 'value'], $queueJob->payload['data']);
103103
}
104104

105+
/**
106+
* @throws ReflectionException
107+
*/
108+
public function testPushWithDelay(): void
109+
{
110+
Time::setTestNow('2023-12-29 14:15:16');
111+
112+
$handler = new PredisHandler($this->config);
113+
$result = $handler->setDelay(MINUTE)->push('queue-delay', 'success', ['key' => 'value']);
114+
115+
$this->assertTrue($result);
116+
117+
$predis = self::getPrivateProperty($handler, 'predis');
118+
$this->assertSame(1, $predis->zcard('queues:queue-delay:default'));
119+
120+
$task = $predis->zrangebyscore('queues:queue-delay:default', '-inf', Time::now()->addSeconds(MINUTE)->timestamp, ['limit' => [0, 1]]);
121+
$queueJob = new QueueJob(json_decode((string) $task[0], true));
122+
$this->assertSame('success', $queueJob->payload['job']);
123+
$this->assertSame(['key' => 'value'], $queueJob->payload['data']);
124+
}
125+
126+
/**
127+
* @throws ReflectionException
128+
*/
129+
public function testPushAndPopWithDelay(): void
130+
{
131+
Time::setTestNow('2023-12-29 14:15:16');
132+
133+
$handler = new PredisHandler($this->config);
134+
$result = $handler->setDelay(MINUTE)->push('queue-delay', 'success', ['key1' => 'value1']);
135+
136+
$this->assertTrue($result);
137+
138+
$result = $handler->push('queue-delay', 'success', ['key2' => 'value2']);
139+
140+
$this->assertTrue($result);
141+
142+
$result = $handler->pop('queue-delay', ['default']);
143+
$this->assertInstanceOf(QueueJob::class, $result);
144+
$payload = ['job' => 'success', 'data' => ['key2' => 'value2']];
145+
$this->assertSame($payload, $result->payload);
146+
147+
$result = $handler->pop('queue-delay', ['default']);
148+
$this->assertNull($result);
149+
150+
// add 1 minute
151+
Time::setTestNow('2023-12-29 14:16:16');
152+
153+
$result = $handler->pop('queue-delay', ['default']);
154+
$this->assertInstanceOf(QueueJob::class, $result);
155+
$payload = ['job' => 'success', 'data' => ['key1' => 'value1']];
156+
$this->assertSame($payload, $result->payload);
157+
}
158+
105159
public function testPushException(): void
106160
{
107161
$this->expectException(QueueException::class);

tests/RedisHandlerTest.php

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
use CodeIgniter\Queue\Handlers\RedisHandler;
2020
use CodeIgniter\Test\ReflectionHelper;
2121
use Exception;
22+
use ReflectionException;
2223
use Tests\Support\Config\Queue as QueueConfig;
2324
use Tests\Support\Database\Seeds\TestRedisQueueSeeder;
2425
use Tests\Support\TestCase;
@@ -95,6 +96,60 @@ public function testPushWithPriority(): void
9596
$this->assertSame(['key' => 'value'], $queueJob->payload['data']);
9697
}
9798

99+
/**
100+
* @throws ReflectionException
101+
*/
102+
public function testPushWithDelay(): void
103+
{
104+
Time::setTestNow('2023-12-29 14:15:16');
105+
106+
$handler = new RedisHandler($this->config);
107+
$result = $handler->setDelay(MINUTE)->push('queue-delay', 'success', ['key' => 'value']);
108+
109+
$this->assertTrue($result);
110+
111+
$redis = self::getPrivateProperty($handler, 'redis');
112+
$this->assertSame(1, $redis->zCard('queues:queue-delay:default'));
113+
114+
$task = $redis->zRangeByScore('queues:queue-delay:default', '-inf', Time::now()->addSeconds(MINUTE)->timestamp, ['limit' => [0, 1]]);
115+
$queueJob = new QueueJob(json_decode((string) $task[0], true));
116+
$this->assertSame('success', $queueJob->payload['job']);
117+
$this->assertSame(['key' => 'value'], $queueJob->payload['data']);
118+
}
119+
120+
/**
121+
* @throws ReflectionException
122+
*/
123+
public function testPushAndPopWithDelay(): void
124+
{
125+
Time::setTestNow('2023-12-29 14:15:16');
126+
127+
$handler = new RedisHandler($this->config);
128+
$result = $handler->setDelay(MINUTE)->push('queue-delay', 'success', ['key1' => 'value1']);
129+
130+
$this->assertTrue($result);
131+
132+
$result = $handler->push('queue-delay', 'success', ['key2' => 'value2']);
133+
134+
$this->assertTrue($result);
135+
136+
$result = $handler->pop('queue-delay', ['default']);
137+
$this->assertInstanceOf(QueueJob::class, $result);
138+
$payload = ['job' => 'success', 'data' => ['key2' => 'value2']];
139+
$this->assertSame($payload, $result->payload);
140+
141+
$result = $handler->pop('queue-delay', ['default']);
142+
$this->assertNull($result);
143+
144+
// add 1 minute
145+
Time::setTestNow('2023-12-29 14:16:16');
146+
147+
$result = $handler->pop('queue-delay', ['default']);
148+
$this->assertInstanceOf(QueueJob::class, $result);
149+
$payload = ['job' => 'success', 'data' => ['key1' => 'value1']];
150+
$this->assertSame($payload, $result->payload);
151+
}
152+
98153
public function testPushException(): void
99154
{
100155
$this->expectException(QueueException::class);

0 commit comments

Comments
 (0)