Skip to content

Commit 79c5a27

Browse files
authored
Merge branch 'main' into feature/replcae-laminas-mime
2 parents 1dac419 + 4231056 commit 79c5a27

File tree

10 files changed

+353
-13
lines changed

10 files changed

+353
-13
lines changed

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@
122122
"tijsverkoyen/css-to-inline-styles": "^2.2.5",
123123
"symfony/process": "^6.2",
124124
"dragonmantank/cron-expression": "^3.3.2",
125+
"friendsofhyperf/redis-subscriber": "~3.1.0",
125126
"hypervel/laminas-mime": "^0.1.0"
126127
},
127128
"replace": {

src/core/composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
"hyperf/database": "~3.1.0",
3131
"hyperf/http-message": "~3.1.0",
3232
"hyperf/context": "~3.1.0",
33-
"hyperf/redis": "~3.1.0"
33+
"hyperf/redis": "~3.1.0",
34+
"friendsofhyperf/redis-subscriber": "~3.1.0"
3435
},
3536
"require-dev": {
3637
"fakerphp/faker": "^2.0"

src/core/src/Redis/Redis.php

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
namespace Hypervel\Redis;
66

7+
use Closure;
78
use Hyperf\Redis\Redis as HyperfRedis;
8-
use Hyperf\Redis\RedisFactory;
99
use Hyperf\Redis\RedisProxy;
1010
use Hypervel\Context\ApplicationContext;
1111

@@ -20,4 +20,22 @@ public function connection(string $name = 'default'): RedisProxy
2020
->get(RedisFactory::class)
2121
->get($name);
2222
}
23+
24+
/**
25+
* Subscribe to a set of given channels for messages.
26+
*/
27+
public function subscribe(array|string $channels, Closure $callback): void
28+
{
29+
$this->connection()
30+
->subscribe($channels, $callback);
31+
}
32+
33+
/**
34+
* Subscribe to a set of given channels with wildcards.
35+
*/
36+
public function psubscribe(array|string $channels, Closure $callback): void
37+
{
38+
$this->connection()
39+
->psubscribe($channels, $callback);
40+
}
2341
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Hypervel\Redis;
6+
7+
use Hyperf\Contract\ConfigInterface;
8+
use Hyperf\Redis\Exception\InvalidRedisProxyException;
9+
10+
use function Hyperf\Support\make;
11+
12+
class RedisFactory
13+
{
14+
/**
15+
* @var RedisProxy[]
16+
*/
17+
protected array $proxies = [];
18+
19+
public function __construct(ConfigInterface $config)
20+
{
21+
$redisConfig = $config->get('redis');
22+
23+
foreach ($redisConfig as $poolName => $item) {
24+
$this->proxies[$poolName] = make(RedisProxy::class, ['pool' => $poolName]);
25+
}
26+
}
27+
28+
public function get(string $poolName): RedisProxy
29+
{
30+
$proxy = $this->proxies[$poolName] ?? null;
31+
if (! $proxy instanceof RedisProxy) {
32+
throw new InvalidRedisProxyException('Invalid Redis proxy.');
33+
}
34+
35+
return $proxy;
36+
}
37+
}

src/core/src/Redis/RedisProxy.php

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Hypervel\Redis;
6+
7+
use Closure;
8+
use Hyperf\Redis\RedisProxy as HyperfRedisProxy;
9+
10+
class RedisProxy extends HyperfRedisProxy
11+
{
12+
/**
13+
* Subscribe to a set of given channels for messages.
14+
*/
15+
public function subscribe(array|string $channels, Closure $callback): void
16+
{
17+
$this->getSubscriber()
18+
->subscribe($channels, $callback);
19+
}
20+
21+
/**
22+
* Subscribe to a set of given channels with wildcards.
23+
*/
24+
public function psubscribe(array|string $channels, Closure $callback): void
25+
{
26+
$this->getSubscriber()
27+
->psubscribe($channels, $callback);
28+
}
29+
30+
protected function getSubscriber(): Subscriber
31+
{
32+
return new Subscriber(
33+
$this->factory->getPool($this->poolName)->getConfig()
34+
);
35+
}
36+
}

src/core/src/Redis/Subscriber.php

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Hypervel\Redis;
6+
7+
use Closure;
8+
use FriendsOfHyperf\Redis\Subscriber\Exception\SocketException;
9+
use FriendsOfHyperf\Redis\Subscriber\Subscriber as RedisSubscriber;
10+
use Hyperf\Contract\StdoutLoggerInterface;
11+
use Hypervel\Context\ApplicationContext;
12+
use Hypervel\Support\Arr;
13+
14+
class Subscriber
15+
{
16+
public function __construct(
17+
protected array $config,
18+
protected ?RedisSubscriber $subscriber = null
19+
) {
20+
$this->subscriber = $subscriber ?: $this->createSubscriber($config);
21+
}
22+
23+
protected function createSubscriber(array $config): RedisSubscriber
24+
{
25+
return new RedisSubscriber(
26+
$config['host'] ?? 'localhost',
27+
$config['port'] ?? 6379,
28+
$config['auth'] ?? '',
29+
$config['timeout'] ?? 5,
30+
$config['options']['prefix'] ?? '',
31+
ApplicationContext::getContainer()->get(StdoutLoggerInterface::class),
32+
);
33+
}
34+
35+
/**
36+
* Subscribe to a set of given channels for messages.
37+
*/
38+
public function subscribe(array|string $channels, Closure $callback): void
39+
{
40+
$this->subscriber->subscribe(...Arr::wrap($channels));
41+
42+
while ($data = $this->subscriber->channel()->pop()) {
43+
$callback($data->payload, $data->channel);
44+
}
45+
46+
if (! $this->subscriber->closed) {
47+
throw new SocketException('Redis connection is disconnected abnormally.');
48+
}
49+
}
50+
51+
/**
52+
* Subscribe to a set of given channels with wildcards.
53+
*/
54+
public function psubscribe(array|string $channels, Closure $callback): void
55+
{
56+
$this->subscriber->psubscribe(...Arr::wrap($channels));
57+
58+
while ($data = $this->subscriber->channel()->pop()) {
59+
$callback($data->payload, $data->channel);
60+
}
61+
62+
if (! $this->subscriber->closed) {
63+
throw new SocketException('Redis connection is disconnected abnormally.');
64+
}
65+
}
66+
}

src/queue/src/Console/ListenCommand.php

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,13 @@ class ListenCommand extends Command
3636
*/
3737
protected string $description = 'Listen to a given queue';
3838

39-
/**
40-
* The queue listener instance.
41-
*/
42-
protected Listener $listener;
43-
4439
/**
4540
* Create a new queue listen command.
4641
*/
47-
public function __construct(Listener $listener)
48-
{
42+
public function __construct(
43+
protected ConfigInterface $config,
44+
protected Listener $listener
45+
) {
4946
parent::__construct();
5047

5148
$this->setOutputHandler($this->listener = $listener);
@@ -77,9 +74,9 @@ public function handle()
7774
*/
7875
protected function getQueue(?string $connection): string
7976
{
80-
$connection = $connection ?: $this->app->get(ConfigInterface::class)->get('queue.default');
77+
$connection = $connection ?: $this->config->get('queue.default');
8178

82-
return $this->input->getOption('queue') ?: $this->app->get(ConfigInterface::class)->get(
79+
return $this->input->getOption('queue') ?: $this->config->get(
8380
"queue.connections.{$connection}.queue",
8481
'default'
8582
);

src/queue/src/Console/MonitorCommand.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ class MonitorCommand extends Command
3838
*/
3939
public function __construct(
4040
protected Factory $manager,
41-
protected EventDispatcherInterface $events
41+
protected EventDispatcherInterface $events,
42+
protected ConfigInterface $config,
4243
) {
4344
parent::__construct();
4445
}
@@ -66,7 +67,7 @@ protected function parseQueues($queues): Collection
6667

6768
if (! isset($queue)) {
6869
$queue = $connection;
69-
$connection = $this->app->get(ConfigInterface::class)->get('queue.default');
70+
$connection = $this->config->get('queue.default');
7071
}
7172

7273
return [

src/queue/src/Console/RetryCommand.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Hypervel\Queue\Events\JobRetryRequested;
1515
use Hypervel\Queue\Failed\FailedJobProviderInterface;
1616
use Hypervel\Support\Traits\HasLaravelStyleCommand;
17+
use Psr\Container\ContainerInterface;
1718
use Psr\EventDispatcher\EventDispatcherInterface;
1819
use RuntimeException;
1920
use stdClass;
@@ -39,6 +40,7 @@ class RetryCommand extends Command
3940
* Create a new queue restart command.
4041
*/
4142
public function __construct(
43+
protected ContainerInterface $app,
4244
protected FailedJobProviderInterface $failer
4345
) {
4446
parent::__construct();

0 commit comments

Comments
 (0)