Skip to content

Commit 4231056

Browse files
authored
Merge pull request #35 from hypervel/feature/redis-subscribe
feat: support subscribe command in Redis
2 parents 241e758 + f6c1359 commit 4231056

File tree

7 files changed

+343
-3
lines changed

7 files changed

+343
-3
lines changed

composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,8 @@
121121
"symfony/mailer": "^6.2",
122122
"tijsverkoyen/css-to-inline-styles": "^2.2.5",
123123
"symfony/process": "^6.2",
124-
"dragonmantank/cron-expression": "^3.3.2"
124+
"dragonmantank/cron-expression": "^3.3.2",
125+
"friendsofhyperf/redis-subscriber": "~3.1.0"
125126
},
126127
"replace": {
127128
"hypervel/auth": "self.version",

src/core/composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
"hyperf/database": "~3.1.0",
3131
"hyperf/http-message": "~3.1.0",
3232
"hyperf/context": "~3.1.0",
33-
"hyperf/redis": "~3.1.0"
33+
"hyperf/redis": "~3.1.0",
34+
"friendsofhyperf/redis-subscriber": "~3.1.0"
3435
},
3536
"require-dev": {
3637
"fakerphp/faker": "^2.0"

src/core/src/Redis/Redis.php

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
namespace Hypervel\Redis;
66

7+
use Closure;
78
use Hyperf\Redis\Redis as HyperfRedis;
8-
use Hyperf\Redis\RedisFactory;
99
use Hyperf\Redis\RedisProxy;
1010
use Hypervel\Context\ApplicationContext;
1111

@@ -20,4 +20,22 @@ public function connection(string $name = 'default'): RedisProxy
2020
->get(RedisFactory::class)
2121
->get($name);
2222
}
23+
24+
/**
25+
* Subscribe to a set of given channels for messages.
26+
*/
27+
public function subscribe(array|string $channels, Closure $callback): void
28+
{
29+
$this->connection()
30+
->subscribe($channels, $callback);
31+
}
32+
33+
/**
34+
* Subscribe to a set of given channels with wildcards.
35+
*/
36+
public function psubscribe(array|string $channels, Closure $callback): void
37+
{
38+
$this->connection()
39+
->psubscribe($channels, $callback);
40+
}
2341
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Hypervel\Redis;
6+
7+
use Hyperf\Contract\ConfigInterface;
8+
use Hyperf\Redis\Exception\InvalidRedisProxyException;
9+
10+
use function Hyperf\Support\make;
11+
12+
class RedisFactory
13+
{
14+
/**
15+
* @var RedisProxy[]
16+
*/
17+
protected array $proxies = [];
18+
19+
public function __construct(ConfigInterface $config)
20+
{
21+
$redisConfig = $config->get('redis');
22+
23+
foreach ($redisConfig as $poolName => $item) {
24+
$this->proxies[$poolName] = make(RedisProxy::class, ['pool' => $poolName]);
25+
}
26+
}
27+
28+
public function get(string $poolName): RedisProxy
29+
{
30+
$proxy = $this->proxies[$poolName] ?? null;
31+
if (! $proxy instanceof RedisProxy) {
32+
throw new InvalidRedisProxyException('Invalid Redis proxy.');
33+
}
34+
35+
return $proxy;
36+
}
37+
}

src/core/src/Redis/RedisProxy.php

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Hypervel\Redis;
6+
7+
use Closure;
8+
use Hyperf\Redis\RedisProxy as HyperfRedisProxy;
9+
10+
class RedisProxy extends HyperfRedisProxy
11+
{
12+
/**
13+
* Subscribe to a set of given channels for messages.
14+
*/
15+
public function subscribe(array|string $channels, Closure $callback): void
16+
{
17+
$this->getSubscriber()
18+
->subscribe($channels, $callback);
19+
}
20+
21+
/**
22+
* Subscribe to a set of given channels with wildcards.
23+
*/
24+
public function psubscribe(array|string $channels, Closure $callback): void
25+
{
26+
$this->getSubscriber()
27+
->psubscribe($channels, $callback);
28+
}
29+
30+
protected function getSubscriber(): Subscriber
31+
{
32+
return new Subscriber(
33+
$this->factory->getPool($this->poolName)->getConfig()
34+
);
35+
}
36+
}

src/core/src/Redis/Subscriber.php

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Hypervel\Redis;
6+
7+
use Closure;
8+
use FriendsOfHyperf\Redis\Subscriber\Exception\SocketException;
9+
use FriendsOfHyperf\Redis\Subscriber\Subscriber as RedisSubscriber;
10+
use Hyperf\Contract\StdoutLoggerInterface;
11+
use Hypervel\Context\ApplicationContext;
12+
use Hypervel\Support\Arr;
13+
14+
class Subscriber
15+
{
16+
public function __construct(
17+
protected array $config,
18+
protected ?RedisSubscriber $subscriber = null
19+
) {
20+
$this->subscriber = $subscriber ?: $this->createSubscriber($config);
21+
}
22+
23+
protected function createSubscriber(array $config): RedisSubscriber
24+
{
25+
return new RedisSubscriber(
26+
$config['host'] ?? 'localhost',
27+
$config['port'] ?? 6379,
28+
$config['auth'] ?? '',
29+
$config['timeout'] ?? 5,
30+
$config['options']['prefix'] ?? '',
31+
ApplicationContext::getContainer()->get(StdoutLoggerInterface::class),
32+
);
33+
}
34+
35+
/**
36+
* Subscribe to a set of given channels for messages.
37+
*/
38+
public function subscribe(array|string $channels, Closure $callback): void
39+
{
40+
$this->subscriber->subscribe(...Arr::wrap($channels));
41+
42+
while ($data = $this->subscriber->channel()->pop()) {
43+
$callback($data->payload, $data->channel);
44+
}
45+
46+
if (! $this->subscriber->closed) {
47+
throw new SocketException('Redis connection is disconnected abnormally.');
48+
}
49+
}
50+
51+
/**
52+
* Subscribe to a set of given channels with wildcards.
53+
*/
54+
public function psubscribe(array|string $channels, Closure $callback): void
55+
{
56+
$this->subscriber->psubscribe(...Arr::wrap($channels));
57+
58+
while ($data = $this->subscriber->channel()->pop()) {
59+
$callback($data->payload, $data->channel);
60+
}
61+
62+
if (! $this->subscriber->closed) {
63+
throw new SocketException('Redis connection is disconnected abnormally.');
64+
}
65+
}
66+
}

tests/Core/RedisSubscriberTest.php

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Hypervel\Tests\Core;
6+
7+
use FriendsOfHyperf\Redis\Subscriber\Exception\SocketException;
8+
use FriendsOfHyperf\Redis\Subscriber\Subscriber as RedisSubscriber;
9+
use Hypervel\Redis\Subscriber;
10+
use Hypervel\Tests\TestCase;
11+
use Mockery as m;
12+
use stdClass;
13+
14+
/**
15+
* @internal
16+
* @covers \Hypervel\Redis\Subscriber
17+
*/
18+
class RedisSubscriberTest extends TestCase
19+
{
20+
public function testSubscribe()
21+
{
22+
// Arrange
23+
$config = ['host' => 'localhost'];
24+
$channels = ['channel1', 'channel2'];
25+
$mockRedisSubscriber = m::mock(RedisSubscriber::class);
26+
$mockChannel = m::mock('stdClass');
27+
28+
// Set up the channel to return a message once and then null
29+
$message = new stdClass();
30+
$message->payload = 'test-payload';
31+
$message->channel = 'channel1';
32+
33+
$mockChannel->shouldReceive('pop')
34+
->once()
35+
->andReturn($message);
36+
$mockChannel->shouldReceive('pop')
37+
->once()
38+
->andReturnNull();
39+
40+
$mockRedisSubscriber->shouldReceive('subscribe')
41+
->once()
42+
->with(...$channels);
43+
44+
$mockRedisSubscriber->shouldReceive('channel')
45+
->twice()
46+
->andReturn($mockChannel);
47+
48+
$mockRedisSubscriber->closed = true;
49+
50+
$subscriber = new Subscriber($config, $mockRedisSubscriber);
51+
52+
$callbackCalled = false;
53+
$callback = function ($payload, $channel) use (&$callbackCalled, $message) {
54+
$callbackCalled = true;
55+
$this->assertEquals($message->payload, $payload);
56+
$this->assertEquals($message->channel, $channel);
57+
};
58+
59+
// Act
60+
$subscriber->subscribe($channels, $callback);
61+
62+
// Assert
63+
$this->assertTrue($callbackCalled);
64+
}
65+
66+
public function testSubscribeThrowsExceptionWhenConnectionClosedAbnormally()
67+
{
68+
// Arrange
69+
$config = ['host' => 'localhost'];
70+
$channels = ['channel1'];
71+
$mockRedisSubscriber = m::mock(RedisSubscriber::class);
72+
$mockChannel = m::mock('stdClass');
73+
74+
$mockChannel->shouldReceive('pop')
75+
->once()
76+
->andReturnNull();
77+
78+
$mockRedisSubscriber->shouldReceive('subscribe')
79+
->once()
80+
->with(...$channels);
81+
82+
$mockRedisSubscriber->shouldReceive('channel')
83+
->once()
84+
->andReturn($mockChannel);
85+
86+
$mockRedisSubscriber->closed = false;
87+
88+
$subscriber = new Subscriber($config, $mockRedisSubscriber);
89+
90+
$callback = function () {
91+
// Empty callback
92+
};
93+
94+
// Assert & Act
95+
$this->expectException(SocketException::class);
96+
$this->expectExceptionMessage('Redis connection is disconnected abnormally.');
97+
98+
$subscriber->subscribe($channels, $callback);
99+
}
100+
101+
public function testPsubscribe()
102+
{
103+
// Arrange
104+
$config = ['host' => 'localhost'];
105+
$patterns = ['channel*'];
106+
$mockRedisSubscriber = m::mock(RedisSubscriber::class);
107+
$mockChannel = m::mock('stdClass');
108+
109+
// Set up the channel to return a message once and then null
110+
$message = new stdClass();
111+
$message->payload = 'test-payload';
112+
$message->channel = 'channel1';
113+
114+
$mockChannel->shouldReceive('pop')
115+
->once()
116+
->andReturn($message);
117+
$mockChannel->shouldReceive('pop')
118+
->once()
119+
->andReturnNull();
120+
121+
$mockRedisSubscriber->shouldReceive('psubscribe')
122+
->once()
123+
->with(...$patterns);
124+
125+
$mockRedisSubscriber->shouldReceive('channel')
126+
->twice()
127+
->andReturn($mockChannel);
128+
129+
$mockRedisSubscriber->closed = true;
130+
131+
$subscriber = new Subscriber($config, $mockRedisSubscriber);
132+
133+
$callbackCalled = false;
134+
$callback = function ($payload, $channel) use (&$callbackCalled, $message) {
135+
$callbackCalled = true;
136+
$this->assertEquals($message->payload, $payload);
137+
$this->assertEquals($message->channel, $channel);
138+
};
139+
140+
// Act
141+
$subscriber->psubscribe($patterns, $callback);
142+
143+
// Assert
144+
$this->assertTrue($callbackCalled);
145+
}
146+
147+
public function testPsubscribeThrowsExceptionWhenConnectionClosedAbnormally()
148+
{
149+
// Arrange
150+
$config = ['host' => 'localhost'];
151+
$patterns = ['channel*'];
152+
$mockRedisSubscriber = m::mock(RedisSubscriber::class);
153+
$mockChannel = m::mock('stdClass');
154+
155+
$mockChannel->shouldReceive('pop')
156+
->once()
157+
->andReturnNull();
158+
159+
$mockRedisSubscriber->shouldReceive('psubscribe')
160+
->once()
161+
->with(...$patterns);
162+
163+
$mockRedisSubscriber->shouldReceive('channel')
164+
->once()
165+
->andReturn($mockChannel);
166+
167+
$mockRedisSubscriber->closed = false;
168+
169+
$subscriber = new Subscriber($config, $mockRedisSubscriber);
170+
171+
$callback = function () {
172+
// Empty callback
173+
};
174+
175+
// Assert & Act
176+
$this->expectException(SocketException::class);
177+
$this->expectExceptionMessage('Redis connection is disconnected abnormally.');
178+
179+
$subscriber->psubscribe($patterns, $callback);
180+
}
181+
}

0 commit comments

Comments
 (0)