rabit mq для чего используют

RabbitMQ tutorial 1 — Hello World

rabit mq для чего используют. Смотреть фото rabit mq для чего используют. Смотреть картинку rabit mq для чего используют. Картинка про rabit mq для чего используют. Фото rabit mq для чего используют

RabbitMQ позволяет взаимодействовать различным программам при помощи протокола AMQP. RabbitMQ является отличным решением для построения SOA (сервис-ориентированной архитектуры) и распределением отложенных ресурсоемких задач.

Под катом перевод первого из шести уроков официального сайта. Примеры на python, но его знание вовсе не обязательно. Аналогичные примеру программы можно воспроизвести практически на любом популярном ЯП. [так выглядят комментарии переводчика, т.е. меня]

Вступление

RabbitMQ ‒ это брокер сообщений. Его основная цель ‒ принимать и отдавать сообщения. Его можно представлять себе, как почтовое отделение: когда Вы бросаете письмо в ящик, Вы можете быть уверены, что рано или поздно почтальон доставит его адресату [видимо, автор ни разу не имел дела с Почтой России]. В этой аналогии RabbitMQ является одновременно и почтовым ящиком, и почтовым отделением, и почтальоном.

Наибольшее отличие RabbitMQ от почтового отделения в том, что он не имеет дела с бумажными конвертами ‒ RabbitMQ принимает, хранит и отдает бинарные данные ‒ сообщения.

В RabbitMQ, а также обмене сообщениями в целом, используется следующая терминология:

rabit mq для чего используют. Смотреть фото rabit mq для чего используют. Смотреть картинку rabit mq для чего используют. Картинка про rabit mq для чего используют. Фото rabit mq для чего используют

Поставщик, подписчик и брокер не обязаны находиться на одной физической машине, обычно они находятся на разных.

Hello World!

Первый пример не будет особо сложным ‒ давайте просто отправим сообщение, примем его и выведем на экран. Для этого нам потребуется две программы: одна будет отправлять сообщения, другая ‒ принимать и выводить их на экран.
Общая схема такова:

rabit mq для чего используют. Смотреть фото rabit mq для чего используют. Смотреть картинку rabit mq для чего используют. Картинка про rabit mq для чего используют. Фото rabit mq для чего используют

Поставщик отправляет сообщения в очередь с именем «hello», а подписчик получает сообщения из этой очереди.

Библиотека RabbitMQ

RabbitMQ использует протокол AMQP. Для использования RabbitMQ необходима библиотека, поддерживающая этот протокол. Такие библиотеки можно найти практически для каждого языка программирования. Python ‒ не исключение, для него есть несколько библиотек:

Отправка сообщений

rabit mq для чего используют. Смотреть фото rabit mq для чего используют. Смотреть картинку rabit mq для чего используют. Картинка про rabit mq для чего используют. Фото rabit mq для чего используют

Наша первая программа send.py будет просто отправлять одно сообщение в очередь.

Мы подключились к брокеру сообщений, находящемуся на локальном хосте. Для подключения к брокеру, находящемуся на другой машине, достаточно заменить «localhost» на IP адрес этой машины.

Перед отправкой сообщения мы должны убедиться, что очередь, получающая сообщение, существует. Если отправить сообщение в несуществующую очередь, RabbitMQ его проигнорирует. Давайте создадим очередь, в которую будет отправлено сообщение, назовем ее «hello»:

Теперь все готово для отправки сообщения. Наше первое сообщение будет содержать строку и будет отправлено в очередь с именем «hello».

Вообще, в RabbitMQ сообщения не отправляются непосредственно в очередь, они должны пройти через exchange (точка обмена). Но сейчас мы не будем заострять на этом внимание, точки обмена будут рассмотрены в третьем уроке. Сейчас достаточно знать, что точку обмена по-умолчанию можно определить, указав пустую строку. Это специальная точка обмена ‒ она позволяет определять, в какую именно очередь отправлено сообщение. Имя очереди должно быть определено в параметре routing_key:

Перед выходом из программы необходимо убедиться, что буфер был очищен и сообщение дошло до RabbitMQ. В этом можно быть уверенным, если использовать безопасное закрытие соединения с брокером.

Получение сообщений

rabit mq для чего используют. Смотреть фото rabit mq для чего используют. Смотреть картинку rabit mq для чего используют. Картинка про rabit mq для чего используют. Фото rabit mq для чего используют

Наша вторая программа receive.py будет получать сообщения из очереди и выводить их на экран.

Также как и в первой программе сначала необходимо подключиться к RabbitMQ. Для этого следует использовать тот же код, как и ранее. Следующий шаг, как и прежде ‒ убедиться, что очередь существует. Команда queue_declare не будет создавать новую очередь, если она уже существует, поэтому сколько бы раз не была вызвана эта команда, все-равно будет создана только одна очередь.

Вы можете задаться вопросом, почему мы объявляем очередь снова, ведь она уже была объявлена в первой программе. Это нужно, чтобы быть уверенным в существовании очереди, так будет, если сначала будет запущена программа send.py. Но мы не знаем, какая программа будет запущена раньше. В таких случаях лучше объявить очередь в обеих программах.

Мониторинг очередей

Если Вы хотите посмотреть, какие очереди существуют в RabbitMQ на данный момент, Вы можете сделать это с помощью команды rabbitmqctl (потребуются права суперпользователя):

(для Windows ‒ без sudo)

[в нашей компании используют более удобный скрипт мониторинга:]

[скрипт выводит и обновляет каждые 2 секунды таблицу со списком очередей: имя очереди; количество сообщений в обработке; количество сообщений готовых к обработке; общее количество сообщений; устойчивость очереди к перезагрузке сервиса; является ли временной очередью; количество подписчиков]

Получение сообщений из очереди более сложный процесс, чем отправка. Получение осуществляется при помощи подписки с использованием callback функции. При получении каждого сообщения библиотека Pika вызывает эту callback функцию. В нашем примере она будет выводить на экран текст сообщения.

Далее, нам нужно обозначить, что callback функция будет получать сообщения из очереди с именем «hello»:

Здесь необходимо быть уверенным в том, что очередь, на которую мы хотим подписаться, была объявлена. Мы сделали это ранее с помощью команды queue_declare.

Параметр no_ack будет рассмотрен позже [во втором уроке].
И, наконец, запуск бесконечного процесса, который ожидает сообщения из очереди и вызывает callback функцию, когда это необходимо.

Ну а теперь все вместе

Полный код receive.py:

Теперь мы можем попробовать запустить наши программы в терминале. Сначала отправим сообщение при помощи программы send.py:

Выполнение этой программы будет завершаться после отправки каждого сообщения. Теперь сообщение нужно получить:

Отлично! Мы отправили наше первое сообщение через RabbitMQ. Как Вы могли заметить, выполнение программы receive.py не завершилось. Она будет ожидать следующих сообщений, а остановить ее можно, нажав Ctrl+C.
Попробуйте запустить send.py снова в новом окне терминала.

Мы изучили, как отправлять и получать сообщения через именованные очереди. В следующем уроке мы создадим простую очередь задач [ресурсоемких].

UPD: библиотеку, работающую с RabbitMQ, для своего любимого ЯП Вы можете найти на официальном сайте тут.

Источник

RabbitMQ. Часть 1. Introduction. Erlang, AMQP

Добрый день, Хабр! Хочу поделиться учебником-справочником знаний, которые мне удалось собрать по RabbitMQ и сжать в короткие рекомендации и выводы.

Оглавление

Кратко про AMQP

AMQP (Advanced Message Queuing Protocol) — открытый протокол для передачи сообщений между компонентами системы. Основная идея состоит в том, что отдельные подсистемы (или независимые приложения) могут обмениваться произвольным образом сообщениями через AMQP-брокер, который осуществляет маршрутизацию, возможно гарантирует доставку, распределение потоков данных, подписку на нужные типы сообщений.

Протокол AMQP вводит три понятия:

rabit mq для чего используют. Смотреть фото rabit mq для чего используют. Смотреть картинку rabit mq для чего используют. Картинка про rabit mq для чего используют. Фото rabit mq для чего используют

Протокол работает поверх TCP/IP.

Кратко про Erlang

Исходный код проекта находится в репозитории на GitHub. Архитектура RabbitMQ-server основана на Erlang и BEAM.

Кратко про RabbitMQ

Основная идея модели обмена сообщениями в RabbitMQ заключается в том, что producer (издатель) не отправляет сообщения непосредственно в очередь. На самом деле и довольно часто издатель даже не знает, будет ли сообщение вообще доставлено в какую-либо очередь.

Вместо этого издатель может отправлять сообщения только на обмен. С одной стороны, обмен получает сообщения от издателей, а с другой — отправляет их в очереди. Обмен должен точно знать, что делать с полученным сообщением. Должно ли оно быть добавлено в определенную очередь? Должно ли оно быть добавлено в несколько очередей? Или сообщение нужно игнорировать.

rabit mq для чего используют. Смотреть фото rabit mq для чего используют. Смотреть картинку rabit mq для чего используют. Картинка про rabit mq для чего используют. Фото rabit mq для чего используют

Кратко работу RabbitMQ можно описать следующим образом:

rabit mq для чего используют. Смотреть фото rabit mq для чего используют. Смотреть картинку rabit mq для чего используют. Картинка про rabit mq для чего используют. Фото rabit mq для чего используют

Подключение и каналы

Для такого обмена информацией между клиентом и сервером используются каналы. Каналы создаются в рамках определенного подключения. Каждый канал изолирован от других каналов. В синхронном случае не возможно выполнять следующую команду, пока не получен ответ.

Для того чтобы иметь возможность отправлять команды параллельно приходится открывать несколько каналов. Каждый канал создает отдельный Erlang процесс. Одно подключение может иметь множество каналов (multiplexing). Для каждого канала существуют некие структуры и объекты в памяти. Поэтому чем больше каналов имеется в рамках соединения, тем больше памяти использует RabbitMQ для управления таким соединением.

rabit mq для чего используют. Смотреть фото rabit mq для чего используют. Смотреть картинку rabit mq для чего используют. Картинка про rabit mq для чего используют. Фото rabit mq для чего используют

Простой пример создания подключения и канала при помощи RabbitMQ.Client:

Открывать новое соединение для каждой операции, настоятельно не рекомендуется, поскольку это приведет к большим затратам. Каналы также должны быть постоянными, но многие ошибки протокола приводят к закрытию канала, поэтому срок службы канала может быть короче, чем у соединения.

Где используется RabbitMQ?

В контексте микросервисов протокол AMQP и его реализацию в RabbitMQ часто используют для асинхронного взаимодействия между сервисами.

В контексте IIOT протокол AMQP и его реализацию в RabbitMQ используют для обмена данными между серверами (сервер-сервер). Также используют плагин MQTT Plugin RabbitMQ являющегося реализацией протокола MQTT для передачи данных между датчиком и сервером в низкоскоростных средах с высокой задержкой (полный перечень поддерживаемых протоколов перечислен на сайте проекта).

В следующей статье начнем разбираться подробнее с Exchanges.

Источник

RabbitMQ: запуск, описание, примеры

RabbitMQ – менеджер сообщений (message broker), написан на Erlang, ближайший аналог в AWS – SQS.

Предназначен для передачи данных (сообщений) между несколькими сервисами : один сервис добавляет в очередь сообщение, другой – получает это сообщение.

Ниже – пример установки, запуска и использования RabbitMQ.

Установка

Либо можно запустить из Docker-образа:

rabbitmq-plugins

Для просмотра активных плагинов – используем list :

$ sudo rabbitmq-plugins list

Активируем плагин rabbitmq_management :

$ sudo rabbitmq-plugins enable rabbitmq_management

Активные плагины хранятся в файле /etc/rabbitmq/enabled_plugins :

HTTP API

rabbitmq_management активирует поддержку API на порту 15672:

rabbitmqadmin

rabbitmqctl

Предназначена в основном для управления нодами в кластере – добавлением, удалением, перезагрузкой, управление логами.

Аналогично rabbitmqadmin – может управлять пользователеми, очередями, точками обмена и т.д:

Web-UI

После активации плагина – становится доступен веб-интефрейс для управления сервером на порт 15672:

rabit mq для чего используют. Смотреть фото rabit mq для чего используют. Смотреть картинку rabit mq для чего используют. Картинка про rabit mq для чего используют. Фото rabit mq для чего используют

Доступ пользователю guest по-умолчанию запрещён, добавляем нового пользователя:

Устанавливаем его администратором:

rabit mq для чего используют. Смотреть фото rabit mq для чего используют. Смотреть картинку rabit mq для чего используют. Картинка про rabit mq для чего используют. Фото rabit mq для чего используют

Суть работы

Для RabbitMQ есть три основных понятия:

Пример

Отправка сообщения

Создаём скрипт для producer – он будет отправлять сообщения:

Проверяем с помощью rabbitmqadmin :

Список очередей с помощью rabbitmqctl :

В очереди hello сейчас висит 1 сообщение – получим его.

Получение сообщения

Второй скрипт – consumer.py – получит сообщение из очереди, и выведет его на консоль:

Источник

Ajaxblog

RabbitMQ для начинающих

Иногда в веб-приложениях появляется необходимость выполнить сложные ресурсоемкие задачи, которые не могут быть умещены в коротком временном интервале HTTP запроса. В этом случае на помощь приходят очереди. Основная идея очередей – избежать выполнения ресурсоемких задач непосредственно после отправки запроса. Вместо этого задача ставится в очередь для последующего выполнения в асинхронном режиме. Т.е. при получении запроса от клиента мы инкапсулируем задачу как сообщение и отправляем его в очередь, а уже обработчик очереди достает сообщения в порядке их следования и обрабатывает надлежащим образом. Забегая вперед, скажу, что возможен режим работы очередей, когда при наличии нескольких копий обработчика, следующая задач будет поступать на свободный обработчик. Таким образом достигается распараллеливание выполнения задач.

В данном разделе рассматривается работа с очередями, использующими сервер сообщений RabbitMQ. Сервер RabbitMQ по сути является менеджером очередей, который имеет следующие преимущества:

В туториалах будут приведены примеры для всех вышеперечисленных вариантов. За основу взяты туториалы с официального сайта, дополнены и реализованы на PHP для RabbitMQ.

RabbitMQ испозует протокол AMQP. Чтобы использовть RabbitMQ необходимо поставить клиентскую и серверную части.

Установка сервера

Для установки расширения AMQP для PHP необходимо сначала установить RabbitMQ Server

Добавим следующию строку в файл /etc/apt/sources.list

Установка клиента

Выбираем нужную библиотеку и устанавливаем http://www.rabbitmq.com/devtools.html. Наиболее популярны php-amqplib и PECL AMQP library

Базовые понятия

В RabbitMQ используются следующий обозначения. Продюсер – программа, которая посылает сообщения. Будем обозначать его так

rabit mq для чего используют. Смотреть фото rabit mq для чего используют. Смотреть картинку rabit mq для чего используют. Картинка про rabit mq для чего используют. Фото rabit mq для чего используют

Брокер(очередь) – собственно просто буфер в памяти без каких-либо ограничений на количество хранимых сообщений. В одну и ту же очередь могут отсылать сообщения несколько продюсеров, так же как несколько консьюмеров могут пытаться получить сообщения из одной и той же очереди. Очередь будет обозначена так(сверху указано имя очереди)

rabit mq для чего используют. Смотреть фото rabit mq для чего используют. Смотреть картинку rabit mq для чего используют. Картинка про rabit mq для чего используют. Фото rabit mq для чего используют

Консьюмер(получатель) – программа, которая принимает сообщения из очереди. Будем обозначать его так

rabit mq для чего используют. Смотреть фото rabit mq для чего используют. Смотреть картинку rabit mq для чего используют. Картинка про rabit mq для чего используют. Фото rabit mq для чего используют

Здесь важно отметить, что продюсер, консьюмер и брокер могут быть расположены на различных машинах, более того, в большинстве случаев это именно так.

rabit mq для чего используют. Смотреть фото rabit mq для чего используют. Смотреть картинку rabit mq для чего используют. Картинка про rabit mq для чего используют. Фото rabit mq для чего используют

Первый скрипт работы с очередью, своего рода “Hello world”, будет отсылать текстовое сообщение с клиента, принимать его на сервере и выводить на экран.

Т.е. схема работы следующая: Первое, что надо сделать, это установить соединение с сервером RabbitMQ. Соединение устанавливается командами

это дефолтные значения. Если достаточно дефолтного значения любого из этих параметров, то его можно опустить. И, напротив, если, к примеру, нужно подключиться к другой машине, в параметре host необходимо указать ее имя или ip адрес.

Используя коннект можно получить объект для канала

На основе полученного канала создаем обменник

и, собственно, саму очередь

Когда обменник и очередь готовы, их можно связать по ключу

После того как сообщение отослано, коннект можно разорвать.

Получатель также должен выполнить ту же последовательность – приконнектиться к серверу сообщений; – создать канал; – объявить обменник; – объявить очередь; – связать очередь с обменником по ключу Последние два действия, как упоминалось выше, не обязательны. Теперь можно начать прослушивать очередь

Здесь методу get в качетсве параметра передается константа ARMQ_AUTOACK, которая оповещает сервер сообщений о том, что данное сообщение получено. Это самый простой способ удалить сообщение из очереди. Однако в данном случае в случае неудачной обработки сообщения, вернуть повторно его в очередь нельзя.

Таким образом, получаем два скрипта

Распределенные очереди

Основная идея заключается в следующем. Допустим нужно обработать видео файл, чтобы получить на выходе три сконвертированных файла в различные форматы, информацию о метаданных и создать иконки для этого видеофайла. Т.е. получаем 5 задач, три из которых довольно тяжеловесные(конвертация), одна легкая(получение метаданных) и одна средняя(создание иконок). При этом все эти задачи являются независимыми друг от друга. Таким образом, можно выполнять их одновременно, т.е. распараллелить обработку очереди на уровне сообщений(пункт 2). Для этого при объявлении обменника необходимо установить ему тип AMQP_EX_TYPE_FANOUT. Тогда все сообщения, посылаемые в указанный обменник, независимо от имени очереди и ключа роутера, будут прослушиваться всеми запущенными копиями консьюмера. Т.е. каждое следующее сообщение будет отсылаться на следующий свободный консьюмер. В нашем случае их должно быть пять. Такой способ обработки получил название round-robin dispathing. Обратите внимание, что при отправке продюсером и при получении консьюмером используется одна и та же очередь.

rabit mq для чего используют. Смотреть фото rabit mq для чего используют. Смотреть картинку rabit mq для чего используют. Картинка про rabit mq для чего используют. Фото rabit mq для чего используют

Оповещение (acknowledgment)

Некоторые задачи могут выполняться довольно долго. И неизвестно, что может произойти с сервером в этот момент: сервер может перезагрузиться, либо задача может зависнуть или завершится фатальной ошибкой. В первом туториале оповещение было отключено путем передачи параметра AMQP_AUTOACK в метод get(). В этом случае сообщения удаляются из памяти сразу после выполнения метода get и в случае ошибки, случившейся во время обработки, не вернутся в очередь. Чтобы избежать этого, не будем передавать константу AMQP_AUTOACK в метод get. Вместо этого по завершению обработки вызовем метод ack(), который уведомит брокер о том, что сообщение успешно обработано и его можно удалить из памяти. В противном случае RabbitMQ понимает, что сообщение не обратботано и перенаправляет его другому свободному консьюмеру. Однако здесь стоит отметить один важный момент. Перенаправленные сообщения не будут обрабатываться до того пока консьюмер не отконнектится и приконнектится заново к брокеру. Если необходимо заново обработать сообщение в рамках того же коннекта к серверу сообщений, то необходимо вызвать метод nack() с флагом AMQP_REQUEUE, который поставит неудачно обработанную задачу обратно в очередь и уведомит брокер о том, что эта задача должна быть вновь обработана.

Распростаненная ошибка – при включенном оповещении не подтверждать корректно обработанные задачи(сообщения). В этом случае при каждом новом коннекте, все уже обработанные задачи будут поступать заново на обработку. Процесс будет выглядеть как беспорядочная повторная отпарка сообщений, что в конечном итоге приведет к переполнению памяти. Отследить такую ситуацию можно путем использования нативного инструмента сервера сообщений rabbitmqctl

Жизнеспособность сообщений (durability)

В предыдущем параграфе мы рассмотрели как не потерять сообщение в очереди путем повторной отправки его в очередь. Тем не менее сообщение может быть потеряно в случае если сервер сообщений был неожиданно остановлен. Чтобы этого избежать, очередь должна быть создана с флагом AMQP_DURABLE.

Если очередь ‘hello’ уже была объявлена, то данный код вызовет ошибку, поскольку один раз объявленную очередь нельзя объявить повторно с другими параметрами. Из этой ситуации есть два выхода, либо обнулить все очереди как сказано здесь, либо создать новую очередь с неиспользуемым именем. Посмотреть список очередей можно спопособом упомянутым в предыдущем параграфе. Установка флага AMQP_DURABLE не гарантирует стопроцентную сохранность сообщений в очереди. Несмотря на то, что таким спопосбом мы указываем RabbitMQ сохранять сообщения на диске, существует мертвая зона после получения соощения, когда оно уже в памяти, но еще не сохранено на диске. В этот момент, в случае не предвиденной ситуации, оно может быть утеряно из памяти. Для нашего простого примера таких гарантий достаточно, но если необходимо добиться высоких гарантий получения сообщения, то следует использовать транзакции.

Все вместе

Для примера распределения сообщений между очередями нам понадобится функция, имитирующая загруженность системы. Для этого мы используем обычный таймер

Полный код продюсера (send.php) будет выглядеть так

Обратите внимание, что очереди создаются с абсолютно идентичными параметрами. Как уже было сказано, повторное создание очереди с иными параметрами вызовет исключение. Также стоит отметить, что если вы уверены, что консьюмер будет запускаться первым, то создание очереди в продюсере не обязательно.

Теперь, если запустить несколько копий консьюмера, то можно будет видеть как между ними распределяются сообщения. Предположим, что мы отправили 8 сообщений в очередь, т.е. запустили скрипт send.php 8 раз. После этого запускаем в двух разных терминалах по консьюмеру

Вывод в первом терминале

Вывод во втором терминале

Как видно сообщения распределились по мере нагрузки консьюмера.

Рассылка публикаций

В предыдущем уроке мы распределяли сообщения между всеми консьюмерами. В данном уроке, наоборот, будем отсылать все сообщения из очереди на все консьюмеры. Такой шаблон известен как “публичная рассылка”(publish subscribe). Такое поведение может быть полезно, к примеру, при создании логирования с одновременным выводом сообщения в терминал. Т.е. один консьюмер получает сообщение и сохраняет его на диска, в то время как другой выводит это сообщение на экране.

В предыдущих разделах мы не заостряли внимание на обменнике(exchanger). На самом деле продюсер никогда не отправляет сообщения непосредственно в очередь. Он размещает их в обменнике. Собственно говоря, продюсер и не знает было ли сообщение доставлено в очередь или нет. Обменник представляет собой простую вещь – он получает сообщения от продюсера и отправляет(публикует) их в очередь. При этом обменник четко знает по какому алгоритму он работает:

В нашем примере будем использовть тип обменника fanout.

rabit mq для чего используют. Смотреть фото rabit mq для чего используют. Смотреть картинку rabit mq для чего используют. Картинка про rabit mq для чего используют. Фото rabit mq для чего используют

Для этой цели продюсер не создает именованную очередь. Консьюмер же, в свою очередь, создает анонимную очередь, в которую принимает сообщения продюсера. При таком подходе каждый консьюмер будет принимать все сообщения продюсера.

Анонимные очереди

В предыдущем уроке у нас была необходимость рассылки сообщений в очереди с одинаковыми именами для возможности распределения сообщений между продюсерами и консьюмерами. Для достижения же текущей цели нам нужны выполнить две вещи. Во-первых, нам нужны очереди с различными именами. Во-вторых, созданные очереди должны автоматически удаляться после окончания работы скрипта. Для создания рандомного имени, можно воспользоваться одной из функций генерации хеша, к примеру sha1 или md5. Или же оставить эту задачу серверу сообщений. Если при объявлении очереди не устанавливать ей имя, то RabbitMQ сам задаст рандомное имя очереди. Для возможности автоматического удаления очереди, при ее создании нужно задать флаги AMQP_IFUNUSED, AMQP_AUTODELETE.

Связывание (bindings)

Мы уже создали обменник с типо fanout и очередь. Теперь нужно сказать обменнику, что он должен публиковать сообщения имеено в эту очередь. Это отношение называется связыванием (binding)

Здесь второй параметр – ключ, по которому связывается обменник и очередь. В данном случае он может быть любой строкой, поскольку его значение игнорируется в случае, если обменник имеет тип fanout.

Все вместе

Поскольку в данном примере мы имеем дело с анонимными очередями, создаваться они должны на стороне консьюмера, и консьюмер, соответственно, должен быть запущен первым. Из продюсера создание очередей удаляется.

Селективная рассылка

В предыдущем уроке была рассмотрена возможность отсылки сообщений нескольким получателям.В данном уроке мы рассмотрим как отсылать сообщения в четко определенные очереди. Такая возможность может понадобиться, к примеру, если мы не хотим сохранять все сообщения на диске, а только критический. В то время как на экран будут выводиться все сообщения.

Связывание (bindings)

Связываение уже упоминалось в предыдущем уроке

Повторимся, оно нужно, чтобы сказать обменнику, что он должен публиковать сообщения имеено в эту очередь. В методе bind() имеется второй параметр – ключ(routingKey), по которому связывается обменник и очередь. В данном уроке он будет играть основную роль. Стоит также напомнить, что ключ напрямую зависит от типа обменника. Так для обменника с типом fanout, он просто игнорируется. К примеру, если нужно связать обменник и очередь по ключу ‘failure_messages’

Прямое связывание (точка-точка)

В предыдущем уроке система логирования выполняла широковещательную рассылку всем консьюмерам. Теперь мы хотим расширить это поведение путем добавления фильтра сообщений по их важности. К примеру, критические ошибки писать на диск, а предупреждения только выводить на экран с целью экономии дискового пространтсва. Ранее мы использовали обменник с типом fanout, который не позволяет это сделать. Сейчас мы используем другой тип обменника – direct, который отправляет сообщения только тем очередям, routingKey которых совпадает с routingKey сообщения. Это поведение проиллюстрировано на изображении

rabit mq для чего используют. Смотреть фото rabit mq для чего используют. Смотреть картинку rabit mq для чего используют. Картинка про rabit mq для чего используют. Фото rabit mq для чего используют

На изображении можно видеть обменник X с типом direct, который связан с очередью Q1 по ключу failure, и с очередью Q2 по ключам notice и warning. В данном случае все сообщения с ключем failure будут отсылаться только в очередь Q1, а все сообщения с ключами notice и warning будут отсылаться в очередь Q2. Сообщения, ключи которые не совпадают с выше указанными, будут игнорироваться всеми очередями.

Множественная связь

Вполне возможно несколько очередей связать с обменником по одному и тому же ключу. Т.е. для нашего примера мы вполне можем установить связь по ключу notice между обменником и очередью Q1 и между обменником и очередью Q2. В таком случае сообщения с ключем notice будут отсылаться на обе очереди, т.е. получаем поведение аналогичное обменнику с типом fanout.

rabit mq для чего используют. Смотреть фото rabit mq для чего используют. Смотреть картинку rabit mq для чего используют. Картинка про rabit mq для чего используют. Фото rabit mq для чего используют

Отправка сообщений

Для отправки сообщений способом точка-точка обменник должен быть создан с типом direct, который сооветствует константе AMQP_EX_TYPE_DIRECT.

После чего возможна публикация сообщений по ключу

Получение сообщений Получение сообщений ничем не отличается от предыдущего урока, за исключением того, что нам нужно связать обменник с очередью по каждому типу

Все вместе

Как и в предыдущем уроке, поскольку в данном примере мы имеем дело с анонимными очередями, создаваться они должны на стороне консьюмера, и консьюмер, соответственно, должен быть запущен первым. Из продюсера создание очередей удаляется.

Рассылка по шаблону

В предыдущем уроке мы улучшили нашу систему логирования путем использования обменников с типом direct, создав возможность получать сообщения выборочно. Следующим этапом будет создание системы, позволяющей логировать по множеству критериев. Допустим мы хотим разделить обработаку логирования основываясь не только на важности сообщений, но и по устройствам, вызвавшим эту ошибку. Это предоставило бы нам большую гибкость – к примеру, можно было бы выделить обработку логов для критических ошибок, инициированных кроном, и отдельно выделить обработку логов всех сообщений от ядра системы. Для имплементации такой возможности нам неоходимо нечто большее, чем прямая рассылка сообщений (рассылка по методу точка-точка). Связывание по шаблону

Для выполнения связи по шалбону обменник должен иметь тип topic, который определяется константой AMQP_EX_TYPE_TOPIC. Ключи routingKey составляются из слова, следующих через точку, например, “logs.devices.kernel.notice”, “logs.devices.cron”. Максимальная длина такого ключа может составлять 255 символов. Логика доставки сообщений по ключу схожа с логикой для обменников с типом direct – сообщения с определенным ключем будут доставлены в очереди с соответствующим ключем. Но есть одна большая разница. Ключи, используемые для связи по шаблону, могут содержать два специальных символа:

Например, имеем следующие связи
*.orange.*
*.*.rabbit
lazy.#

rabit mq для чего используют. Смотреть фото rabit mq для чего используют. Смотреть картинку rabit mq для чего используют. Картинка про rabit mq для чего используют. Фото rabit mq для чего используют

Первое слово описывает скорость, второе – цвет и третье – вид животного, т.е. [speed][color][species]. Мы создали три связи: очередь Q1 связали по ключу “.orange.” и очередь Q2 – по ключам “..rabbit” и “lazy.#”. Таким образом, можно сказать, что очередь Q1 рассматривает всех оранжевых животных, а очередь Q2 – всех зайцев и всех медленных животных.

Рассмотрим несколько примеров:

Обменник с типом topic может повторять поведение обменника с типом fanout, если с ним связать очередь по ключу “#”. Если в ключе не испльзовать специальных символов, то такой обменник будет соответствовать обменнику с типом direct.

Отправка сообщений

Для отправки сообщений по шаблону обменник должен быть создан с типом topic, который сооветствует константе AMQP_EX_TYPE_TOPIC.

После чего возможна публикация сообщений по ключу

Получение сообщений

Получение сообщений ничем не отличается от предыдущего урока

Все вместе

Как и в предыдущем уроке, поскольку в данном примере мы имеем дело с анонимными очередями, создаваться они должны на стороне консьюмера, и консьюмер, соответственно, должен быть запущен первым. Из продюсера создание очередей удаляется.

Реализация RPC шаблона

Во втором уроке была реализована очередь, которая распределяла нагрузку между всеми имеющимися консьюмерами. Но, что если нам нужно получить результат от обработчика очереди. Такой подход известен как вызов удаленных процедур или RPC(remote procedure call). В этом уроке будет реализована модель RPC с использованием очереди сообщений RabbitMQ. Конечно, такой подход предполагает, что обработка не должна занимать много времени. Для реализации примера наша функция обработчик будет изменять сообщение “message before” на “message after”.

В целом, реализация RPC посредством RabbitMQ довольно проста. Клиент отправляет сообщение, а сервере отвечает. Для обработки ответа сервера, необходимо создать callback очередь. Чтобы узнать какая callback очередь ожидает ответа, мы должны в запросе послать ее имя. Для этого на продюсере создается анонимная очередь и ее имя добавляется в параметры запроса

Обратите внимание, что callback очередь создается с флагом AMQP_EXCLUSIVE, что означает, что только один консьюмер может слушать эту очередь.

Correlation ID

В методе, представленном выше, мы предполагаем создавать callback очередь для каждого RPC запроса. Поскольку нельзя однозначно по имени очереди определить какому запросу принадлежит ответ, в запрос также добавляется параметр correlationId, который имеет уникальное значение для каждого запроса. Позже, когда мы получим ответ, мы сможем сравнить его correlationId со значением, переданным вместе с запросом. И в случае их несовпадения просто отбросить полученный ответ.

Итоговый план действий

Все вместе

Функция обработки сообщения на стороне сервера выглядит следующим образом

Функция обработки сообщения на стороне клиента

Источник

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *