Skip to content

Commit d5bcbc6

Browse files
committed
feat: switch to db queue as default
1 parent 08a599d commit d5bcbc6

File tree

7 files changed

+161
-6
lines changed

7 files changed

+161
-6
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?php
2+
3+
use Flarum\Database\Migration;
4+
use Illuminate\Database\Schema\Blueprint;
5+
6+
return Migration::createTableIfNotExists(
7+
'queue_jobs',
8+
function (Blueprint $table) {
9+
$table->bigIncrements('id');
10+
$table->string('queue')->index();
11+
$table->longText('payload');
12+
$table->unsignedTinyInteger('attempts');
13+
$table->unsignedInteger('reserved_at')->nullable();
14+
$table->unsignedInteger('available_at');
15+
$table->unsignedInteger('created_at');
16+
}
17+
);
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?php
2+
3+
use Flarum\Database\Migration;
4+
use Illuminate\Database\Schema\Blueprint;
5+
6+
return Migration::createTableIfNotExists(
7+
'queue_failed_jobs',
8+
function (Blueprint $table) {
9+
$table->id();
10+
$table->string('uuid')->unique();
11+
$table->text('connection')->nullable();
12+
$table->text('queue');
13+
$table->longText('payload');
14+
$table->longText('exception');
15+
$table->timestamp('failed_at')->useCurrent();
16+
}
17+
);

framework/core/src/Console/ConsoleServiceProvider.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
use Flarum\Foundation\Console\AssetsPublishCommand;
2020
use Flarum\Foundation\Console\CacheClearCommand;
2121
use Flarum\Foundation\Console\InfoCommand;
22+
use Flarum\Foundation\ContainerUtil;
23+
use Flarum\Queue\Console\DatabaseWorkerArgs;
2224
use Flarum\Settings\SettingsRepositoryInterface;
2325
use Illuminate\Console\Events\CommandFinished;
2426
use Illuminate\Console\Scheduling\CacheEventMutex;
@@ -74,8 +76,8 @@ public function register(): void
7476
];
7577
});
7678

77-
$this->container->singleton('flarum.console.scheduled', function () {
78-
return [];
79+
$this->container->singleton('flarum.console.scheduled', function (Container $container) {
80+
return [array_merge($container->make('flarum.queue.schedule', []))];
7981
});
8082
}
8183

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
3+
namespace Flarum\Queue\Console;
4+
5+
use Flarum\Settings\SettingsRepositoryInterface;
6+
7+
class DatabaseWorkerArgs
8+
{
9+
public function __construct(protected SettingsRepositoryInterface $settings)
10+
{
11+
}
12+
13+
public function args(): array
14+
{
15+
$args = [
16+
'--stop-when-empty',
17+
];
18+
19+
if ($retries = $this->settings->get('database-queue.retries')) {
20+
$args['--tries'] = $retries;
21+
}
22+
23+
if ($memory = $this->settings->get('database-queue.memory')) {
24+
$args['--memory'] = $memory;
25+
}
26+
27+
if ($timeout = $this->settings->get('database-queue.timeout')) {
28+
$args['--timeout'] = $timeout;
29+
}
30+
31+
if ($rest = $this->settings->get('database-queue.rest')) {
32+
$args['--rest'] = $rest;
33+
}
34+
35+
if ($backoff = $this->settings->get('database-queue.backoff')) {
36+
$args['--backoff'] = $backoff;
37+
}
38+
39+
return $args;
40+
}
41+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?php
2+
3+
namespace Flarum\Queue\Console;
4+
5+
use Carbon\Carbon;
6+
use Flarum\Settings\SettingsRepositoryInterface;
7+
use Illuminate\Contracts\Cache\Repository as Cache;
8+
use Illuminate\Queue\Worker;
9+
10+
class WorkCommand extends \Illuminate\Queue\Console\WorkCommand
11+
{
12+
public function __construct(Worker $worker, Cache $cache, protected SettingsRepositoryInterface $settings)
13+
{
14+
parent::__construct($worker, $cache);
15+
}
16+
17+
public function handle()
18+
{
19+
$this->settings->set('database_queue.working', Carbon::now()->toIso8601String());
20+
21+
try {
22+
parent::handle();
23+
} catch (\Exception $e) {
24+
$this->settings->delete('database_queue.working');
25+
26+
throw $e;
27+
}
28+
}
29+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?php
2+
3+
namespace Flarum\Queue;
4+
5+
use Illuminate\Database\ConnectionInterface;
6+
use Illuminate\Database\ConnectionResolverInterface;
7+
8+
class DatabaseUuidFailedJobProvider extends \Illuminate\Queue\Failed\DatabaseUuidFailedJobProvider
9+
{
10+
public function __construct(ConnectionResolverInterface $resolver, $database, $table, protected ConnectionInterface $connection)
11+
{
12+
parent::__construct($resolver, $database, $table);
13+
}
14+
15+
/**
16+
* Get a new query builder instance for the table.
17+
*
18+
* @return \Illuminate\Database\Query\Builder
19+
*/
20+
protected function getTable()
21+
{
22+
return $this->connection->table($this->table);
23+
}
24+
}

framework/core/src/Queue/QueueServiceProvider.php

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@
1111

1212
use Flarum\Foundation\AbstractServiceProvider;
1313
use Flarum\Foundation\Config;
14+
use Flarum\Foundation\ContainerUtil;
1415
use Flarum\Foundation\ErrorHandling\Registry;
1516
use Flarum\Foundation\ErrorHandling\Reporter;
1617
use Flarum\Foundation\Paths;
18+
use Flarum\Queue\Console\DatabaseWorkerArgs;
1719
use Illuminate\Container\Container as ContainerImplementation;
1820
use Illuminate\Contracts\Cache\Factory as CacheFactory;
1921
use Illuminate\Contracts\Cache\Repository;
@@ -24,8 +26,8 @@
2426
use Illuminate\Contracts\Queue\Queue;
2527
use Illuminate\Queue\Connectors\ConnectorInterface;
2628
use Illuminate\Queue\Console as Commands;
29+
use Illuminate\Queue\DatabaseQueue;
2730
use Illuminate\Queue\Events\JobFailed;
28-
use Illuminate\Queue\Failed\NullFailedJobProvider;
2931
use Illuminate\Queue\Listener as QueueListener;
3032
use Illuminate\Queue\SyncQueue;
3133
use Illuminate\Queue\Worker;
@@ -39,7 +41,7 @@ class QueueServiceProvider extends AbstractServiceProvider
3941
Commands\ListFailedCommand::class,
4042
Commands\RestartCommand::class,
4143
Commands\RetryCommand::class,
42-
Commands\WorkCommand::class,
44+
Console\WorkCommand::class,
4345
];
4446

4547
public function register(): void
@@ -55,7 +57,10 @@ public function register(): void
5557
// Extensions can override this binding if they want to make Flarum use
5658
// a different queuing backend.
5759
$this->container->singleton('flarum.queue.connection', function (ContainerImplementation $container) {
58-
$queue = new SyncQueue;
60+
$queue = new DatabaseQueue(
61+
$this->container->make('db.connection'),
62+
'queue_jobs'
63+
);
5964
$queue->setContainer($container);
6065

6166
return $queue;
@@ -117,7 +122,27 @@ public function __call(string $name, ?array $arguments): mixed
117122
});
118123

119124
$this->container->singleton('queue.failer', function () {
120-
return new NullFailedJobProvider();
125+
/** @var Config $config */
126+
$config = $this->container->make(Config::class);
127+
128+
return new DatabaseUuidFailedJobProvider(
129+
$this->container->make('db'),
130+
$config->offsetGet('database.database'),
131+
'queue_failed_jobs',
132+
$this->container->make('flarum.db')
133+
);
134+
});
135+
136+
// By default, we use the DatabaseQueue, which requires a schedule to run. Other queue drivers
137+
// may not need this, so we allow extensions to override this binding.
138+
$this->container->singleton('flarum.queue.schedule', function (Container $container): array {
139+
return [
140+
'command' => 'queue:work',
141+
'callback' => ContainerUtil::wrapCallback(function ($event) {
142+
$event->everyMinute();
143+
}, $container),
144+
'args' => ($container->make(DatabaseWorkerArgs::class))->args(),
145+
];
121146
});
122147

123148
$this->container->alias('flarum.queue.connection', Queue::class);

0 commit comments

Comments
 (0)