MQ
MQ - Message Queque⚑
Що таке MQ⚑
Черги повідомлень, по суті, є сполучною ланкою між різними процесами у додатках та надають надійний та масштабований інтерфейс взаємодії з іншими підключеними системами та пристроями. Черга — структура даних з принципом доступу до елементів "перший прийшов — перший вийшов". Додавання елемента можливе лише в кінець черги, вибірка — лише з початку черги, при цьому вибраний елемент з черги видаляється.
Десять причин, чому черги повідомлень є життєво важливим компонентом для будь-якої архітектури чи додатка
- Слабке зв'язування — черги повідомлень створюють неявні інтерфейси обміну даними, які дозволяють процесам бути незалежними одне від одного, тобто ви просто визначаєте формат повідомлень, відправлених від одного процесу іншому.
- Надмірність — черги дозволяють уникнути випадків неефективного використання ресурсів процесу (наприклад, пам'яті) внаслідок зберігання невикористаної (надмірної) інформації.
- Масштабованість — черги повідомлень дозволяють розподілити процеси обробки інформації. Таким чином, вони дозволяють легко збільшувати швидкість, з якою повідомлення додаються в чергу та обробляються.
- Еластичність та можливість витримувати пікові навантаження — черги повідомлень можуть виконувати роль свого роду буфера для накопичення даних в разі пікового навантаження, згладжуючи таким чином навантаження на систему обробки інформації та запобігаючи її відмові.
- Стійкість до відмов — черги повідомлень дозволяють розділити процеси один від одного, так що якщо процес, який обробляє повідомлення з черги, падає, повідомлення можуть бути додані в чергу для обробки пізніше, коли система відновиться.
- Гарантована доставка — використання черг повідомлень гарантує, що повідомлення буде доставлено та оброблено в будь-якому випадку (поки є принаймні один обробник).
- Гарантований порядок доставки — більшість систем черг повідомлень здатні забезпечити гарантії того, що дані будуть оброблятися у певному порядку (найчастіше у порядку їх надходження).
- Буферизація — черги повідомлень дозволяють відправляти та отримувати повідомлення, при цьому працюючи з максимальною ефективністю, надаючи буферний шар — процес запису в чергу може відбуватися настільки швидко, наскільки це може робити черга повідомлень, а не обробник повідомлення.
- Розуміння потоків даних — черги повідомлень дозволяють виявляти вузькі місця в потоках даних додатка, легко можна визначити, яка з черг заповнюється, яка не використовується, і визначити, що необхідно робити — додавати нових обробників повідомлень або оптимізувати поточну архітектуру.
- Асинхронний зв'язок — черги повідомлень надають можливість асинхронної обробки даних, що дозволяє помістити повідомлення в чергу без обробки, дозволяючи системі обробити повідомлення пізніше, коли з'явиться можливість.
Які готові реалізації MQ ви знаєте⚑
- RabbitMQ: Це відкрите програмне забезпечення, яке реалізує стандартизований протокол AMQP (Advanced Message Queuing Protocol). Він дозволяє надійно та масштабовано обмінюватися повідомленнями між різними системами та компонентами додатків.
- Apache Kafka: Це розподілена платформа для потокової обробки та зберігання даних. Він спроектований для обробки великої кількості подій (повідомлень) та забезпечує гарантовану доставку та зберігання цих подій.
- Redis: Redis може бути використаний як MQ за допомогою свого механізму публікації-підписки (pub-sub) та черги повідомлень (list). Він швидкий та легкий у використанні.
- Apache ActiveMQ: Це ще одна реалізація протоколу AMQP. Він надає різноманітні можливості обробки повідомлень, включаючи публікацію-підписку, черги повідомлень та багато іншого.
- ZeroMQ (0MQ): Це бібліотека для передачі повідомлень між процесами через різні протоколи. Вона дозволяє вам побудувати власні механізми обміну повідомленнями, а також підходить для складних топологій зв'язку.
- Apache RocketMQ: Це система повідомлень, яка забезпечує надійну та масштабовану передачу повідомлень між різними компонентами додатків.
Links
- Message middleware deployment and comparison: rabbitMQ, activeMQ, zeroMQ, rocketMQ, Kafka, redis
- RabbitMQ против Kafka: два разных подхода к обмену сообщениями
- Kafka VS RabbitMQ
- Выбор MQ для высоконагруженного проекта
Який життєвий цикл повідомлення в MQ ?⚑
- Створення повідомлення: Відправник (producer) генерує повідомлення та відправляє його в чергу через брокер обміну або API сервісу.
- Доставлення в чергу: Повідомлення додається до черги. У RabbitMQ це може бути зв'язано з маршрутизатором (exchange), який спрямовує повідомлення в одну чи кілька черг відповідно до правил маршрутизації.
- Зберігання повідомлення: Повідомлення зберігається в черзі до моменту, коли споживач (consumer) зможе його обробити. Для RabbitMQ можна налаштувати TTL (time-to-live) для повідомлень. В AWS SQS можна використовувати Dead Letter Queue для обробки невдалих повідомлень.
- Отримання повідомлення: Споживач зчитує повідомлення з черги. У RabbitMQ це відбувається шляхом підтвердження доставки (acknowledgment), що забезпечує надійність. В AWS SQS споживач блокує повідомлення (visibility timeout) для запобігання його повторному зчитуванню іншими споживачами.
- Обробка повідомлення: Споживач виконує необхідну логіку для обробки повідомлення. Наприклад, це може бути виклик функції або запис даних у базу.
- Видалення повідомлення: Після успішної обробки споживач підтверджує RabbitMQ або видаляє повідомлення з черги в AWS SQS. В Azure Service Bus використовується подібний підхід із механізмом підтвердження.
- Помилки обробки: Якщо повідомлення не може бути оброблене, воно може бути відправлене в Dead Letter Queue (у AWS SQS та Azure Service Bus) або повернене в RabbitMQ для повторної обробки залежно від налаштувань.
Особливості роботи з Redis⚑
Redis працює в одному процесі та в одному потоці, тобто він може обробляти один запит за один раз. Якщо спробувати вручну дістати всі значення, які зберігаються, то заблокується робота з іншими з'єднаннями.
Які структури даних є в Redis?⚑
Redis має кілька вбудованих структур даних, кожна для своїх сценаріїв:
- String - найпростіше значення (текст, число, JSON, бінарні дані). Використовується для токенів, прапорців, лічильників.
- List - впорядкована колекція, підтримує швидкі операції додавання/видалення з обох кінців. Підходить для черг, логів, стеків.
- Set - невпорядкована множина унікальних елементів. Підтримує перевірку наявності, об'єднання, перетин, різницю. Корисно для зберігання унікальних ID, тегів, підписників.
- Sorted Set - як
Set, але кожен елемент має числовий пріоритет (score), за яким сортується. Часто використовується для рейтингів і топів. - Hash - структура виду
ключ → поле → значення(як словник). Підходить для об'єктів з кількома атрибутами (профілі користувачів). - Bitmap - масив бітів, де можна встановлювати і перевіряти окремі біти за індексом. Підходить для прапорців активності або відміток.
- Stream - структура для логування подій у реальному часі. Підтримує консьюмери, групи і контроль доставки. Використовується для побудови черг і стрімінгу даних.
За рахунок чого RabbitMQ гарантує доставку повідомлень?⚑
Доставка гарантується зв'язкою збереження на диск + підтвердження + реплікація + обробка помилок.
1. Стійкість (durability & persistence)
- Черга позначається як
durable=True- щоб не зникла при перезапуску брокера. - Повідомлення позначається як
persistent(delivery_mode=2) - щоб RabbitMQ записав його на диск.
2. Підтвердження від продюсера (Publisher Confirms)
Продюсер може запросити підтвердження від брокера, що повідомлення прийняте і збережене. Якщо брокер не підтвердив доставку - продюсер може відправити повторно.
3. Підтвердження від консьюмера (Consumer Ack)
RabbitMQ чекає, щоб консьюмер підтвердив отримання (basic_ack). Поки ack немає:
- повідомлення вважається необробленим;
- якщо консьюмер падає - повідомлення повертається до черги.
Слід використовувати auto_ack=False, щоб не втрачати повідомлення при збої консьюмера.
4. Реплікація (HA)
За допомогою Mirrored queues або Quorum queues повідомлення копіюється на кілька нод - не загубиться при падінні однієї.
5. Обробка помилок (Dead Letter Exchange)
Повідомлення, які не вдалося обробити, можуть відправитися в DLX - на повторну обробку або аналіз.
Гарантії доставки в RabbitMQ⚑
1. At most once (максимум один раз)
Повідомлення може бути втрачене, але ніколи не доставиться двічі.
- Продюсер не чекає підтверджень.
- Черга може бути нестійка (non-durable), повідомлення - не
persistent. - Консьюмер не підтверджує отримання (
auto_ack=True).
Підходить для: телеметрії, логів, коли втрата допустима.
2. At least once (мінімум один раз)
Повідомлення буде доставлено, але можливі дублікати.
- Продюсер:
publisher confirms. - Повідомлення:
persistent. - Черга:
durable. - Консьюмер: підтверджує вручну (
ack), інакше повідомлення повертається.
Підходить для більшості надійних систем, де дублікати можна обробляти (наприклад, через ідемпотентність).
3. Exactly once (точно один раз)
RabbitMQ сам по собі не гарантує - реалізується на рівні застосунку через:
- ідемпотентну обробку;
- зовнішні транзакції (наприклад, з базою даних);
- дедуплікацію за
message_id.
Підходить для фінансових операцій, обліку, коли ні втрата, ні дублікат недопустимі.
Dead-letter exchange (DLX) в RabbitMQ⚑
Dead-letter exchange - механізм, який перенаправляє необроблені або відхилені повідомлення в окрему чергу. Повідомлення може потрапити в DLX, якщо воно:
- відхилене (
nack/rejectзrequeue=False); - прострочене (вийшов TTL);
- не помістилось у чергу (переповнення).
Щоб увімкнути DLX, у звичайної черги налаштовується параметр x-dead-letter-exchange, і "погані" повідомлення автоматично потрапляють до окремої черги для повторної обробки або аналізу помилок.
Пріоритизація повідомлень в RabbitMQ⚑
RabbitMQ підтримує priority queue - повідомлення з вищим пріоритетом обробляються раніше, навіть якщо опубліковані пізніше.
Кроки:
- Створюємо чергу з максимальним пріоритетом - параметр
x-max-priority. - Публікуємо повідомлення з вказаним полем
priority.
Особливості:
- Пріоритети - цілі числа, більше = важливіше.
- При однакових пріоритетах - FIFO.
- Працює тільки в межах однієї черги.
- Без
x-max-priorityполеpriorityігнорується. - Підтримується
pika,kombu,Celery.
Важливо: не варто робити багато рівнів - 5–10 достатньо. Велика кількість рівнів погіршує продуктивність через внутрішню переоцінку порядку.
Як працюють пріоритети між кількома чергами?⚑
RabbitMQ не підтримує глобальний пріоритет між чергами - пріоритети не "переходять" від однієї черги до іншої. Якщо приоритетна черга стала порожньою, консьюмер не починає автоматично читати з "менш пріоритетної".
Правильні підходи:
- Одна черга з
x-max-priority- найпростіший варіант, якщо реально потрібен пріоритет. - На стороні консьюмера: спочатку зчитувати з пріоритетної, потім - зі звичайної (через
basic.getабо вибір черги). Але це дає лише імітацію - справжнього "переходу" пріоритету між чергами немає.
Розклад між чергами залежить від раунду/доступності консьюмерів, а не від пріоритету.
Як захиститися від непрочитаних/непідтверджених повідомлень?⚑
1. Підтвердження доставки (ACK/NACK)
- RabbitMQ:
manual_ack=Trueвbasic_consume. Після обробки -basic_ack(delivery_tag). При помилці -basic_nack()абоbasic_reject()зrequeue=True/False. - Без
ackповідомлення вважається "у повітрі" - якщо консьюмер впаде, воно повернеться до черги.
2. Message TTL + Dead Letter Exchange
Якщо повідомлення не оброблено за X часу або N разів не вдалося - відправляти його в окрему чергу через DLX для повторної обробки чи аналізу.
3. Ідемпотентність обробки
Повторна доставка - це норма при збоях. Обробка має бути ідемпотентною:
- зберігаємо
message_idу Redis/Postgres; - при повторі - не обробляємо дублікат.
Конфлікт двох повідомлень з однаковим ключем⚑
Кейс: в чергу прилетіли два повідомлення, які змінюють дату народження одного користувача. Як зрозуміти, яке з них застосовувати?
Рішення: ідемпотентність + версійність або мітка часу.
Варіант 1: event_time / updated_at
У кожного повідомлення є поле event_time - коли було сформовано зміну (не коли прилетіло!). Консьюмер порівнює event_time повідомлення з поточним user.birthdate_updated_at у БД. Якщо event_time > current - застосовує, інакше - ігнорує.
# message_1: {"user_id": 1, "new_birthdate": "1990-01-01", "event_time": "2024-01-01T12:00:00"}
# message_2: {"user_id": 1, "new_birthdate": "1991-01-01", "event_time": "2024-01-01T12:00:05"}
# message_2 wins regardless of delivery order
Варіант 2: версія (version, revision, seq_id)
Кожне повідомлення несе номер версії. Консьюмер читає поточну версію в БД, застосовує лише якщо version > current_version.
# {"user_id": 1, "birthdate": "1990-01-01", "version": 3}
# {"user_id": 1, "birthdate": "1991-01-01", "version": 4}
Якщо не використати ні того, ні іншого - можлива "гонка" між застарілими і актуальними подіями: при ретраях, паралельних воркерах, довгій доставці.
Як Kafka працює під капотом?⚑
Kafka - розподілена лог-орієнтована система повідомлень, спроектована для високої продуктивності, стійкості й обробки великих потоків даних у реальному часі.
1. Основні сутності
- Producer - відправляє повідомлення в Kafka.
- Consumer - читає повідомлення з Kafka.
- Broker - сервер Kafka, що зберігає й передає дані.
- Topic - логічна категорія/канал повідомлень.
- Partition - фізичний сегмент усередині топіка (забезпечує масштабування).
- Consumer Group - група консьюмерів, що спільно читають партиції.
2. Зберігання повідомлень (лог-файли)
Kafka зберігає повідомлення у файлах на диску як стійкий впорядкований лог:
- Кожна партиція - це append-only лог-файл.
- Нові повідомлення дописуються в кінець, старі не змінюються.
- Дані передаються з диску з використанням zero-copy.
3. Чому Kafka швидка
- Append-only запис - найшвидша операція на диску.
- Zero-copy I/O - дані передаються з page cache у сокет, обходячи user-space.
- Партиції дозволяють розпаралелювати обробку.
4. Підписка і зсуви (offsets)
- У кожного консьюмера є
offset- позиція в партиції. - Консьюмер сам вирішує, де починати читати (можна "перемотати").
- Kafka не видаляє повідомлення одразу після прочитання (на відміну від RabbitMQ) - вони зберігаються за часом (
retention.ms) або розміром (retention.bytes).
5. Масштабування і відмовостійкість
- Кластер складається з кількох брокерів.
- Кожна партиція може мати репліки на інших брокерах.
- Один брокер - leader партиції, решта - followers.
- Kafka використовує Zookeeper (або KRaft у нових версіях) для виборів лідерів, метаданих і кворуму.
Що таке топік і партиція в Kafka?⚑
Топік (Topic) - логічна категорія (канал) повідомлень. Продюсери пишуть у топіки, консьюмери - читають із них. Приклади: user_events, order_status_changed, logs_errors.
Топік - це лише ім'я. Реально повідомлення зберігаються не "в топіку", а в його партиціях.
Партиція (Partition) - фізичний лог зберігання повідомлень усередині топіка. Кожне повідомлення в партиції має:
- унікальний
offset(номер зміщення); - строго збережений порядок (у межах однієї партиції);
- розподіляється по брокерах для масштабування.
Кожен топік складається з однієї або більше партицій.
- Продюсер пише в одну з партицій (за ключем або випадково).
- Консьюмер читає в межах однієї партиції - строго в порядку надходження.
Kafka гарантує порядок у межах партиції, але не між партиціями.
Гарантії доставки в Kafka⚑
1. At most once
Повідомлення може бути втрачене, але ніколи не доставиться двічі.
- Продюсер відправляє і не чекає
ack. - Можлива втрата при збоях.
Підходить для логів, метрик.
2. At least once
Повідомлення буде доставлене хоча б один раз, але можливі дублікати.
- Продюсер чекає
ackвід брокера. - Якщо не отримав - повторна відправка.
- На стороні консьюмера потрібна ідемпотентність.
Підходить для більшості систем.
3. Exactly once
Кожне повідомлення доставляється й обробляється строго один раз. Реалізується через:
enable.idempotence=trueу продюсера (унікальність запису в лог);transactional.idу продюсера (транзакції);isolation.level=read_committedу консьюмера (читання лише підтверджених даних);- коміт
offset'ів у межах транзакції.
Підходить для фінансових систем, платежів, інвентарів.
Ідемпотентні консьюмери⚑
Summary
Консьюмер повідомлень має бути ідемпотентним: ключ ідемпотентності записується у сховище з
UNIQUE-обмеженням, унаслідок чого перший запис виграє конкуренцію та виконує обробку, а наступні спроби завершуються помилкою унікальності без додаткових дій. Якщо вимагається exactly-once-семантика з атомарністю обробки, застосовується окремий паттерн - Inbox Pattern (див.architecture/architecture_patterns.md).
Брокер забезпечує at-least-once-доставку: якщо консьюмер не встиг підтвердити отримання повідомлення викликом ack (через збій процесу, перевищення таймауту обробки або мережеву помилку), брокер повторно доставляє те саме повідомлення. Кому саме - залежить від брокера: у RabbitMQ повідомлення з черги може дістатися будь-якому вільному консьюмеру; у Kafka партиція в межах consumer group належить одному консьюмеру одночасно, і повторну доставку отримує він сам - або інший консьюмер уже після ребалансу (session timeout, додавання/видалення інстансу). Саме повторні доставки є основним джерелом дублікатів. У pub/sub-моделях (кілька consumer-груп у Kafka; у RabbitMQ - кілька черг, прив'язаних до fanout-/topic-exchange) кожна група отримує свою копію повідомлення, і в межах кожної групи ця копія так само може бути повторно доставлена з тих самих причин. "Ідемпотентний консьюмер" - офіційна назва підходу, який на практиці зводиться до дедуплікації за ключем.
Ключ та сховище
Ключ ідемпотентності беруть або з самого повідомлення (message_id, event_id, order_id), або консьюмер формує детермінований ключ із полів payload (hash(user_id + event_time)).
Сховище ключів має бути зовнішнім: воркери є ефемерними, їхня кількість сягає десятків чи тисяч у різних процесах і на різних машинах, тому локальна структура set() не має доступу до стану інших воркерів і не може бути єдиним джерелом правди.
Реалізація
Найпростіший варіант - перевірка наявності ключа перед обробкою:
def handle(msg):
key = msg["event_id"]
if db.exists("processed_events", key):
return # duplicate, skip
process(msg)
db.insert("processed_events", key)
Така схема працює лише з одним воркером. При горизонтальному масштабуванні два воркери, які одночасно отримують одне й те саме повідомлення, обидва бачать відсутність ключа у БД і обидва переходять до обробки - відбувається дублювання.
Коректний варіант базується на UNIQUE-обмеженні: воркер, який першим встиг вставити рядок, виконує обробку; усі інші отримують помилку UniqueViolation і завершуються без додаткових дій. Схема не потребує посередника або окремої точки координації; рівень паралелізму обмежений лише пропускною здатністю БД.
CREATE TABLE processed_events (
event_id UUID PRIMARY KEY,
consumer_id TEXT,
started_at TIMESTAMPTZ DEFAULT now()
);
def handle(msg):
try:
db.execute(
"INSERT INTO processed_events (event_id, consumer_id) VALUES (%s, %s)",
(msg["event_id"], WORKER_ID),
)
except UniqueViolation:
return # someone else already took it
process(msg) # we won the race
Обмеження UNIQUE працює на будь-якому рівні ізоляції, оскільки воно є фізичним обмеженням індексу. Застереження: у Postgres на REPEATABLE READ / SERIALIZABLE конфліктна вставка може повернути не unique_violation (SQLSTATE 23505), а serialization_failure (40001), тож обробник має враховувати обидва коди. Якщо логіка обробки складніша (читання + перевірка + запис), для коректності потрібен рівень SERIALIZABLE або явне блокування рядків через SELECT ... FOR UPDATE.
Якщо вимагається не просто дедуплікація, а exactly-once-семантика з атомарністю обробки, застосовується Inbox Pattern, який є дзеркальним до Outbox і розділяє отримання та обробку повідомлення на дві окремі фази.
Postgres vs Redis
- Postgres - просте рішення, придатне для низького та середнього RPS. Використовує стандартні транзакції та
UNIQUE-обмеження без необхідності у додаткових технологіях. - Redis - підходить для випадків із великим обсягом потоку повідомлень. Однопотоковий engine Redis серіалізує всі конкурентні команди, що виключає гонки за визначенням. Команда
SET key value NX(set-if-not-exists) атомарно визначає, який саме процес отримав ключ першим.
# Redis: atomic key claim with TTL to survive a worker crash
if not redis.set(f"idem:{event_id}", WORKER_ID, nx=True, ex=3600):
return # someone else claimed it
process(msg)
Lua-скрипти в Redis
Якщо логіка ключа складніша за один SET NX (наприклад, distributed lock із fencing token або rate-limiter зі sliding window), її виносять в окремий Lua-скрипт. Скрипт виконується атомарно всередині однопотокового engine'у Redis (інтерпретатор Lua 5.1) без мережевих round-trip-ів між окремими перевірками. Саме це забезпечує швидкість виконання, а не JIT-компіляція (стоковий Redis JIT не використовує).
Ідемпотентність на API-рівні
Той самий паттерн застосовується на рівні HTTP-API. Якщо користувач двічі активує дію "Створити замовлення", браузер може відправити два POST-запити. Middleware зчитує заголовок Idempotency-Key і записує його у Redis або Postgres через те саме UNIQUE-обмеження. Перший запит виконується звичайним чином, а дублікат отримує закешовану відповідь першого виклику.