Skip to content

MQ

MQ - Message Queque

Що таке MQ

Черги повідомлень, по суті, є сполучною ланкою між різними процесами у додатках та надають надійний та масштабований інтерфейс взаємодії з іншими підключеними системами та пристроями. Черга — структура даних з принципом доступу до елементів "перший прийшов — перший вийшов". Додавання елемента можливе лише в кінець черги, вибірка — лише з початку черги, при цьому вибраний елемент з черги видаляється.

Десять причин, чому черги повідомлень є життєво важливим компонентом для будь-якої архітектури чи додатка

  • Слабке зв'язування — черги повідомлень створюють неявні інтерфейси обміну даними, які дозволяють процесам бути незалежними одне від одного, тобто ви просто визначаєте формат повідомлень, відправлених від одного процесу іншому.
  • Надмірність — черги дозволяють уникнути випадків неефективного використання ресурсів процесу (наприклад, пам'яті) внаслідок зберігання невикористаної (надмірної) інформації.
  • Масштабованість — черги повідомлень дозволяють розподілити процеси обробки інформації. Таким чином, вони дозволяють легко збільшувати швидкість, з якою повідомлення додаються в чергу та обробляються.
  • Еластичність та можливість витримувати пікові навантаження — черги повідомлень можуть виконувати роль свого роду буфера для накопичення даних в разі пікового навантаження, згладжуючи таким чином навантаження на систему обробки інформації та запобігаючи її відмові.
  • Стійкість до відмов — черги повідомлень дозволяють розділити процеси один від одного, так що якщо процес, який обробляє повідомлення з черги, падає, повідомлення можуть бути додані в чергу для обробки пізніше, коли система відновиться.
  • Гарантована доставка — використання черг повідомлень гарантує, що повідомлення буде доставлено та оброблено в будь-якому випадку (поки є принаймні один обробник).
  • Гарантований порядок доставки — більшість систем черг повідомлень здатні забезпечити гарантії того, що дані будуть оброблятися у певному порядку (найчастіше у порядку їх надходження).
  • Буферизація — черги повідомлень дозволяють відправляти та отримувати повідомлення, при цьому працюючи з максимальною ефективністю, надаючи буферний шар — процес запису в чергу може відбуватися настільки швидко, наскільки це може робити черга повідомлень, а не обробник повідомлення.
  • Розуміння потоків даних — черги повідомлень дозволяють виявляти вузькі місця в потоках даних додатка, легко можна визначити, яка з черг заповнюється, яка не використовується, і визначити, що необхідно робити — додавати нових обробників повідомлень або оптимізувати поточну архітектуру.
  • Асинхронний зв'язок — черги повідомлень надають можливість асинхронної обробки даних, що дозволяє помістити повідомлення в чергу без обробки, дозволяючи системі обробити повідомлення пізніше, коли з'явиться можливість.

Які готові реалізації MQ ви знаєте

  • RabbitMQ: Це відкрите програмне забезпечення, яке реалізує стандартизований протокол AMQP (Advanced Message Queuing Protocol). Він дозволяє надійно та масштабовано обмінюватися повідомленнями між різними системами та компонентами додатків.
  • Apache Kafka: Це розподілена платформа для потокової обробки та зберігання даних. Він спроектований для обробки великої кількості подій (повідомлень) та забезпечує гарантовану доставку та зберігання цих подій.
  • Redis: Redis може бути використаний як MQ за допомогою свого механізму публікації-підписки (pub-sub) та черги повідомлень (list). Він швидкий та легкий у використанні.
  • Apache ActiveMQ: Це ще одна реалізація протоколу AMQP. Він надає різноманітні можливості обробки повідомлень, включаючи публікацію-підписку, черги повідомлень та багато іншого.
  • ZeroMQ (0MQ): Це бібліотека для передачі повідомлень між процесами через різні протоколи. Вона дозволяє вам побудувати власні механізми обміну повідомленнями, а також підходить для складних топологій зв'язку.
  • Apache RocketMQ: Це система повідомлень, яка забезпечує надійну та масштабовану передачу повідомлень між різними компонентами додатків.

Links

Який життєвий цикл повідомлення в MQ ?

  • Створення повідомлення: Відправник (producer) генерує повідомлення та відправляє його в чергу через брокер обміну або API сервісу.
  • Доставлення в чергу: Повідомлення додається до черги. У RabbitMQ це може бути зв'язано з маршрутизатором (exchange), який спрямовує повідомлення в одну чи кілька черг відповідно до правил маршрутизації.
  • Зберігання повідомлення: Повідомлення зберігається в черзі до моменту, коли споживач (consumer) зможе його обробити. Для RabbitMQ можна налаштувати TTL (time-to-live) для повідомлень. В AWS SQS можна використовувати Dead Letter Queue для обробки невдалих повідомлень.
  • Отримання повідомлення: Споживач зчитує повідомлення з черги. У RabbitMQ це відбувається шляхом підтвердження доставки (acknowledgment), що забезпечує надійність. В AWS SQS споживач блокує повідомлення (visibility timeout) для запобігання його повторному зчитуванню іншими споживачами.
  • Обробка повідомлення: Споживач виконує необхідну логіку для обробки повідомлення. Наприклад, це може бути виклик функції або запис даних у базу.
  • Видалення повідомлення: Після успішної обробки споживач підтверджує RabbitMQ або видаляє повідомлення з черги в AWS SQS. В Azure Service Bus використовується подібний підхід із механізмом підтвердження.
  • Помилки обробки: Якщо повідомлення не може бути оброблене, воно може бути відправлене в Dead Letter Queue (у AWS SQS та Azure Service Bus) або повернене в RabbitMQ для повторної обробки залежно від налаштувань.

Особливості роботи з Redis

Redis працює в одному процесі та в одному потоці, тобто він може обробляти один запит за один раз. Якщо спробувати вручну дістати всі значення, які зберігаються, то заблокується робота з іншими з'єднаннями.

Які структури даних є в Redis?

Redis має кілька вбудованих структур даних, кожна для своїх сценаріїв:

  • String - найпростіше значення (текст, число, JSON, бінарні дані). Використовується для токенів, прапорців, лічильників.
  • List - впорядкована колекція, підтримує швидкі операції додавання/видалення з обох кінців. Підходить для черг, логів, стеків.
  • Set - невпорядкована множина унікальних елементів. Підтримує перевірку наявності, об'єднання, перетин, різницю. Корисно для зберігання унікальних ID, тегів, підписників.
  • Sorted Set - як Set, але кожен елемент має числовий пріоритет (score), за яким сортується. Часто використовується для рейтингів і топів.
  • Hash - структура виду ключ → поле → значення (як словник). Підходить для об'єктів з кількома атрибутами (профілі користувачів).
  • Bitmap - масив бітів, де можна встановлювати і перевіряти окремі біти за індексом. Підходить для прапорців активності або відміток.
  • Stream - структура для логування подій у реальному часі. Підтримує консьюмери, групи і контроль доставки. Використовується для побудови черг і стрімінгу даних.

За рахунок чого RabbitMQ гарантує доставку повідомлень?

Доставка гарантується зв'язкою збереження на диск + підтвердження + реплікація + обробка помилок.

1. Стійкість (durability & persistence)

  • Черга позначається як durable=True - щоб не зникла при перезапуску брокера.
  • Повідомлення позначається як persistent (delivery_mode=2) - щоб RabbitMQ записав його на диск.

2. Підтвердження від продюсера (Publisher Confirms)

Продюсер може запросити підтвердження від брокера, що повідомлення прийняте і збережене. Якщо брокер не підтвердив доставку - продюсер може відправити повторно.

3. Підтвердження від консьюмера (Consumer Ack)

RabbitMQ чекає, щоб консьюмер підтвердив отримання (basic_ack). Поки ack немає:

  • повідомлення вважається необробленим;
  • якщо консьюмер падає - повідомлення повертається до черги.

Слід використовувати auto_ack=False, щоб не втрачати повідомлення при збої консьюмера.

4. Реплікація (HA)

За допомогою Mirrored queues або Quorum queues повідомлення копіюється на кілька нод - не загубиться при падінні однієї.

5. Обробка помилок (Dead Letter Exchange)

Повідомлення, які не вдалося обробити, можуть відправитися в DLX - на повторну обробку або аналіз.

Гарантії доставки в RabbitMQ

1. At most once (максимум один раз)

Повідомлення може бути втрачене, але ніколи не доставиться двічі.

  • Продюсер не чекає підтверджень.
  • Черга може бути нестійка (non-durable), повідомлення - не persistent.
  • Консьюмер не підтверджує отримання (auto_ack=True).

Підходить для: телеметрії, логів, коли втрата допустима.

2. At least once (мінімум один раз)

Повідомлення буде доставлено, але можливі дублікати.

  • Продюсер: publisher confirms.
  • Повідомлення: persistent.
  • Черга: durable.
  • Консьюмер: підтверджує вручну (ack), інакше повідомлення повертається.

Підходить для більшості надійних систем, де дублікати можна обробляти (наприклад, через ідемпотентність).

3. Exactly once (точно один раз)

RabbitMQ сам по собі не гарантує - реалізується на рівні застосунку через:

  • ідемпотентну обробку;
  • зовнішні транзакції (наприклад, з базою даних);
  • дедуплікацію за message_id.

Підходить для фінансових операцій, обліку, коли ні втрата, ні дублікат недопустимі.

Dead-letter exchange (DLX) в RabbitMQ

Dead-letter exchange - механізм, який перенаправляє необроблені або відхилені повідомлення в окрему чергу. Повідомлення може потрапити в DLX, якщо воно:

  • відхилене (nack/reject з requeue=False);
  • прострочене (вийшов TTL);
  • не помістилось у чергу (переповнення).

Щоб увімкнути DLX, у звичайної черги налаштовується параметр x-dead-letter-exchange, і "погані" повідомлення автоматично потрапляють до окремої черги для повторної обробки або аналізу помилок.

Пріоритизація повідомлень в RabbitMQ

RabbitMQ підтримує priority queue - повідомлення з вищим пріоритетом обробляються раніше, навіть якщо опубліковані пізніше.

Кроки:

  1. Створюємо чергу з максимальним пріоритетом - параметр x-max-priority.
  2. Публікуємо повідомлення з вказаним полем priority.

Особливості:

  • Пріоритети - цілі числа, більше = важливіше.
  • При однакових пріоритетах - FIFO.
  • Працює тільки в межах однієї черги.
  • Без x-max-priority поле priority ігнорується.
  • Підтримується pika, kombu, Celery.

Важливо: не варто робити багато рівнів - 5–10 достатньо. Велика кількість рівнів погіршує продуктивність через внутрішню переоцінку порядку.

Як працюють пріоритети між кількома чергами?

RabbitMQ не підтримує глобальний пріоритет між чергами - пріоритети не "переходять" від однієї черги до іншої. Якщо приоритетна черга стала порожньою, консьюмер не починає автоматично читати з "менш пріоритетної".

Правильні підходи:

  • Одна черга з x-max-priority - найпростіший варіант, якщо реально потрібен пріоритет.
  • На стороні консьюмера: спочатку зчитувати з пріоритетної, потім - зі звичайної (через basic.get або вибір черги). Але це дає лише імітацію - справжнього "переходу" пріоритету між чергами немає.

Розклад між чергами залежить від раунду/доступності консьюмерів, а не від пріоритету.

Як захиститися від непрочитаних/непідтверджених повідомлень?

1. Підтвердження доставки (ACK/NACK)

  • RabbitMQ: manual_ack=True в basic_consume. Після обробки - basic_ack(delivery_tag). При помилці - basic_nack() або basic_reject() з requeue=True/False.
  • Без ack повідомлення вважається "у повітрі" - якщо консьюмер впаде, воно повернеться до черги.

2. Message TTL + Dead Letter Exchange

Якщо повідомлення не оброблено за X часу або N разів не вдалося - відправляти його в окрему чергу через DLX для повторної обробки чи аналізу.

3. Ідемпотентність обробки

Повторна доставка - це норма при збоях. Обробка має бути ідемпотентною:

  • зберігаємо message_id у Redis/Postgres;
  • при повторі - не обробляємо дублікат.

Конфлікт двох повідомлень з однаковим ключем

Кейс: в чергу прилетіли два повідомлення, які змінюють дату народження одного користувача. Як зрозуміти, яке з них застосовувати?

Рішення: ідемпотентність + версійність або мітка часу.

Варіант 1: event_time / updated_at

У кожного повідомлення є поле event_time - коли було сформовано зміну (не коли прилетіло!). Консьюмер порівнює event_time повідомлення з поточним user.birthdate_updated_at у БД. Якщо event_time > current - застосовує, інакше - ігнорує.

# message_1: {"user_id": 1, "new_birthdate": "1990-01-01", "event_time": "2024-01-01T12:00:00"}
# message_2: {"user_id": 1, "new_birthdate": "1991-01-01", "event_time": "2024-01-01T12:00:05"}
# message_2 wins regardless of delivery order

Варіант 2: версія (version, revision, seq_id)

Кожне повідомлення несе номер версії. Консьюмер читає поточну версію в БД, застосовує лише якщо version > current_version.

# {"user_id": 1, "birthdate": "1990-01-01", "version": 3}
# {"user_id": 1, "birthdate": "1991-01-01", "version": 4}

Якщо не використати ні того, ні іншого - можлива "гонка" між застарілими і актуальними подіями: при ретраях, паралельних воркерах, довгій доставці.

Як Kafka працює під капотом?

Kafka - розподілена лог-орієнтована система повідомлень, спроектована для високої продуктивності, стійкості й обробки великих потоків даних у реальному часі.

1. Основні сутності

  • Producer - відправляє повідомлення в Kafka.
  • Consumer - читає повідомлення з Kafka.
  • Broker - сервер Kafka, що зберігає й передає дані.
  • Topic - логічна категорія/канал повідомлень.
  • Partition - фізичний сегмент усередині топіка (забезпечує масштабування).
  • Consumer Group - група консьюмерів, що спільно читають партиції.

2. Зберігання повідомлень (лог-файли)

Kafka зберігає повідомлення у файлах на диску як стійкий впорядкований лог:

  • Кожна партиція - це append-only лог-файл.
  • Нові повідомлення дописуються в кінець, старі не змінюються.
  • Дані передаються з диску з використанням zero-copy.

3. Чому Kafka швидка

  • Append-only запис - найшвидша операція на диску.
  • Zero-copy I/O - дані передаються з page cache у сокет, обходячи user-space.
  • Партиції дозволяють розпаралелювати обробку.

4. Підписка і зсуви (offsets)

  • У кожного консьюмера є offset - позиція в партиції.
  • Консьюмер сам вирішує, де починати читати (можна "перемотати").
  • Kafka не видаляє повідомлення одразу після прочитання (на відміну від RabbitMQ) - вони зберігаються за часом (retention.ms) або розміром (retention.bytes).

5. Масштабування і відмовостійкість

  • Кластер складається з кількох брокерів.
  • Кожна партиція може мати репліки на інших брокерах.
  • Один брокер - leader партиції, решта - followers.
  • Kafka використовує Zookeeper (або KRaft у нових версіях) для виборів лідерів, метаданих і кворуму.

Що таке топік і партиція в Kafka?

Топік (Topic) - логічна категорія (канал) повідомлень. Продюсери пишуть у топіки, консьюмери - читають із них. Приклади: user_events, order_status_changed, logs_errors.

Топік - це лише ім'я. Реально повідомлення зберігаються не "в топіку", а в його партиціях.

Партиція (Partition) - фізичний лог зберігання повідомлень усередині топіка. Кожне повідомлення в партиції має:

  • унікальний offset (номер зміщення);
  • строго збережений порядок (у межах однієї партиції);
  • розподіляється по брокерах для масштабування.

Кожен топік складається з однієї або більше партицій.

  • Продюсер пише в одну з партицій (за ключем або випадково).
  • Консьюмер читає в межах однієї партиції - строго в порядку надходження.

Kafka гарантує порядок у межах партиції, але не між партиціями.

Гарантії доставки в Kafka

1. At most once

Повідомлення може бути втрачене, але ніколи не доставиться двічі.

  • Продюсер відправляє і не чекає ack.
  • Можлива втрата при збоях.

Підходить для логів, метрик.

2. At least once

Повідомлення буде доставлене хоча б один раз, але можливі дублікати.

  • Продюсер чекає ack від брокера.
  • Якщо не отримав - повторна відправка.
  • На стороні консьюмера потрібна ідемпотентність.

Підходить для більшості систем.

3. Exactly once

Кожне повідомлення доставляється й обробляється строго один раз. Реалізується через:

  • enable.idempotence=true у продюсера (унікальність запису в лог);
  • transactional.id у продюсера (транзакції);
  • isolation.level=read_committed у консьюмера (читання лише підтверджених даних);
  • коміт offset'ів у межах транзакції.

Підходить для фінансових систем, платежів, інвентарів.

Ідемпотентні консьюмери

Summary

Консьюмер повідомлень має бути ідемпотентним: ключ ідемпотентності записується у сховище з UNIQUE-обмеженням, унаслідок чого перший запис виграє конкуренцію та виконує обробку, а наступні спроби завершуються помилкою унікальності без додаткових дій. Якщо вимагається exactly-once-семантика з атомарністю обробки, застосовується окремий паттерн - Inbox Pattern (див. architecture/architecture_patterns.md).

Брокер забезпечує at-least-once-доставку: якщо консьюмер не встиг підтвердити отримання повідомлення викликом ack (через збій процесу, перевищення таймауту обробки або мережеву помилку), брокер повторно доставляє те саме повідомлення. Кому саме - залежить від брокера: у RabbitMQ повідомлення з черги може дістатися будь-якому вільному консьюмеру; у Kafka партиція в межах consumer group належить одному консьюмеру одночасно, і повторну доставку отримує він сам - або інший консьюмер уже після ребалансу (session timeout, додавання/видалення інстансу). Саме повторні доставки є основним джерелом дублікатів. У pub/sub-моделях (кілька consumer-груп у Kafka; у RabbitMQ - кілька черг, прив'язаних до fanout-/topic-exchange) кожна група отримує свою копію повідомлення, і в межах кожної групи ця копія так само може бути повторно доставлена з тих самих причин. "Ідемпотентний консьюмер" - офіційна назва підходу, який на практиці зводиться до дедуплікації за ключем.

Ключ та сховище

Ключ ідемпотентності беруть або з самого повідомлення (message_id, event_id, order_id), або консьюмер формує детермінований ключ із полів payload (hash(user_id + event_time)).

Сховище ключів має бути зовнішнім: воркери є ефемерними, їхня кількість сягає десятків чи тисяч у різних процесах і на різних машинах, тому локальна структура set() не має доступу до стану інших воркерів і не може бути єдиним джерелом правди.

Реалізація

Найпростіший варіант - перевірка наявності ключа перед обробкою:

def handle(msg):
    key = msg["event_id"]
    if db.exists("processed_events", key):
        return  # duplicate, skip
    process(msg)
    db.insert("processed_events", key)

Така схема працює лише з одним воркером. При горизонтальному масштабуванні два воркери, які одночасно отримують одне й те саме повідомлення, обидва бачать відсутність ключа у БД і обидва переходять до обробки - відбувається дублювання.

Коректний варіант базується на UNIQUE-обмеженні: воркер, який першим встиг вставити рядок, виконує обробку; усі інші отримують помилку UniqueViolation і завершуються без додаткових дій. Схема не потребує посередника або окремої точки координації; рівень паралелізму обмежений лише пропускною здатністю БД.

CREATE TABLE processed_events (
    event_id    UUID PRIMARY KEY,
    consumer_id TEXT,
    started_at  TIMESTAMPTZ DEFAULT now()
);
def handle(msg):
    try:
        db.execute(
            "INSERT INTO processed_events (event_id, consumer_id) VALUES (%s, %s)",
            (msg["event_id"], WORKER_ID),
        )
    except UniqueViolation:
        return  # someone else already took it
    process(msg)  # we won the race

Обмеження UNIQUE працює на будь-якому рівні ізоляції, оскільки воно є фізичним обмеженням індексу. Застереження: у Postgres на REPEATABLE READ / SERIALIZABLE конфліктна вставка може повернути не unique_violation (SQLSTATE 23505), а serialization_failure (40001), тож обробник має враховувати обидва коди. Якщо логіка обробки складніша (читання + перевірка + запис), для коректності потрібен рівень SERIALIZABLE або явне блокування рядків через SELECT ... FOR UPDATE.

Якщо вимагається не просто дедуплікація, а exactly-once-семантика з атомарністю обробки, застосовується Inbox Pattern, який є дзеркальним до Outbox і розділяє отримання та обробку повідомлення на дві окремі фази.

Postgres vs Redis

  • Postgres - просте рішення, придатне для низького та середнього RPS. Використовує стандартні транзакції та UNIQUE-обмеження без необхідності у додаткових технологіях.
  • Redis - підходить для випадків із великим обсягом потоку повідомлень. Однопотоковий engine Redis серіалізує всі конкурентні команди, що виключає гонки за визначенням. Команда SET key value NX (set-if-not-exists) атомарно визначає, який саме процес отримав ключ першим.
# Redis: atomic key claim with TTL to survive a worker crash
if not redis.set(f"idem:{event_id}", WORKER_ID, nx=True, ex=3600):
    return  # someone else claimed it
process(msg)

Lua-скрипти в Redis

Якщо логіка ключа складніша за один SET NX (наприклад, distributed lock із fencing token або rate-limiter зі sliding window), її виносять в окремий Lua-скрипт. Скрипт виконується атомарно всередині однопотокового engine'у Redis (інтерпретатор Lua 5.1) без мережевих round-trip-ів між окремими перевірками. Саме це забезпечує швидкість виконання, а не JIT-компіляція (стоковий Redis JIT не використовує).

Ідемпотентність на API-рівні

Той самий паттерн застосовується на рівні HTTP-API. Якщо користувач двічі активує дію "Створити замовлення", браузер може відправити два POST-запити. Middleware зчитує заголовок Idempotency-Key і записує його у Redis або Postgres через те саме UNIQUE-обмеження. Перший запит виконується звичайним чином, а дублікат отримує закешовану відповідь першого виклику.