Skip to content

Commit d06dd5d

Browse files
committedAug 16, 2017
Amqp interop based transports.
1 parent e522247 commit d06dd5d

13 files changed

+530
-0
lines changed
 

‎php-interop/README.md

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# PHP code for RabbitMQ tutorials based on AMQP interop
2+
3+
Here you can find PHP code examples from [RabbitMQ
4+
tutorials](http://www.rabbitmq.com/getstarted.html) adopted to [amqp interop](https://github.com/queue-interop/queue-interop#amqp-interop)
5+
These examples will work with any amqp interop compatible transports such as [enqueue/amqp-ext](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/amqp.md), [enqueue/amqp-bunny](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/amqp_bunny.md), [enqueue/amqp-lib](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/amqp_lib.md)
6+
7+
To successfully use the examples you will need a running RabbitMQ server.
8+
9+
## Requirements
10+
11+
### PHP 5.5+
12+
13+
You need `PHP 5.5` and one of the amqp interop compatible transport.
14+
15+
16+
### Composer
17+
18+
Then [install Composer](https://getcomposer.org/download/) per instructions on their site.
19+
20+
### Client Library
21+
22+
Then you can, for example, install `enqueue/amqp-bunny` using [Composer](http://getcomposer.org).
23+
24+
To do that install Composer and add it to your path, then run the following command
25+
inside this project folder:
26+
27+
```bash
28+
$ composer require enqueue/amqp-bunny
29+
```
30+

‎php-interop/emit_log.php

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
// composer require enqueue/amqp-bunny
4+
require_once __DIR__.'/vendor/autoload.php';
5+
6+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
7+
use Interop\Amqp\AmqpTopic;
8+
9+
$config = [
10+
'host' => 'localhost',
11+
'port' => 5672,
12+
'user' => 'guest',
13+
'pass' => 'guest',
14+
];
15+
16+
$connection = new AmqpConnectionFactory($config);
17+
$context = $connection->createContext();
18+
19+
$topic = $context->createTopic('logs');
20+
$topic->setType(AmqpTopic::TYPE_FANOUT);
21+
22+
$context->declareTopic($topic);
23+
24+
$data = implode(' ', array_slice($argv, 1));
25+
if (empty($data)) {
26+
$data = 'info: Hello World!';
27+
}
28+
$message = $context->createMessage($data);
29+
30+
$context->createProducer()->send($topic, $message);
31+
32+
echo ' [x] Sent ', $data, "\n";
33+
34+
$context->close();

‎php-interop/emit_log_direct.php

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
// composer require enqueue/amqp-bunny
4+
require_once __DIR__.'/vendor/autoload.php';
5+
6+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
7+
use Interop\Amqp\AmqpTopic;
8+
9+
$config = [
10+
'host' => 'localhost',
11+
'port' => 5672,
12+
'user' => 'guest',
13+
'pass' => 'guest',
14+
];
15+
16+
$connection = new AmqpConnectionFactory($config);
17+
$context = $connection->createContext();
18+
19+
$topic = $context->createTopic('direct_logs');
20+
$topic->setType(AmqpTopic::TYPE_DIRECT);
21+
22+
$context->declareTopic($topic);
23+
24+
$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';
25+
26+
$data = implode(' ', array_slice($argv, 2));
27+
if (empty($data)) {
28+
$data = 'Hello World!';
29+
}
30+
31+
$message = $context->createMessage($data);
32+
$message->setRoutingKey($severity);
33+
34+
$context->createProducer()->send($topic, $message);
35+
36+
echo ' [x] Sent ',$severity,':',$data," \n";
37+
38+
$context->close();

‎php-interop/emit_log_topic.php

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
// composer require enqueue/amqp-bunny
4+
require_once __DIR__.'/vendor/autoload.php';
5+
6+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
7+
use Interop\Amqp\AmqpTopic;
8+
9+
$config = [
10+
'host' => 'localhost',
11+
'port' => 5672,
12+
'user' => 'guest',
13+
'pass' => 'guest',
14+
];
15+
16+
$connection = new AmqpConnectionFactory($config);
17+
$context = $connection->createContext();
18+
19+
$topic = $context->createTopic('topic_logs');
20+
$topic->setType(AmqpTopic::TYPE_TOPIC);
21+
22+
$context->declareTopic($topic);
23+
24+
$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
25+
26+
$data = implode(' ', array_slice($argv, 2));
27+
if (empty($data)) {
28+
$data = 'Hello World!';
29+
}
30+
31+
$message = $context->createMessage($data);
32+
$message->setRoutingKey($routing_key);
33+
34+
$context->createProducer()->send($topic, $message);
35+
36+
echo ' [x] Sent ',$routing_key,':',$data," \n";
37+
38+
$context->close();

‎php-interop/new_task.php

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?php
2+
3+
// composer require enqueue/amqp-bunny
4+
require_once __DIR__.'/vendor/autoload.php';
5+
6+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
7+
use Interop\Amqp\AmqpMessage;
8+
use Interop\Amqp\AmqpQueue;
9+
10+
$config = [
11+
'host' => 'localhost',
12+
'port' => 5672,
13+
'user' => 'guest',
14+
'pass' => 'guest',
15+
];
16+
17+
$connection = new AmqpConnectionFactory($config);
18+
$context = $connection->createContext();
19+
20+
$queue = $context->createQueue('task_queue');
21+
$queue->addFlag(AmqpQueue::FLAG_DURABLE);
22+
23+
$context->declareQueue($queue);
24+
25+
$data = implode(' ', array_slice($argv, 1));
26+
if (empty($data)) {
27+
$data = 'Hello World!';
28+
}
29+
$message = $context->createMessage($data);
30+
$message->setDeliveryMode(AmqpMessage::DELIVERY_MODE_PERSISTENT);
31+
32+
$context->createProducer()->send($queue, $message);
33+
34+
echo ' [x] Sent ', $data, "\n";
35+
36+
$context->close();

‎php-interop/receive.php

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
// composer require enqueue/amqp-bunny
4+
require_once __DIR__.'/vendor/autoload.php';
5+
6+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
7+
use Interop\Amqp\AmqpConsumer;
8+
9+
$config = [
10+
'host' => 'localhost',
11+
'port' => 5672,
12+
'user' => 'guest',
13+
'pass' => 'guest',
14+
'receive_method' => 'basic_consume',
15+
];
16+
17+
$connection = new AmqpConnectionFactory($config);
18+
$context = $connection->createContext();
19+
20+
$queue = $context->createQueue('hello');
21+
$context->declareQueue($queue);
22+
23+
$consumer = $context->createConsumer($queue);
24+
$consumer->addFlag(AmqpConsumer::FLAG_NOACK);
25+
26+
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
27+
28+
while (true) {
29+
if ($message = $consumer->receive()) {
30+
echo ' [x] Received ', $message->getBody(), "\n";
31+
}
32+
}
33+
34+
$context->close();

‎php-interop/receive_logs.php

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?php
2+
3+
// composer require enqueue/amqp-bunny
4+
require_once __DIR__.'/vendor/autoload.php';
5+
6+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
7+
use Interop\Amqp\AmqpConsumer;
8+
use Interop\Amqp\AmqpTopic;
9+
use Interop\Amqp\Impl\AmqpBind;
10+
11+
$config = [
12+
'host' => 'localhost',
13+
'port' => 5672,
14+
'user' => 'guest',
15+
'pass' => 'guest',
16+
'receive_method' => 'basic_consume',
17+
];
18+
19+
$connection = new AmqpConnectionFactory($config);
20+
$context = $connection->createContext();
21+
22+
$topic = $context->createTopic('logs');
23+
$topic->setType(AmqpTopic::TYPE_FANOUT);
24+
25+
$context->declareTopic($topic);
26+
27+
$queue = $context->createTemporaryQueue();
28+
29+
$context->bind(new AmqpBind($topic, $queue));
30+
31+
$consumer = $context->createConsumer($queue);
32+
$consumer->addFlag(AmqpConsumer::FLAG_NOACK);
33+
34+
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
35+
36+
while (true) {
37+
if ($message = $consumer->receive()) {
38+
echo ' [x] ', $message->getBody(), "\n";
39+
}
40+
}
41+
42+
$context->close();

‎php-interop/receive_logs_direct.php

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?php
2+
3+
// composer require enqueue/amqp-bunny
4+
require_once __DIR__.'/vendor/autoload.php';
5+
6+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
7+
use Interop\Amqp\AmqpConsumer;
8+
use Interop\Amqp\AmqpTopic;
9+
use Interop\Amqp\Impl\AmqpBind;
10+
11+
$config = [
12+
'host' => 'localhost',
13+
'port' => 5672,
14+
'user' => 'guest',
15+
'pass' => 'guest',
16+
'receive_method' => 'basic_consume',
17+
];
18+
19+
$connection = new AmqpConnectionFactory($config);
20+
$context = $connection->createContext();
21+
22+
$topic = $context->createTopic('direct_logs');
23+
$topic->setType(AmqpTopic::TYPE_DIRECT);
24+
25+
$context->declareTopic($topic);
26+
27+
$queue = $context->createTemporaryQueue();
28+
29+
$severities = array_slice($argv, 1);
30+
if (empty($severities)) {
31+
file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
32+
exit(1);
33+
}
34+
35+
foreach ($severities as $severity) {
36+
$context->bind(new AmqpBind($topic, $queue, $severity));
37+
}
38+
39+
$consumer = $context->createConsumer($queue);
40+
$consumer->addFlag(AmqpConsumer::FLAG_NOACK);
41+
42+
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
43+
44+
while (true) {
45+
if ($message = $consumer->receive()) {
46+
echo ' [x] '.$message->getRoutingKey().':'.$message->getBody()."\n";
47+
}
48+
}
49+
50+
$context->close();

‎php-interop/receive_logs_topic.php

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?php
2+
3+
// composer require enqueue/amqp-bunny
4+
require_once __DIR__.'/vendor/autoload.php';
5+
6+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
7+
use Interop\Amqp\AmqpConsumer;
8+
use Interop\Amqp\AmqpTopic;
9+
use Interop\Amqp\Impl\AmqpBind;
10+
11+
$config = [
12+
'host' => 'localhost',
13+
'port' => 5672,
14+
'user' => 'guest',
15+
'pass' => 'guest',
16+
'receive_method' => 'basic_consume',
17+
];
18+
19+
$connection = new AmqpConnectionFactory($config);
20+
$context = $connection->createContext();
21+
22+
$topic = $context->createTopic('topic_logs');
23+
$topic->setType(AmqpTopic::TYPE_TOPIC);
24+
25+
$context->declareTopic($topic);
26+
27+
$queue = $context->createTemporaryQueue();
28+
29+
$binding_keys = array_slice($argv, 1);
30+
if (empty($binding_keys)) {
31+
file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
32+
exit(1);
33+
}
34+
35+
foreach ($binding_keys as $binding_key) {
36+
$context->bind(new AmqpBind($topic, $queue, $binding_key));
37+
}
38+
39+
$consumer = $context->createConsumer($queue);
40+
$consumer->addFlag(AmqpConsumer::FLAG_NOACK);
41+
42+
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
43+
44+
while (true) {
45+
if ($message = $consumer->receive()) {
46+
echo ' [x] '.$message->getRoutingKey().':'.$message->getBody()."\n";
47+
}
48+
}
49+
50+
$context->close();

‎php-interop/rpc_client.php

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
<?php
2+
3+
// composer require enqueue/amqp-bunny
4+
require_once __DIR__.'/vendor/autoload.php';
5+
6+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
7+
8+
$config = [
9+
'host' => 'localhost',
10+
'port' => 5672,
11+
'user' => 'guest',
12+
'pass' => 'guest',
13+
'receive_method' => 'basic_consume',
14+
];
15+
16+
class FibonacciRpcClient
17+
{
18+
/** @var \Interop\Amqp\AmqpContext */
19+
private $context;
20+
21+
/** @var \Interop\Amqp\AmqpQueue */
22+
private $callback_queue;
23+
24+
public function __construct(array $config)
25+
{
26+
$this->context = (new AmqpConnectionFactory($config))->createContext();
27+
$this->callback_queue = $this->context->createTemporaryQueue();
28+
}
29+
30+
public function call($n)
31+
{
32+
$corr_id = uniqid();
33+
34+
$message = $this->context->createMessage((string) $n);
35+
$message->setCorrelationId($corr_id);
36+
$message->setReplyTo($this->callback_queue->getQueueName());
37+
38+
$this->context->createProducer()->send(
39+
$this->context->createQueue('rpc_queue'),
40+
$message
41+
);
42+
43+
$consumer = $this->context->createConsumer($this->callback_queue);
44+
45+
while (true) {
46+
if ($message = $consumer->receive()) {
47+
if ($message->getCorrelationId() == $corr_id) {
48+
return (int) ($message->getBody());
49+
}
50+
}
51+
}
52+
}
53+
}
54+
55+
$fibonacci_rpc = new FibonacciRpcClient($config);
56+
$response = $fibonacci_rpc->call(30);
57+
echo ' [.] Got ', $response, "\n";

‎php-interop/rpc_server.php

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
<?php
2+
3+
// composer require enqueue/amqp-bunny
4+
require_once __DIR__.'/vendor/autoload.php';
5+
6+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
7+
8+
$config = [
9+
'host' => 'localhost',
10+
'port' => 5672,
11+
'user' => 'guest',
12+
'pass' => 'guest',
13+
'receive_method' => 'basic_consume',
14+
];
15+
16+
function fib($n)
17+
{
18+
if ($n == 0) {
19+
return 0;
20+
}
21+
22+
if ($n == 1) {
23+
return 1;
24+
}
25+
26+
return fib($n - 1) + fib($n - 2);
27+
}
28+
29+
$connection = new AmqpConnectionFactory($config);
30+
$context = $connection->createContext();
31+
$context->setQos(0, 1, false);
32+
33+
$rpc_queue = $context->createQueue('rpc_queue');
34+
$context->declareQueue($rpc_queue);
35+
36+
$consumer = $context->createConsumer($rpc_queue);
37+
38+
echo " [x] Awaiting RPC requests\n";
39+
40+
while (true) {
41+
if ($req = $consumer->receive()) {
42+
$n = (int) ($req->getBody());
43+
echo ' [.] fib(', $n, ")\n";
44+
45+
$msg = $context->createMessage((string) fib($n));
46+
$msg->setCorrelationId($req->getCorrelationId());
47+
48+
$reply_queue = $context->createQueue($req->getReplyTo());
49+
$context->createProducer()->send($reply_queue, $msg);
50+
51+
$consumer->acknowledge($req);
52+
}
53+
}
54+
55+
$context->close();

‎php-interop/send.php

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
// composer require enqueue/amqp-bunny
4+
require_once __DIR__.'/vendor/autoload.php';
5+
6+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
7+
8+
$config = [
9+
'host' => 'localhost',
10+
'port' => 5672,
11+
'user' => 'guest',
12+
'pass' => 'guest',
13+
];
14+
15+
$connection = new AmqpConnectionFactory($config);
16+
$context = $connection->createContext();
17+
18+
$queue = $context->createQueue('hello');
19+
$context->declareQueue($queue);
20+
21+
$message = $context->createMessage('Hello World!');
22+
23+
$context->createProducer()->send($queue, $message);
24+
25+
echo " [x] Sent 'Hello World!'\n";
26+
27+
$context->close();

‎php-interop/worker.php

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
// composer require enqueue/amqp-bunny
4+
require_once __DIR__.'/vendor/autoload.php';
5+
6+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
7+
use Interop\Amqp\AmqpQueue;
8+
9+
$config = [
10+
'host' => 'localhost',
11+
'port' => 5672,
12+
'user' => 'guest',
13+
'pass' => 'guest',
14+
'receive_method' => 'basic_consume',
15+
];
16+
17+
$connection = new AmqpConnectionFactory($config);
18+
$context = $connection->createContext();
19+
$context->setQos(0, 1, false);
20+
21+
$queue = $context->createQueue('task_queue');
22+
$queue->addFlag(AmqpQueue::FLAG_DURABLE);
23+
24+
$context->declareQueue($queue);
25+
26+
$consumer = $context->createConsumer($queue);
27+
28+
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
29+
30+
while (true) {
31+
if ($message = $consumer->receive()) {
32+
echo ' [x] Received ', $message->getBody(), "\n";
33+
sleep(substr_count($message->getBody(), '.'));
34+
echo ' [x] Done', "\n";
35+
$consumer->acknowledge($message);
36+
}
37+
}
38+
39+
$context->close();

0 commit comments

Comments
 (0)
Please sign in to comment.