Skip to content

Commit 3cee3ff

Browse files
committed
added:
- factory for making a connection. - factory for creating a config object. - test lazy stream connection. fixed: - tests removed: - unnecessary methods
1 parent aee2a42 commit 3cee3ff

File tree

7 files changed

+369
-61
lines changed

7 files changed

+369
-61
lines changed

config/rabbitmq.php

+1-9
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
'driver' => 'rabbitmq',
1111
'queue' => env('RABBITMQ_QUEUE', 'default'),
12-
'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class,
12+
'connection' => 'default',
1313

1414
'hosts' => [
1515
[
@@ -22,14 +22,6 @@
2222
],
2323

2424
'options' => [
25-
'ssl_options' => [
26-
'cafile' => env('RABBITMQ_SSL_CAFILE', null),
27-
'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
28-
'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
29-
'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
30-
'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
31-
],
32-
'queue' => [],
3325
],
3426

3527
/*
+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
<?php
2+
3+
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connection;
4+
5+
use Illuminate\Support\Arr;
6+
use PhpAmqpLib\Connection\AMQPConnectionConfig;
7+
8+
class ConfigFactory
9+
{
10+
protected const CONFIG_OPTIONS = 'options';
11+
12+
public const CONFIG_HOSTS = 'hosts';
13+
14+
/**
15+
* Create a config object from config array
16+
*/
17+
public static function make(array $config = []): AMQPConnectionConfig
18+
{
19+
return tap(new AMQPConnectionConfig(), function (AMQPConnectionConfig $connectionConfig) use ($config) {
20+
// Set the connection to a Lazy by default
21+
$connectionConfig->setIsLazy(! in_array(
22+
Arr::get($config, 'lazy') ?? true,
23+
[false, 0, '0', 'false'],
24+
true)
25+
);
26+
27+
// Set the connection to unsecure by default
28+
$connectionConfig->setIsSecure(in_array(
29+
Arr::get($config, 'secure'),
30+
[true, 1, '1', 'true'],
31+
true)
32+
);
33+
34+
if ($connectionConfig->isSecure()) {
35+
self::sllOptionsFromConfig($connectionConfig, $config);
36+
}
37+
38+
self::hostFromConfig($connectionConfig, $config);
39+
self::heartbeatFromConfig($connectionConfig, $config);
40+
});
41+
}
42+
43+
protected static function hostFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void
44+
{
45+
$hostConfig = Arr::first(Arr::shuffle(Arr::get($config, self::CONFIG_HOSTS, [])), null, []);
46+
47+
if ($location = Arr::get($hostConfig, 'host')) {
48+
$connectionConfig->setHost($location);
49+
}
50+
if ($port = Arr::get($hostConfig, 'port')) {
51+
$connectionConfig->setPort($port);
52+
}
53+
if ($vhost = Arr::get($hostConfig, 'vhost')) {
54+
$connectionConfig->setVhost($vhost);
55+
}
56+
if ($user = Arr::get($hostConfig, 'user')) {
57+
$connectionConfig->setUser($user);
58+
}
59+
if ($password = Arr::get($hostConfig, 'password')) {
60+
$connectionConfig->setPassword($password);
61+
}
62+
}
63+
64+
protected static function sllOptionsFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void
65+
{
66+
$sslConfig = Arr::get($config, self::CONFIG_OPTIONS.'.ssl_options', []);
67+
68+
if ($caFile = Arr::get($sslConfig, 'cafile')) {
69+
$connectionConfig->setSslCaCert($caFile);
70+
}
71+
if ($cert = Arr::get($sslConfig, 'local_cert')) {
72+
$connectionConfig->setSslCert($cert);
73+
}
74+
if ($key = Arr::get($sslConfig, 'local_key')) {
75+
$connectionConfig->setSslKey($key);
76+
}
77+
if ($verifyPeer = Arr::get($sslConfig, 'verify_peer')) {
78+
$connectionConfig->setSslVerify($verifyPeer);
79+
}
80+
if ($passphrase = Arr::get($sslConfig, 'passphrase')) {
81+
$connectionConfig->setSslPassPhrase($passphrase);
82+
}
83+
}
84+
85+
protected static function heartbeatFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void
86+
{
87+
$heartbeat = Arr::get($config, self::CONFIG_OPTIONS.'.heartbeat');
88+
89+
if (! empty($heartbeat) && is_numeric($heartbeat) && 0 < (int) $heartbeat) {
90+
$connectionConfig->setHeartbeat($heartbeat);
91+
}
92+
}
93+
}
+224
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
<?php
2+
3+
namespace VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connection;
4+
5+
use Exception;
6+
use Illuminate\Support\Arr;
7+
use PhpAmqpLib\Connection\AbstractConnection;
8+
use PhpAmqpLib\Connection\AMQPConnectionConfig;
9+
use PhpAmqpLib\Connection\AMQPConnectionFactory;
10+
use PhpAmqpLib\Connection\AMQPLazyConnection;
11+
use PhpAmqpLib\Connection\AMQPLazySocketConnection;
12+
use PhpAmqpLib\Connection\AMQPLazySSLConnection;
13+
use PhpAmqpLib\Connection\AMQPSocketConnection;
14+
use PhpAmqpLib\Connection\AMQPSSLConnection;
15+
use PhpAmqpLib\Connection\AMQPStreamConnection;
16+
use PhpAmqpLib\Exception\AMQPLogicException;
17+
18+
class ConnectionFactory
19+
{
20+
protected const CONNECTION_TYPE_DEFAULT = 'default';
21+
22+
protected const CONNECTION_TYPE_EXTENDED = AbstractConnection::class;
23+
24+
protected const CONNECTION_SUB_TYPE_STREAM = AMQPStreamConnection::class;
25+
26+
protected const CONNECTION_SUB_TYPE_SOCKET = AMQPSocketConnection::class;
27+
28+
protected const CONNECTION_SUB_TYPE_SSL = AMQPSSLConnection::class;
29+
30+
protected const CONFIG_CONNECTION = 'connection';
31+
32+
/**
33+
* Create a Connection
34+
*
35+
* @throws Exception
36+
*/
37+
public static function make(array $config = []): AbstractConnection
38+
{
39+
$connection = self::getConnectionFromConfig($config);
40+
$connectionConfig = ConfigFactory::make($config);
41+
42+
/**
43+
* Todo [Major]:
44+
* - Remove if statement and contents.
45+
* - Remove unused method: self::_createLazyConnection()
46+
*/
47+
if (in_array($connection, [AMQPLazyConnection::class, AMQPLazySocketConnection::class, AMQPLazySSLConnection::class])) {
48+
return self::_createLazyConnection($connection ?: AMQPLazyConnection::class, $config);
49+
}
50+
51+
if (strtolower($connection) == self::CONNECTION_TYPE_DEFAULT) {
52+
return AMQPConnectionFactory::create($connectionConfig);
53+
}
54+
55+
return self::create($connection, $connectionConfig);
56+
}
57+
58+
/**
59+
* Get the validated connection from config
60+
*/
61+
private static function getConnectionFromConfig(array $config): string
62+
{
63+
$connection = (string) Arr::get($config, self::CONFIG_CONNECTION, self::CONNECTION_TYPE_DEFAULT);
64+
65+
self::assertConnectionFromConfig($connection);
66+
67+
return $connection;
68+
}
69+
70+
/**
71+
* Creation of your own connection
72+
*/
73+
private static function create($connection, AMQPConnectionConfig $config): AbstractConnection
74+
{
75+
if ($config->getIoType() === AMQPConnectionConfig::IO_TYPE_SOCKET) {
76+
return self::createSocketConnection($connection, $config);
77+
}
78+
79+
return self::createStreamConnection($connection, $config);
80+
}
81+
82+
private static function createSocketConnection($connection, AMQPConnectionConfig $config): AMQPSocketConnection
83+
{
84+
self::assertSocketConnection($connection, $config);
85+
86+
return new $connection(
87+
$config->getHost(),
88+
$config->getPort(),
89+
$config->getUser(),
90+
$config->getPassword(),
91+
$config->getVhost(),
92+
$config->isInsist(),
93+
$config->getLoginMethod(),
94+
$config->getLoginResponse(),
95+
$config->getLocale(),
96+
$config->getReadTimeout(),
97+
$config->isKeepalive(),
98+
$config->getWriteTimeout(),
99+
$config->getHeartbeat(),
100+
$config->getChannelRPCTimeout(),
101+
$config
102+
);
103+
}
104+
105+
private static function createStreamConnection($connection, AMQPConnectionConfig $config): AMQPStreamConnection
106+
{
107+
self::assertStreamConnection($connection);
108+
109+
if ($config->isSecure()) {
110+
self::assertSSLConnection($connection);
111+
112+
return new $connection(
113+
$config->getHost(),
114+
$config->getPort(),
115+
$config->getUser(),
116+
$config->getPassword(),
117+
$config->getVhost(),
118+
self::getSslOptions($config),
119+
[
120+
'insist' => $config->isInsist(),
121+
'login_method' => $config->getLoginMethod(),
122+
'login_response' => $config->getLoginResponse(),
123+
'locale' => $config->getLocale(),
124+
'connection_timeout' => $config->getConnectionTimeout(),
125+
'read_write_timeout' => self::getReadWriteTimeout($config),
126+
'keepalive' => $config->isKeepalive(),
127+
'heartbeat' => $config->getHeartbeat(),
128+
],
129+
$config->getNetworkProtocol(),
130+
$config
131+
);
132+
}
133+
134+
return new $connection(
135+
$config->getHost(),
136+
$config->getPort(),
137+
$config->getUser(),
138+
$config->getPassword(),
139+
$config->getVhost(),
140+
$config->isInsist(),
141+
$config->getLoginMethod(),
142+
$config->getLoginResponse(),
143+
$config->getLocale(),
144+
$config->getConnectionTimeout(),
145+
self::getReadWriteTimeout($config),
146+
$config->getStreamContext(),
147+
$config->isKeepalive(),
148+
$config->getHeartbeat(),
149+
$config->getChannelRPCTimeout(),
150+
$config->getNetworkProtocol(),
151+
$config
152+
);
153+
}
154+
155+
protected static function getReadWriteTimeout(AMQPConnectionConfig $config): float
156+
{
157+
return min($config->getReadTimeout(), $config->getWriteTimeout());
158+
}
159+
160+
private static function getSslOptions(AMQPConnectionConfig $config): array
161+
{
162+
return array_filter([
163+
'cafile' => $config->getSslCaCert(),
164+
'capath' => $config->getSslCaPath(),
165+
'local_cert' => $config->getSslCert(),
166+
'local_pk' => $config->getSslKey(),
167+
'verify_peer' => $config->getSslVerify(),
168+
'verify_peer_name' => $config->getSslVerifyName(),
169+
'passphrase' => $config->getSslPassPhrase(),
170+
'ciphers' => $config->getSslCiphers(),
171+
'security_level' => $config->getSslSecurityLevel(),
172+
], static function ($value) {
173+
return null !== $value;
174+
});
175+
}
176+
177+
private static function assertConnectionFromConfig(string $connection): void
178+
{
179+
if ($connection !== self::CONNECTION_TYPE_DEFAULT && ! is_subclass_of($connection, self::CONNECTION_TYPE_EXTENDED)) {
180+
throw new AMQPLogicException(sprintf('The config property \'%s\' must contain \'%s\' or must extend: %s', self::CONFIG_CONNECTION, self::CONNECTION_TYPE_DEFAULT, class_basename(self::CONNECTION_TYPE_EXTENDED)));
181+
}
182+
}
183+
184+
private static function assertSocketConnection($connection, AMQPConnectionConfig $config): void
185+
{
186+
self::assertExtendedOf($connection, self::CONNECTION_SUB_TYPE_SOCKET);
187+
188+
if ($config->isSecure()) {
189+
throw new AMQPLogicException('The socket connection implementation does not support secure connections.');
190+
}
191+
}
192+
193+
private static function assertStreamConnection($connection): void
194+
{
195+
self::assertExtendedOf($connection, self::CONNECTION_SUB_TYPE_STREAM);
196+
}
197+
198+
private static function assertSSLConnection($connection)
199+
{
200+
self::assertExtendedOf($connection, self::CONNECTION_SUB_TYPE_SSL);
201+
}
202+
203+
private static function assertExtendedOf($connection, string $abstract): void
204+
{
205+
if (! is_subclass_of($connection, $abstract)) {
206+
throw new AMQPLogicException(sprintf('The connection must extend: %s', class_basename($abstract)));
207+
}
208+
}
209+
210+
/**
211+
* @return mixed
212+
*
213+
* @throws Exception
214+
*
215+
* @deprecated This is the fallback method, update your config asap. (example: connection => 'default')
216+
*/
217+
private static function _createLazyConnection($connection, array $config): AbstractConnection
218+
{
219+
return $connection::create_connection(
220+
Arr::shuffle(Arr::get($config, ConfigFactory::CONFIG_HOSTS, [])),
221+
Arr::add(Arr::get($config, 'options', []), 'heartbeat', 0)
222+
);
223+
}
224+
}

0 commit comments

Comments
 (0)