rabbitmq что это такое
RabbitMQ. Часть 1. Introduction. Erlang, AMQP
Добрый день, Хабр! Хочу поделиться учебником-справочником знаний, которые мне удалось собрать по RabbitMQ и сжать в короткие рекомендации и выводы.
Оглавление
Кратко про AMQP
AMQP (Advanced Message Queuing Protocol) — открытый протокол для передачи сообщений между компонентами системы. Основная идея состоит в том, что отдельные подсистемы (или независимые приложения) могут обмениваться произвольным образом сообщениями через AMQP-брокер, который осуществляет маршрутизацию, возможно гарантирует доставку, распределение потоков данных, подписку на нужные типы сообщений.
Протокол AMQP вводит три понятия:
Протокол работает поверх TCP/IP.
Кратко про Erlang
Исходный код проекта находится в репозитории на GitHub. Архитектура RabbitMQ-server основана на Erlang и BEAM.
Кратко про RabbitMQ
Основная идея модели обмена сообщениями в RabbitMQ заключается в том, что producer (издатель) не отправляет сообщения непосредственно в очередь. На самом деле и довольно часто издатель даже не знает, будет ли сообщение вообще доставлено в какую-либо очередь.
Вместо этого издатель может отправлять сообщения только на обмен. С одной стороны, обмен получает сообщения от издателей, а с другой — отправляет их в очереди. Обмен должен точно знать, что делать с полученным сообщением. Должно ли оно быть добавлено в определенную очередь? Должно ли оно быть добавлено в несколько очередей? Или сообщение нужно игнорировать.
Кратко работу RabbitMQ можно описать следующим образом:
Подключение и каналы
Для такого обмена информацией между клиентом и сервером используются каналы. Каналы создаются в рамках определенного подключения. Каждый канал изолирован от других каналов. В синхронном случае не возможно выполнять следующую команду, пока не получен ответ.
Для того чтобы иметь возможность отправлять команды параллельно приходится открывать несколько каналов. Каждый канал создает отдельный Erlang процесс. Одно подключение может иметь множество каналов (multiplexing). Для каждого канала существуют некие структуры и объекты в памяти. Поэтому чем больше каналов имеется в рамках соединения, тем больше памяти использует RabbitMQ для управления таким соединением.
Простой пример создания подключения и канала при помощи RabbitMQ.Client:
Открывать новое соединение для каждой операции, настоятельно не рекомендуется, поскольку это приведет к большим затратам. Каналы также должны быть постоянными, но многие ошибки протокола приводят к закрытию канала, поэтому срок службы канала может быть короче, чем у соединения.
Где используется RabbitMQ?
В контексте микросервисов протокол AMQP и его реализацию в RabbitMQ часто используют для асинхронного взаимодействия между сервисами.
В контексте IIOT протокол AMQP и его реализацию в RabbitMQ используют для обмена данными между серверами (сервер-сервер). Также используют плагин MQTT Plugin RabbitMQ являющегося реализацией протокола MQTT для передачи данных между датчиком и сервером в низкоскоростных средах с высокой задержкой (полный перечень поддерживаемых протоколов перечислен на сайте проекта).
В следующей статье начнем разбираться подробнее с Exchanges.
Что такое RabbitMQ, зачем он нужен и как его использовать
Около полугода назад на одном проекте мы с напарником столкнулись с проблемой масштабирования, которая в тот момент внезапно ударила по серверу и весело его уронила. Количество задач, которые ставили пользователи, превысило барьеры вычислительных мощностей. Факторов, которые к этому привели, было несколько:
Проблема была решена — был арендован второй сервер, куда были перенесены соответствующие скрипты и с помощью небольшого API делегирована часть задач. Но в момент, когда мы искали решение и думали, что же лучше: по запросу передавать JSON-массив или что-то другое, нашли много интересной информации. В частности google выдавал что-то про «брокеры сообщений», «очереди», какого-то кролика… Причём тут кролик я тогда не понял, чтиво про брокеры — бросил, подумав, что это слишком сложно. Но через некоторое время появилась задача делегировать уже другие данные на ещё один сервер, и тут, на одном из форумов, мне уже явно посоветовали покурить в сторону злополучного кролика — RabbitMQ. К слову, задачу я решил, RabbitMQ оказался не таким и сложным (вернее — его конфигурация), а решение моей конкретной задачи заняло весьма немного кода. Так что же такое RabbitMQ?
RabbitMQ — это платформа позволяющая обмениваться сообщениями. Что может быть в сообщении — решать только тебе. Обмениваться можно как на одном сервере, так и с одного на другой. Это отличный способ масштабирования, так как с хорошо настроенным RabbitMQ мы можем просто подключать новые сервера, настроив на них нужное ПО и прописав конфигурацию, а RabbitMQ будет сам делегировать работу между всеми серверами. На официальном сайте есть мануалы по основным способам использования кролика — ознакомься. Я не буду описывать основные сущности, которые есть в RabbitMQ и на основе который строится протокол AMQP — это можно легко найти в сети.
Я сегодня рассмотрю, как интегрировать RabbitMQ в Symfony 4.1 на примере двух серверов — один будет отправлять сообщения, а другой получать. Какая актуальность данной статьи, если подобные маны уже есть? Ну по-первых, она на русском. Во-вторых, есть несколько ошибок, которые крайне сложно загуглить и решаются они методом научного тыка — вот их-то я и опишу.
У нас есть 2 сервера (хотя все примеры можно проделать и на одном, честно говоря) — один собирает задачи, другой их выполняет. Задачи могут быть любые — на твой вкус и цвет: от отправки email-сообщений, до обработки изображений. Можешь просто выводить данные на экран. Итак, на одном сервере у тебя идёт постановка задач, а на другом — обработка этих задач.
Сначала я показываю все действия на сервере, который собирает задачи.
Для начала — нужно установить бандл, который является обёрткой над библиотекой, реализующей протокол AMQP:
Ошибок быть не должно, а если и будут — внимательно читай вывод и ставь библиотеки, которые будут указаны после слов, похожих на слово require. Или пиши в комментариях — я с радостью помогу.
Дальше нужно установить rabbitmq-server. Да, у кролика есть свой полноценный сервер, который разворачивается на одном из серверов, если мы рассматриваем обмен между двумя серверами. Тут нужно подумать, что у тебя будет сервером, а что клиентом. Так как обрабатывает задачи (а значит и выбрасывает их в очередь) сервер, с которым мы сейчас работаем — будет логично установить rabbitmq-server на него Если я тут не прав — пиши в комментариях. Ставим:
И тут у тебя может появится первая проблема, об которую ты не слабо разобьёшь лоб:
rabbitmq/bin/rabbitmq-server-wait (code=exited, status=70)
Спустя почти час гугления я пришёл к выводу, что rabbitmq-server не запускается, собака такая, если в файле /etc/hostname у тебя прописано имя с точками. А мне поставили сервер, где в этом имени были точки… Точки эта штука распознавать не умеет.
Теперь добавляем пользователя и выдаём ему права, с помощью rabbitmqcli:
Первая строка — добавляем пользователя и устанавливаем ему пароль. Главное, измени username и password на свои данные. Далее ставим пользователю категорию «администратор». Затем, выдаём доступ к просмотру всего и вся. Я записал эти параметры в файл config/services.yaml:
Теперь нужно добавить сущность, с которой будет общаться rabbitmq-server — виртуальный хост, через который будет настраиваться конкретный обмен между конкретным постановщиком задач — producer (пишет в очередь) и потребителем задач — consumer (читает из очереди). Я не знаю почему, но в мануалах по интеграции Symfony и RabbitMQ этот пункт пропускают, а искать его самому и думать в чём ошибка — это дополнительное время. Добавляем:
username — это тот самый пользователь, созданный выше. my_project — имя хоста. Добавляем в параметры:
Идём дальше. А дальше нам нужно настроить конфиги. У тебя должен был появиться файл config/packages/old_sound_rabbit_mq.yaml — открывай его и добавляй:
Давай разберёмся. Я надеюсь, что ты ознакомился с примерами на официальном сайте и знаешь, что producer — публикует сообщения, exchange — определяет, в какую (если их несколько) очередь сообщение выбросить. port у rabbitmq по-умолчанию 5672 — таким и оставляем. Теперь нам нужно создать exchange, делаем это с помощью команды:
Всё успешно? Отлично, мы настроили rabbitmq на сервере, который будет писать в очередь. У кролика есть крутой интерфейс — зайди на ip сервера через порт 15672, увидишь нечто следующее:
Входи под данными, которые были созданы чуть выше и увидишь графики, метрики и прочую интересную штуку.
Теперь переходим на сервер, который будет читать сообщения. Тут всё гораздо проще. Сначала ставь бандл, затем открывай файл конфига и пиши:
Ты уже понял, что параметры нужно определить в config/services.yaml в секции параметры. Имя пользователя и пароль указывай те, который были созданы выше. ip_main_server — это ip сервера, с которым мы работали выше.
Далее настраивается consumer. Параметр exchange_options имеет значение, которые мы создавали выше.
Параметр queue_options — это имя очереди. Можешь задать любое, она будет создана автоматически.
callback — это имя сервиса, который будет вызывать класс, обрабатывающий каждое новое сообщение из очереди. Давай его определим. Иди в файл config/services.yaml и добавляй:
Как ты догадался, нужно создать этот класс. Создаётся он по определённому шаблону — его нужно унаследовать от класса Command, который нам любезно предоставляет RabbitMQ Bundle.
Просто выводим тело письма, которое пришло. Я тестировал на массивах, поэтому — print_r. Теперь запускаем consumer в режим ожидания сообщений:
И теперь с сервера, где мы устанавливали producer, отправим первое сообщение. Создаём простой контроллер:
Переходи на /check_data_test и наслаждайся — ты отправил и получил первое сообщение
Читайте также
Я в своей жизни ещё ни разу не встречал проекта, где бы всё было сделано по правилам проектирования архитектуры, с…
Периодически при работе с консолью Linux нужно пронаблюдать за изменением вывода какой-либо команды с промежутком в несколько секунд. Для этого…
Как регламентировать перекуры в течение рабочего дня? Можно ли разрешать опаздывать к началу рабочего дня? Можно ли чатится во время…
1 Comment
andre · 2021-06-21 at 16:00
Ajaxblog
RabbitMQ для начинающих
Иногда в веб-приложениях появляется необходимость выполнить сложные ресурсоемкие задачи, которые не могут быть умещены в коротком временном интервале HTTP запроса. В этом случае на помощь приходят очереди. Основная идея очередей – избежать выполнения ресурсоемких задач непосредственно после отправки запроса. Вместо этого задача ставится в очередь для последующего выполнения в асинхронном режиме. Т.е. при получении запроса от клиента мы инкапсулируем задачу как сообщение и отправляем его в очередь, а уже обработчик очереди достает сообщения в порядке их следования и обрабатывает надлежащим образом. Забегая вперед, скажу, что возможен режим работы очередей, когда при наличии нескольких копий обработчика, следующая задач будет поступать на свободный обработчик. Таким образом достигается распараллеливание выполнения задач.
В данном разделе рассматривается работа с очередями, использующими сервер сообщений RabbitMQ. Сервер RabbitMQ по сути является менеджером очередей, который имеет следующие преимущества:
В туториалах будут приведены примеры для всех вышеперечисленных вариантов. За основу взяты туториалы с официального сайта, дополнены и реализованы на PHP для RabbitMQ.
RabbitMQ испозует протокол AMQP. Чтобы использовть RabbitMQ необходимо поставить клиентскую и серверную части.
Установка сервера
Для установки расширения AMQP для PHP необходимо сначала установить RabbitMQ Server
Добавим следующию строку в файл /etc/apt/sources.list
Установка клиента
Выбираем нужную библиотеку и устанавливаем http://www.rabbitmq.com/devtools.html. Наиболее популярны php-amqplib и PECL AMQP library
Базовые понятия
В RabbitMQ используются следующий обозначения. Продюсер – программа, которая посылает сообщения. Будем обозначать его так
Брокер(очередь) – собственно просто буфер в памяти без каких-либо ограничений на количество хранимых сообщений. В одну и ту же очередь могут отсылать сообщения несколько продюсеров, так же как несколько консьюмеров могут пытаться получить сообщения из одной и той же очереди. Очередь будет обозначена так(сверху указано имя очереди)
Консьюмер(получатель) – программа, которая принимает сообщения из очереди. Будем обозначать его так
Здесь важно отметить, что продюсер, консьюмер и брокер могут быть расположены на различных машинах, более того, в большинстве случаев это именно так.
Первый скрипт работы с очередью, своего рода “Hello world”, будет отсылать текстовое сообщение с клиента, принимать его на сервере и выводить на экран.
Т.е. схема работы следующая: Первое, что надо сделать, это установить соединение с сервером RabbitMQ. Соединение устанавливается командами
это дефолтные значения. Если достаточно дефолтного значения любого из этих параметров, то его можно опустить. И, напротив, если, к примеру, нужно подключиться к другой машине, в параметре host необходимо указать ее имя или ip адрес.
Используя коннект можно получить объект для канала
На основе полученного канала создаем обменник
и, собственно, саму очередь
Когда обменник и очередь готовы, их можно связать по ключу
После того как сообщение отослано, коннект можно разорвать.
Получатель также должен выполнить ту же последовательность – приконнектиться к серверу сообщений; – создать канал; – объявить обменник; – объявить очередь; – связать очередь с обменником по ключу Последние два действия, как упоминалось выше, не обязательны. Теперь можно начать прослушивать очередь
Здесь методу get в качетсве параметра передается константа ARMQ_AUTOACK, которая оповещает сервер сообщений о том, что данное сообщение получено. Это самый простой способ удалить сообщение из очереди. Однако в данном случае в случае неудачной обработки сообщения, вернуть повторно его в очередь нельзя.
Таким образом, получаем два скрипта
Распределенные очереди
Основная идея заключается в следующем. Допустим нужно обработать видео файл, чтобы получить на выходе три сконвертированных файла в различные форматы, информацию о метаданных и создать иконки для этого видеофайла. Т.е. получаем 5 задач, три из которых довольно тяжеловесные(конвертация), одна легкая(получение метаданных) и одна средняя(создание иконок). При этом все эти задачи являются независимыми друг от друга. Таким образом, можно выполнять их одновременно, т.е. распараллелить обработку очереди на уровне сообщений(пункт 2). Для этого при объявлении обменника необходимо установить ему тип AMQP_EX_TYPE_FANOUT. Тогда все сообщения, посылаемые в указанный обменник, независимо от имени очереди и ключа роутера, будут прослушиваться всеми запущенными копиями консьюмера. Т.е. каждое следующее сообщение будет отсылаться на следующий свободный консьюмер. В нашем случае их должно быть пять. Такой способ обработки получил название round-robin dispathing. Обратите внимание, что при отправке продюсером и при получении консьюмером используется одна и та же очередь.
Оповещение (acknowledgment)
Некоторые задачи могут выполняться довольно долго. И неизвестно, что может произойти с сервером в этот момент: сервер может перезагрузиться, либо задача может зависнуть или завершится фатальной ошибкой. В первом туториале оповещение было отключено путем передачи параметра AMQP_AUTOACK в метод get(). В этом случае сообщения удаляются из памяти сразу после выполнения метода get и в случае ошибки, случившейся во время обработки, не вернутся в очередь. Чтобы избежать этого, не будем передавать константу AMQP_AUTOACK в метод get. Вместо этого по завершению обработки вызовем метод ack(), который уведомит брокер о том, что сообщение успешно обработано и его можно удалить из памяти. В противном случае RabbitMQ понимает, что сообщение не обратботано и перенаправляет его другому свободному консьюмеру. Однако здесь стоит отметить один важный момент. Перенаправленные сообщения не будут обрабатываться до того пока консьюмер не отконнектится и приконнектится заново к брокеру. Если необходимо заново обработать сообщение в рамках того же коннекта к серверу сообщений, то необходимо вызвать метод nack() с флагом AMQP_REQUEUE, который поставит неудачно обработанную задачу обратно в очередь и уведомит брокер о том, что эта задача должна быть вновь обработана.
Распростаненная ошибка – при включенном оповещении не подтверждать корректно обработанные задачи(сообщения). В этом случае при каждом новом коннекте, все уже обработанные задачи будут поступать заново на обработку. Процесс будет выглядеть как беспорядочная повторная отпарка сообщений, что в конечном итоге приведет к переполнению памяти. Отследить такую ситуацию можно путем использования нативного инструмента сервера сообщений rabbitmqctl
Жизнеспособность сообщений (durability)
В предыдущем параграфе мы рассмотрели как не потерять сообщение в очереди путем повторной отправки его в очередь. Тем не менее сообщение может быть потеряно в случае если сервер сообщений был неожиданно остановлен. Чтобы этого избежать, очередь должна быть создана с флагом AMQP_DURABLE.
Если очередь ‘hello’ уже была объявлена, то данный код вызовет ошибку, поскольку один раз объявленную очередь нельзя объявить повторно с другими параметрами. Из этой ситуации есть два выхода, либо обнулить все очереди как сказано здесь, либо создать новую очередь с неиспользуемым именем. Посмотреть список очередей можно спопособом упомянутым в предыдущем параграфе. Установка флага AMQP_DURABLE не гарантирует стопроцентную сохранность сообщений в очереди. Несмотря на то, что таким спопосбом мы указываем RabbitMQ сохранять сообщения на диске, существует мертвая зона после получения соощения, когда оно уже в памяти, но еще не сохранено на диске. В этот момент, в случае не предвиденной ситуации, оно может быть утеряно из памяти. Для нашего простого примера таких гарантий достаточно, но если необходимо добиться высоких гарантий получения сообщения, то следует использовать транзакции.
Все вместе
Для примера распределения сообщений между очередями нам понадобится функция, имитирующая загруженность системы. Для этого мы используем обычный таймер
Полный код продюсера (send.php) будет выглядеть так
Обратите внимание, что очереди создаются с абсолютно идентичными параметрами. Как уже было сказано, повторное создание очереди с иными параметрами вызовет исключение. Также стоит отметить, что если вы уверены, что консьюмер будет запускаться первым, то создание очереди в продюсере не обязательно.
Теперь, если запустить несколько копий консьюмера, то можно будет видеть как между ними распределяются сообщения. Предположим, что мы отправили 8 сообщений в очередь, т.е. запустили скрипт send.php 8 раз. После этого запускаем в двух разных терминалах по консьюмеру
Вывод в первом терминале
Вывод во втором терминале
Как видно сообщения распределились по мере нагрузки консьюмера.
Рассылка публикаций
В предыдущем уроке мы распределяли сообщения между всеми консьюмерами. В данном уроке, наоборот, будем отсылать все сообщения из очереди на все консьюмеры. Такой шаблон известен как “публичная рассылка”(publish subscribe). Такое поведение может быть полезно, к примеру, при создании логирования с одновременным выводом сообщения в терминал. Т.е. один консьюмер получает сообщение и сохраняет его на диска, в то время как другой выводит это сообщение на экране.
В предыдущих разделах мы не заостряли внимание на обменнике(exchanger). На самом деле продюсер никогда не отправляет сообщения непосредственно в очередь. Он размещает их в обменнике. Собственно говоря, продюсер и не знает было ли сообщение доставлено в очередь или нет. Обменник представляет собой простую вещь – он получает сообщения от продюсера и отправляет(публикует) их в очередь. При этом обменник четко знает по какому алгоритму он работает:
В нашем примере будем использовть тип обменника fanout.
Для этой цели продюсер не создает именованную очередь. Консьюмер же, в свою очередь, создает анонимную очередь, в которую принимает сообщения продюсера. При таком подходе каждый консьюмер будет принимать все сообщения продюсера.
Анонимные очереди
В предыдущем уроке у нас была необходимость рассылки сообщений в очереди с одинаковыми именами для возможности распределения сообщений между продюсерами и консьюмерами. Для достижения же текущей цели нам нужны выполнить две вещи. Во-первых, нам нужны очереди с различными именами. Во-вторых, созданные очереди должны автоматически удаляться после окончания работы скрипта. Для создания рандомного имени, можно воспользоваться одной из функций генерации хеша, к примеру sha1 или md5. Или же оставить эту задачу серверу сообщений. Если при объявлении очереди не устанавливать ей имя, то RabbitMQ сам задаст рандомное имя очереди. Для возможности автоматического удаления очереди, при ее создании нужно задать флаги AMQP_IFUNUSED, AMQP_AUTODELETE.
Связывание (bindings)
Мы уже создали обменник с типо fanout и очередь. Теперь нужно сказать обменнику, что он должен публиковать сообщения имеено в эту очередь. Это отношение называется связыванием (binding)
Здесь второй параметр – ключ, по которому связывается обменник и очередь. В данном случае он может быть любой строкой, поскольку его значение игнорируется в случае, если обменник имеет тип fanout.
Все вместе
Поскольку в данном примере мы имеем дело с анонимными очередями, создаваться они должны на стороне консьюмера, и консьюмер, соответственно, должен быть запущен первым. Из продюсера создание очередей удаляется.
Селективная рассылка
В предыдущем уроке была рассмотрена возможность отсылки сообщений нескольким получателям.В данном уроке мы рассмотрим как отсылать сообщения в четко определенные очереди. Такая возможность может понадобиться, к примеру, если мы не хотим сохранять все сообщения на диске, а только критический. В то время как на экран будут выводиться все сообщения.
Связывание (bindings)
Связываение уже упоминалось в предыдущем уроке
Повторимся, оно нужно, чтобы сказать обменнику, что он должен публиковать сообщения имеено в эту очередь. В методе bind() имеется второй параметр – ключ(routingKey), по которому связывается обменник и очередь. В данном уроке он будет играть основную роль. Стоит также напомнить, что ключ напрямую зависит от типа обменника. Так для обменника с типом fanout, он просто игнорируется. К примеру, если нужно связать обменник и очередь по ключу ‘failure_messages’
Прямое связывание (точка-точка)
В предыдущем уроке система логирования выполняла широковещательную рассылку всем консьюмерам. Теперь мы хотим расширить это поведение путем добавления фильтра сообщений по их важности. К примеру, критические ошибки писать на диск, а предупреждения только выводить на экран с целью экономии дискового пространтсва. Ранее мы использовали обменник с типом fanout, который не позволяет это сделать. Сейчас мы используем другой тип обменника – direct, который отправляет сообщения только тем очередям, routingKey которых совпадает с routingKey сообщения. Это поведение проиллюстрировано на изображении
На изображении можно видеть обменник X с типом direct, который связан с очередью Q1 по ключу failure, и с очередью Q2 по ключам notice и warning. В данном случае все сообщения с ключем failure будут отсылаться только в очередь Q1, а все сообщения с ключами notice и warning будут отсылаться в очередь Q2. Сообщения, ключи которые не совпадают с выше указанными, будут игнорироваться всеми очередями.
Множественная связь
Вполне возможно несколько очередей связать с обменником по одному и тому же ключу. Т.е. для нашего примера мы вполне можем установить связь по ключу notice между обменником и очередью Q1 и между обменником и очередью Q2. В таком случае сообщения с ключем notice будут отсылаться на обе очереди, т.е. получаем поведение аналогичное обменнику с типом fanout.
Отправка сообщений
Для отправки сообщений способом точка-точка обменник должен быть создан с типом direct, который сооветствует константе AMQP_EX_TYPE_DIRECT.
После чего возможна публикация сообщений по ключу
Получение сообщений Получение сообщений ничем не отличается от предыдущего урока, за исключением того, что нам нужно связать обменник с очередью по каждому типу
Все вместе
Как и в предыдущем уроке, поскольку в данном примере мы имеем дело с анонимными очередями, создаваться они должны на стороне консьюмера, и консьюмер, соответственно, должен быть запущен первым. Из продюсера создание очередей удаляется.
Рассылка по шаблону
В предыдущем уроке мы улучшили нашу систему логирования путем использования обменников с типом direct, создав возможность получать сообщения выборочно. Следующим этапом будет создание системы, позволяющей логировать по множеству критериев. Допустим мы хотим разделить обработаку логирования основываясь не только на важности сообщений, но и по устройствам, вызвавшим эту ошибку. Это предоставило бы нам большую гибкость – к примеру, можно было бы выделить обработку логов для критических ошибок, инициированных кроном, и отдельно выделить обработку логов всех сообщений от ядра системы. Для имплементации такой возможности нам неоходимо нечто большее, чем прямая рассылка сообщений (рассылка по методу точка-точка). Связывание по шаблону
Для выполнения связи по шалбону обменник должен иметь тип topic, который определяется константой AMQP_EX_TYPE_TOPIC. Ключи routingKey составляются из слова, следующих через точку, например, “logs.devices.kernel.notice”, “logs.devices.cron”. Максимальная длина такого ключа может составлять 255 символов. Логика доставки сообщений по ключу схожа с логикой для обменников с типом direct – сообщения с определенным ключем будут доставлены в очереди с соответствующим ключем. Но есть одна большая разница. Ключи, используемые для связи по шаблону, могут содержать два специальных символа:
Например, имеем следующие связи
*.orange.*
*.*.rabbit
lazy.#
Первое слово описывает скорость, второе – цвет и третье – вид животного, т.е. [speed][color][species]. Мы создали три связи: очередь Q1 связали по ключу “.orange.” и очередь Q2 – по ключам “..rabbit” и “lazy.#”. Таким образом, можно сказать, что очередь Q1 рассматривает всех оранжевых животных, а очередь Q2 – всех зайцев и всех медленных животных.
Рассмотрим несколько примеров:
Обменник с типом topic может повторять поведение обменника с типом fanout, если с ним связать очередь по ключу “#”. Если в ключе не испльзовать специальных символов, то такой обменник будет соответствовать обменнику с типом direct.
Отправка сообщений
Для отправки сообщений по шаблону обменник должен быть создан с типом topic, который сооветствует константе AMQP_EX_TYPE_TOPIC.
После чего возможна публикация сообщений по ключу
Получение сообщений
Получение сообщений ничем не отличается от предыдущего урока
Все вместе
Как и в предыдущем уроке, поскольку в данном примере мы имеем дело с анонимными очередями, создаваться они должны на стороне консьюмера, и консьюмер, соответственно, должен быть запущен первым. Из продюсера создание очередей удаляется.
Реализация RPC шаблона
Во втором уроке была реализована очередь, которая распределяла нагрузку между всеми имеющимися консьюмерами. Но, что если нам нужно получить результат от обработчика очереди. Такой подход известен как вызов удаленных процедур или RPC(remote procedure call). В этом уроке будет реализована модель RPC с использованием очереди сообщений RabbitMQ. Конечно, такой подход предполагает, что обработка не должна занимать много времени. Для реализации примера наша функция обработчик будет изменять сообщение “message before” на “message after”.
В целом, реализация RPC посредством RabbitMQ довольно проста. Клиент отправляет сообщение, а сервере отвечает. Для обработки ответа сервера, необходимо создать callback очередь. Чтобы узнать какая callback очередь ожидает ответа, мы должны в запросе послать ее имя. Для этого на продюсере создается анонимная очередь и ее имя добавляется в параметры запроса
Обратите внимание, что callback очередь создается с флагом AMQP_EXCLUSIVE, что означает, что только один консьюмер может слушать эту очередь.
Correlation ID
В методе, представленном выше, мы предполагаем создавать callback очередь для каждого RPC запроса. Поскольку нельзя однозначно по имени очереди определить какому запросу принадлежит ответ, в запрос также добавляется параметр correlationId, который имеет уникальное значение для каждого запроса. Позже, когда мы получим ответ, мы сможем сравнить его correlationId со значением, переданным вместе с запросом. И в случае их несовпадения просто отбросить полученный ответ.
Итоговый план действий
Все вместе
Функция обработки сообщения на стороне сервера выглядит следующим образом
Функция обработки сообщения на стороне клиента