ݺߣ

ݺߣShare a Scribd company logo
Готовим кролика
10 рецептов
Календарев А.М.
Пару слов об pipeline архитектуре
https://habrahabr.ru/company/oleg-bunin/blog/310418/
Pipe-line архитектура
Pipe-line архитектура
ApplicationApplication Application
Пример: аналог Yandex.market
Пример: аналог Yandex.market
Загрузка
YML
Сведение
в pricelist
Парсинг
YML
Загрузка
img
Задачи за кадром
Пример: аналог Yandex.market
Загрузка
YML
Сведение
в pricelist
Парсинг
YML
Загрузка
img
Задачи за кадром
Паттерны сообщений
Cообщения
● Позволяют асинхронно взаимодействоавть
между элементами архитектуры
● Сохраняют слабую связанность между
элементами архитектуры
● Позволяют взаимодействоавть с разными
частями, PHP – Phython – Java - Go ...
Паттерны сообщений
Pipeline & Filter
Не совсем pipeline
Загрузка
YML
Парсинг
YML
Загрузка
картинок
Publish-Subscriber Channel
Routers
Advanced
Message
Queue
Protocol
Рецепты RabbitMQ
Exchange
● Принимает сообщения
● Имеет имя
● Имеет тип:
- fanout
- direct
- topic
- headers
AMQP : Exchange (обмен)
Exchange
AMQP : Exchange (обмен)
● Принимает сообщения
● Имеет имя
● Имеет тип
Имеет свойства:
- autodelete
- transit
- durable
Exchange
AMQP : Exchange (обмен)
$exchange = new AMQPExchange($channel);
$exchange->setName('MyExchange';)
$exchange->declare();
AMQP : Queue (очередь)
Queue
● Отдает сообщения адресату по
принципу FIFO
● Имеет имя
● Имеет свойства:
- autodelete
- durable
- exclusive
AMQP : Queue (очередь)
Queue
$queue = new AMQPQueue($channel);
$queue->setName('MyQueue');
$queue->declare();
AMQP : Bind (связь)
● Между Exchange и Queue определяем связь (или
маршрут): Bind
● Имеет ключ RoutingKey, в соответствии с которым
определяется маршрут сообщения
QueueExchange
Bind
AMQP : Bind (связь)
QueueExchange
Bind
$queue = new AMQPQueue($channel);
$queue->setName('MyQueue');
$queue->bind('MyExchange', $key);
AMQP : Bind (связь)
QueueExchange
Bind
$exchange = new AMQPExchange($channel);
$exchange->setName('MyExchange');
$exchange->bind('MyQueue', $key);
AMQP : Bind (связь)
QueueExchange
Bind
$exchange = new AMQPExchange($channel);
$exchange->setName('MyExchange');
$exchange->bind('MyQueue', $key);
Объявляется
один раз
AMQP : Internals
AMQP : Message
Exchange
Состоит:
● тела
● routing key
● заголовков
● имеет аттрибуты
AMQP : Message
● Expiration period
● Message publishing
timeshamp
Exchange
Аттрибуты:
● Content-type
● Content-encoding
● Delivery-mode
● Message priority
● Application id
AMQP : Message
Exchange
AMQP : Message
В зависимости от типа Exchange и routingKey
сообщения определяется маршрут сообщения
Queue 1
Queue 2
Exchange
AMQP : Message
Fanout – ключ не учитываем
Direct – полное совпадение
Topic – совпадение по маске
Queue 1
Queue 2
Рецепты RabbitMQ
Exchange
AMQP : Message
Fanout – самый быстрый
Direct – середина на половине
Topic – сомый медленный
Queue 1
Queue 2
Exchange
Отправить сообщение
$exchange = new AMQPExchange($channel);
$exchange->setName('MyExchange';)
$exchange->publish($message, $key);
AMQP : ACK
QueueExchange
Bind
На каждое принятое сообщение должна быть
отослана “квитанция” ACK (Acknowledgements).
AMQP : ACK
QueueExchange
Bind
На каждое принятое сообщение должна быть
отослана “квитанция” ACK (Acknowledgements).
Если Очередь имеет св-во AUTOACK
то подтверждение специально отправлять не нужно
ACK
Guaranteed Delivery
AMQP : ACK
workerWEB
Заказ
AMQP : ACK
workerWEB
Заказ
ACK
Удачное соединение
AMQP : ACK
workerWEB
Заказ
Удачное соединение
Не удачноне
соединение
Делаем повторный запрос
Рецепты RabbitMQ
Объявление очереди и обмена
// создание очереди, обмена и их
привязки друг к другу.
$rabbit = new AMQPConnection();
$res = $rabbit->connect();
$channel = new AMQPChannel($rabbit);
$exchange = new AMQPExchange($channel);
$exchange->setFlags(AMQP_DURABLE);
$exchange->setName('to_worker');
$exchange->setType('direct');
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setFlags(AMQP_DURABLE);
$queue->setName('worker');
Объявление очереди и обмена
// создание очереди, обмена и их
привязки друг к другу.
$rabbit = new AMQPConnection();
$res = $rabbit->connect();
$channel = new AMQPChannel($rabbit);
$exchange = new AMQPExchange($channel);
$exchange->setFlags(AMQP_DURABLE);
$exchange->setName('to_worker');
$exchange->setType('direct');
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setFlags(AMQP_DURABLE);
$queue->setName('worker');
AMQP_DURABLE
AMQP_AUTODELETE
AMQP_EXCLUSIVE
AMQP_INTERNAL
AMQP_IMMEDIATE
AMQP_AUTOACK
Объявление очереди и обмена
// создание очереди, обмена и их
привязки друг к другу.
$rabbit = new AMQPConnection();
$res = $rabbit->connect();
$channel = new AMQPChannel($rabbit);
$exchange = new AMQPExchange($channel);
$exchange->setFlags(AMQP_DURABLE);
$exchange->setName('to_worker');
$exchange->setType('direct');
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setFlags(AMQP_DURABLE);
$queue->setName('worker');
AMQP_DURABLE
AMQP_AUTODELETE
AMQP_EXCLUSIVE
AMQP_INTERNAL
AMQP_IMMEDIATE
AMQP_AUTOACK
Это можно сделать
один раз
Привзка очереди к обмену
$rabbit = new AMQPConnection();
$res = $rabbit->connect();
$channel = new AMQPChannel($rabbit);
$queue = new AMQPQueue($channel);
$queue->setName('worker');
// $queue->declare();
$queue->bind('to_worker', 'logs');
$rabbit->disconnect();
Привзка очереди к обмену
$rabbit = new AMQPConnection();
$res = $rabbit->connect();
$channel = new AMQPChannel($rabbit);
$queue = new AMQPQueue($channel);
$queue->setName('worker');
// $queue->declare();
$queue->bind('to_worker', 'logs');
$rabbit->disconnect();
Routing key
Привзка очереди к обмену
Это можно сделать
один раз
Привзка очереди к обмену
$rabbit = new AMQPConnection();
$res = $rabbit->connect();
$channel = new AMQPChannel($rabbit);
$queue = new AMQPQueue($channel);
$queue->setName('worker');
// $queue->declare();
$queue->bind('to_worker', 'logs');
$rabbit->disconnect();
Это можно вообще
не делать
а использовать
HTTP интерфейс
Публикация сообщения
$rabbit = new AMQPConnection();
$res = $rabbit->connect();
$channel = new AMQPChannel($rabbit);
$exchange = new AMQPExchange($channel);
$exchange->setName('to_worker');
$i=0;
while ($i < 20) {
$i++;
$data = json_encode([1,2,3,
rand(0, 1000)]);
$exchange->publish($data,
'logs',
0,
['delivery_mode'=>2,
'content_type'=> 'text/json']
);
}
Публикация сообщения
$rabbit = new AMQPConnection();
$res = $rabbit->connect();
$channel = new AMQPChannel($rabbit);
$exchange = new AMQPExchange($channel);
$exchange->setName('to_worker');
$i=0;
while ($i < 20) {
$i++;
$data = json_encode([1,2,3,
rand(0, 1000)]);
$exchange->publish($data,
'logs',
0,
['delivery_mode'=>2,
'content_type'=> 'text/json']
);
}
Routing key Persisten
Синхронное / Асинхронное
Асинхронное GET
$rabbit = new AMQPConnection();
$rabbit->connect();
$channel = new AMQPChannel($rabbit
$queue = new AMQPQueue($channel);
$queue->setName('worker');
$msg = $queue->get();
Асинхронное GET
Синхронное CONSUME
function processMessage($envelope, $queue) {
echo 'Message: [',
$envelope->getDeliveryTag(),
']',
$envelope->getBody(),PHP_EOL;
$queue->ack( $envelope->getDeliveryTag() );
}
$rabbit = new AMQPConnection();
$res = $rabbit->connect();
$channel = new AMQPChannel($rabbit);
$queue = new AMQPQueue($channel);
$queue->setName('worker');
$queue->setFlags(AMQP_EXCLUSIVE);
$queue->consume("processMessage");
Синхронное CONSUME
function processMessage($envelope, $queue) {
echo 'Message: [',
$envelope->getDeliveryTag(),
']',
$envelope->getBody(),PHP_EOL;
$queue->ack( $envelope->getDeliveryTag() );
}
$rabbit = new AMQPConnection();
$res = $rabbit->connect();
$channel = new AMQPChannel($rabbit);
$queue = new AMQPQueue($channel);
$queue->setName('worker');
$queue->setFlags(AMQP_EXCLUSIVE);
$queue->consume("processMessage");
function processMessage($envelope, $queue) {
echo 'Message: [',
$envelope->getDeliveryTag(),
']',
$envelope->getBody(),PHP_EOL;
$queue->ack( $envelope->getDeliveryTag() );
}
$rabbit = new AMQPConnection();
$res = $rabbit->connect();
$channel = new AMQPChannel($rabbit);
$queue = new AMQPQueue($channel);
$queue->setName('worker');
$queue->setFlags(AMQP_EXCLUSIVE);
$queue->consume("processMessage");
Синхронное CONSUME
Асинхронное GET
Publish/Consume
Publish/Consume
В RannitMQ
работает не так
как ожидается
Publish/Consume
Загрузка
YML
Парсинг
YML
Загрузка
картинок
Publish/Consume
Загрузка
YML
Парсинг
YML
Загрузка
картинок
50%
50%
Publish/Consume
Загрузка
YML
Парсинг
YML
Загрузка
картинок
50%
50%
Потеря
50% контента
Publish/Consume
Загрузка
YML
Загрузка
картинок
Загрузка
картинок
50%
50%
Publish/Consume
Загрузка
YML
Загрузка
картинок
Загрузка
картинок
33.3%
33.3%
33.3%
Загрузка
картинок
Publish/Consume
Загрузка
YML
Парсинг
YML
Загрузка
картинок
Channel -1
Channel -2
Несколько каналов
Парсинг
YML
Загрузка
картинок
Channel -1
Channel -2
Загрузка
YML
content
parsing
uploadingExchange:
Content,
funout, Bind: parsing
durable Bind: uploading
Queue 1:
parsing,
durable
Queue 2:
uploading,
durable
Несколько каналов(вар 2)
Парсинг
YML
Загрузка
картинок
Channel -1
Channel -2
Загрузка
YML
content
parsing
uploadingExchange:
Content,
header,
durable
Queue 1:
parsing,
durable
Queue 2:
uploading,
durable
Bind: operation:x-parsing
Bind: operation:x-uploading
Несколько каналов(вар 2)
Парсинг
YML
Загрузка
картинок
Channel -1
Channel -2
Загрузка
YML
content
parsing
uploadingExchange:
Content,
header,
durable
Queue 1:
parsing,
durable
Queue 2:
uploading,
durable
Bind: operation:x-parsing
Bind: operation:x-uploading
Несколько каналов(вар 2)
Парсинг
YML
Загрузка
картинок
Channel -1
Channel -2
Загрузка
YML
content
parsing
uploadingExchange:
Content,
header,
durable
Queue 1:
parsing,
durable
Queue 2:
uploading,
durable
Bind: operation:x-parsing
Bind: operation:x-uploading
Несколько каналов
import pika
connection = pika.BlockingConnection()
ch = connection.channel()
ch.queue_bind(
exchange='content',
queue='parser',
arguments={'operation': 'parsing', 'x-
match':'any'})
Несколько каналов
import pika
connection = pika.BlockingConnection()
ch = connection.channel()
ch.queue_bind(
exchange='content',
queue='parser',
arguments={'operation': 'parser', 'x-
match':'any'})
в PHP
Нельзя задать
аргументы для Bind
Несколько каналов
$exchange = new AMQPExchange($channel);
$exchange->setName('to_worker');
$ex->setType(AMQP_EX_TYPE_HEADERS);
$data = json_encode([1,2,3]);
$headers = ['operation' => 'parsing'];
$args = [
'content_type'=> 'text/json',
'headers' => $headers
];
$exchange->publish($data,'work',0,$args);
Несколько каналов
$exchange = new AMQPExchange($channel);
$exchange->setName('to_worker');
$ex->setType(AMQP_EX_TYPE_HEADERS);
$data = json_encode([1,2,3]);
$headers = ['operation' => 'parsing'];
$args = [
'content_type'=> 'text/json',
'headers' => $headers
];
$exchange->publish($data,'work',0,$args);
Несколько каналов(вар 2)
Синхронное / Асинхронное
what is the question ?
WEB AJAX лучше асинхронное
WebSocket Синхронное
Синхронное / Асинхронное
WEB AJAX лучше асинхронное
AMQP REST https://github.com/akalend/amqp-rest
Синхронное / Асинхронное
WEB AJAX лучше асинхронное
AMQP REST https://github.com/akalend/amqp-rest
NGX-AMQP https://github.com/WPMedia/nginx-amqp
Синхронное / Асинхронное
WEB AJAX лучше асинхронное
AMQP REST https://github.com/akalend/amqp-rest
NGX-AMQP https://github.com/WPMedia/nginx-amqpНадо делать через
Upstreem & subquery
Синхронное / Асинхронное
WEB AJAX лучше асинхронное
AMQP REST https://github.com/akalend/amqp-rest
NGX-AMQP https://github.com/WPMedia/nginx-amqp
NGX-RABBIT https://github.com/wingify/lua-resty-rabbitmqstomp
Синхронное / Асинхронное
WebSocket Синхронное
NGX-RABBIT https://github.com/wingify/lua-resty-rabbitmqstomp
Node.js
Routers
Exchange
Route Message
Direct – совпадение по ключу
● Bind Queue_1 logs
● Bind Queue_2 worker
Queue 1
Queue 2
Exchange
Route Message
Headers – совпадение по заголовку
● Bind Queue_1 {'operation' : 'logs', 'x-match':'any'}
● Bind Queue_2 {'operation' : 'worker' , 'x-match':'any'}
Queue 1
Queue 2
Messages
Варианты использования сообщений
Варианты использования сообщений
Паттерн: данные
workerWEB
Заказ
Передача
Заказа
Варианты использования сообщений
Загрузка видео с видео хостинга
Load
Convert
WEB
Клиентский
скрипт
Получаем статус
Выполнено
Curl
Channels
Poin-to-Point Channel
Poin-to-Point Channel
Exchange : durable
Queue : durable, exclusive
Guaranteed Delivery
Guaranteed Delivery
Exchange : durable= 1
Message : delivery-type = 2
Queue : durable= 1, autodelete = 0, autoack = 1
Guaranteed Delivery
$delivery-tag = $message->delivery-tag;
$queue->ack($delivery-tag);
Dead letter channel
Запасной канал
$exchange = new AMQPExchange($channel);
$exchange->setFlags(AMQP_DURABLE);
$exchange->setName('to_worker');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setFlags(AMQP_AUTODELETE);
$queue->setName('worker');
$queue->declare();
$queue->bind('to_worker', 'work');
$queue->setFlags(AMQP_DURABLE);
$queue->setName('garbage');
$queue->declare();
$queue->bind('to_worker', 'work');
Запасной канал
Запасной канал
$queue->setName('worker');
$queue->delete();
$queue->setArgument('x-dead-letter-
exchange','garbage');
$queue->declare();
$queue->bind('to_worker', 'work');
Request Reply
Correlation Identifier
Correlation Identifier
$cr_id = $message->getArgument('correlation_id');
$exchange->publish( $message, '', ['correlation_id=>$cr_id]);
Datatype channel
debugging
Debugging
Java -cp rabbitmq-client.jar:commons-io-1.2.jar:commons-cli-1.1.jar
com.rabbitmq.tools.Tracer677
Rabbit MQ
2015 № 11
Кролик в песочнице
Rabbit MQ
2015 № 12
RabbitMQ.
Вырастаем из штанишек.
книги
книги
книги
книги
Cпасибо за внимание

More Related Content

Рецепты RabbitMQ