9. Cообщения
● Позволяют асинхронно взаимодействоавть
между элементами архитектуры
● Сохраняют слабую связанность между
элементами архитектуры
● Позволяют взаимодействоавть с разными
частями, PHP – Phython – Java - Go ...
22. AMQP : Bind (связь)
● Между Exchange и Queue определяем связь (или
маршрут): Bind
● Имеет ключ RoutingKey, в соответствии с которым
определяется маршрут сообщения
QueueExchange
Bind
36. AMQP : ACK
QueueExchange
Bind
На каждое принятое сообщение должна быть
отослана “квитанция” ACK (Acknowledgements).
Если Очередь имеет св-во AUTOACK
то подтверждение специально отправлять не нужно
ACK
42. Объявление очереди и обмена
// создание очереди, обмена и их
привязки друг к другу.
$rabbit = new AMQPConnection();
$res = $rabbit->connect();
$channel = new AMQPChannel($rabbit);
$exchange = new AMQPExchange($channel);
$exchange->setFlags(AMQP_DURABLE);
$exchange->setName('to_worker');
$exchange->setType('direct');
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setFlags(AMQP_DURABLE);
$queue->setName('worker');
43. Объявление очереди и обмена
// создание очереди, обмена и их
привязки друг к другу.
$rabbit = new AMQPConnection();
$res = $rabbit->connect();
$channel = new AMQPChannel($rabbit);
$exchange = new AMQPExchange($channel);
$exchange->setFlags(AMQP_DURABLE);
$exchange->setName('to_worker');
$exchange->setType('direct');
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setFlags(AMQP_DURABLE);
$queue->setName('worker');
AMQP_DURABLE
AMQP_AUTODELETE
AMQP_EXCLUSIVE
AMQP_INTERNAL
AMQP_IMMEDIATE
AMQP_AUTOACK
44. Объявление очереди и обмена
// создание очереди, обмена и их
привязки друг к другу.
$rabbit = new AMQPConnection();
$res = $rabbit->connect();
$channel = new AMQPChannel($rabbit);
$exchange = new AMQPExchange($channel);
$exchange->setFlags(AMQP_DURABLE);
$exchange->setName('to_worker');
$exchange->setType('direct');
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setFlags(AMQP_DURABLE);
$queue->setName('worker');
AMQP_DURABLE
AMQP_AUTODELETE
AMQP_EXCLUSIVE
AMQP_INTERNAL
AMQP_IMMEDIATE
AMQP_AUTOACK
Это можно сделать
один раз
45. Привзка очереди к обмену
$rabbit = new AMQPConnection();
$res = $rabbit->connect();
$channel = new AMQPChannel($rabbit);
$queue = new AMQPQueue($channel);
$queue->setName('worker');
// $queue->declare();
$queue->bind('to_worker', 'logs');
$rabbit->disconnect();
46. Привзка очереди к обмену
$rabbit = new AMQPConnection();
$res = $rabbit->connect();
$channel = new AMQPChannel($rabbit);
$queue = new AMQPQueue($channel);
$queue->setName('worker');
// $queue->declare();
$queue->bind('to_worker', 'logs');
$rabbit->disconnect();
Routing key
48. Привзка очереди к обмену
$rabbit = new AMQPConnection();
$res = $rabbit->connect();
$channel = new AMQPChannel($rabbit);
$queue = new AMQPQueue($channel);
$queue->setName('worker');
// $queue->declare();
$queue->bind('to_worker', 'logs');
$rabbit->disconnect();
Это можно вообще
не делать
а использовать
HTTP интерфейс
49. Публикация сообщения
$rabbit = new AMQPConnection();
$res = $rabbit->connect();
$channel = new AMQPChannel($rabbit);
$exchange = new AMQPExchange($channel);
$exchange->setName('to_worker');
$i=0;
while ($i < 20) {
$i++;
$data = json_encode([1,2,3,
rand(0, 1000)]);
$exchange->publish($data,
'logs',
0,
['delivery_mode'=>2,
'content_type'=> 'text/json']
);
}
50. Публикация сообщения
$rabbit = new AMQPConnection();
$res = $rabbit->connect();
$channel = new AMQPChannel($rabbit);
$exchange = new AMQPExchange($channel);
$exchange->setName('to_worker');
$i=0;
while ($i < 20) {
$i++;
$data = json_encode([1,2,3,
rand(0, 1000)]);
$exchange->publish($data,
'logs',
0,
['delivery_mode'=>2,
'content_type'=> 'text/json']
);
}
Routing key Persisten
52. Асинхронное GET
$rabbit = new AMQPConnection();
$rabbit->connect();
$channel = new AMQPChannel($rabbit
$queue = new AMQPQueue($channel);
$queue->setName('worker');
$msg = $queue->get();
70. Несколько каналов
import pika
connection = pika.BlockingConnection()
ch = connection.channel()
ch.queue_bind(
exchange='content',
queue='parser',
arguments={'operation': 'parsing', 'x-
match':'any'})
71. Несколько каналов
import pika
connection = pika.BlockingConnection()
ch = connection.channel()
ch.queue_bind(
exchange='content',
queue='parser',
arguments={'operation': 'parser', 'x-
match':'any'})
в PHP
Нельзя задать
аргументы для Bind
77. Синхронное / Асинхронное
WEB AJAX лучше асинхронное
AMQP REST https://github.com/akalend/amqp-rest
NGX-AMQP https://github.com/WPMedia/nginx-amqp
78. Синхронное / Асинхронное
WEB AJAX лучше асинхронное
AMQP REST https://github.com/akalend/amqp-rest
NGX-AMQP https://github.com/WPMedia/nginx-amqpНадо делать через
Upstreem & subquery
79. Синхронное / Асинхронное
WEB AJAX лучше асинхронное
AMQP REST https://github.com/akalend/amqp-rest
NGX-AMQP https://github.com/WPMedia/nginx-amqp
NGX-RABBIT https://github.com/wingify/lua-resty-rabbitmqstomp