Async
Async⚑
Асинхронність⚑
Асинхронність - один зі способів виконання кількох завдань відразу. Асинхронний код дозволяє програмі виконувати інші операції під час чекання на відповідь від введення/виведення або мережевого запиту. Асинхронність поліпшує продуктивність, оскільки програма не чекає завершення блокуючих операцій перед виконанням інших завдань.
Вона пропонує вирішувати проблему з допомогою функцій зворотнього виклику (callback). Зустрівши в коді блокуючий запит, інтерпретатор вішає на нього сигнальний маячок і йде далі. Коли запит завершується, маячок подасть сигнал. В цей момент обробник повернеться, отримає результат і викличе колбек.
Coroutines - Корутини - Співпрограми⚑
Корутина - це спеціальний вид асинхронних функцій, яка може призупиняти своє виконання, і продовжити виконання з місця зупинки. На відміну від звичайних функцій, які виконуються наново з початку. Поведінка корутин схожа на поведінку генераторів. Для визначення корутини починаючи з Python 3.5+ використовують ключове слово async. Вони використовуються для виконання асинхронних операцій, де важливо чекати на відповідь без блокування основного потоку виконання.
Інше визначення - співпрограма — це термін, який позначає завдання, яке планується циклом подій у програмі замість операційної системи.
У корутині можна використовувати ключове слово await, що вказує на асинхронне очікування результату виконання іншої корутини чи асинхронної функції.
Корутини мають багато спільного з потоками, але на відміну від потоків, вони віддають керування лише тоді, коли викликають іншу корутину, і вони не використовують так багато пам'яті.
Корутини є основою для асинхронного програмування в Python і дозволяють виконувати багатозадачні операції без необхідності великої кількості потоків чи процесів.
Корутини можна визначати і створювати у звичайному Python-коді, але запускати їх можна лише в циклі подій.
Асинхронність в Python⚑
Починаючи з Python 3.5+ асинхронність реалізована за допомогою асинхронних функцій і ключових слів async та await. Асинхронні функції визначаються з використанням ключового слова async перед визначенням функції. Ключове слово await вказує на те, що програма повинна зачекати на результат виконання асинхронної функції перед тим, як продовжити виконання наступних інструкцій.
Для управління асинхронними задачами у Python використовуються asyncio бібліотека та event loop, яка дозволяє виконувати асинхронний код в кооперативному багатозадачному середовищі.
Links
- Мистецтво чекати. Ефективність асинхронності в Python - dou.ua
- David Beazley - Build Your Own Async - PyCon India, 2019
Asyncio⚑
Asyncio – модуль асинхронного програмування, який був представлений в Python 3.4. Він призначений для використання співпрограм і future для спрощення написання асинхронного коду і робить його майже таким самим читаним, як синхронний код, через відсутності callback-ів.
Asyncio надає цикл подій та ще деякі інші функції. Цикл подій реагує на різні I/O-події та перемикається на завдання, що можуть виконуватися і призупиняє ті, що чекають на I/O. Тобто ми не витрачаємо час на завдання, що ще не готові виконуватися.
Asyncio використовує різні конструкції: event loop, співпрограми та future.
- event loop управляє і контролює виконання різних завдань. Він реєструє їх і обробляє розподіл потоку управління між ними.
- Співпрограми – це спеціальні функції, робота яких схожа з роботою генераторів в Python, за допомогою
awaitвони повертають потік управління назад вevent loop. Запуск співпрограми повинен бути запланований вevent loop. Заплановані співпрограми будуть обгорнуті в Завдання, що є типомFuture. - Future - це об'єкти, які представляють результат, що буде доступний у майбутньому. Вони як порожнє місце, яке ще не має значення, але обіцяє його надати. Найкраще порівняння — футура = обіцянка. Коли результат стає доступним (наприклад, прийшла відповідь з мережі), футура позначається як завершена і викликає колбек — саме той, який був зареєстрований, щоб продовжити виконання корутини. Тобто
futureпоказує результат задачі, яка може або не може бути виконана. Результатом може бути exception. Коли огортаємо співпрограму вFuture- отримуємо об’єктTask. - Завдання (
asyncio.Tasks) - це обгортки над корутинами, які запускаються у фоновому режимі й одразу передаються під контроль циклу подій. На відміну від звичайної корутини, яка просто "готова" до виконання, задача може почати працювати відразу після створення. Вона стежить за перебігом виконання, зберігає результат або помилку, і дозволяє іншим частинам коду дізнатись, чим усе завершилось. Тобто вона огортає корутини, аби їх виконання могло незалежно плануватись циклом подій, коли йому передається управління (зазвичай за допомогоюawait). Створити завдання можна за допомогоюasyncio.create_task(). У PythonTask— це підкласFuture, тобто кожна таска по суті є футурою, але з додатковою логікою виконання корутин.
Спрощено схема роботи виглядає наступним чином: У нас є цикл подій (event loop) та асинхронні функції, I/O-операції. Ми передаємо свої функції до циклу подій, щоб він запустив їх. Цикл подій повертає нам об'єкт Future. Можна сказати, що це обіцянка, що ми отримаємо якісь дані в майбутньому. Ми зберігаємо його і час від часу перевіряємо чи не має наш Future результату виконання. І якщо так, то використовуємо ці дані для подальшої обробки.
Щоб зупиняти та відновлювати завдання asyncio використовує генератори та співпрограми (generators and coroutines). У разі, якщо в черзі очікування є завдання, то контекст буде перемикнуто, в іншому випадку – ні.
Визначення корутини починається з async, а її виклик - з await. asyncio.run(coroutine) є основною точкою входу для асинхронних програм.
Функції wait(), gather() і as_completed() запускають кілька корутин одночасно. Модуль asyncio також надає власні класи Queue, Event, Lock і Semaphore.
Asyncio на прикладі
import asyncio
import datetime
import random
async def my_sleep_func():
await asyncio.sleep(random.randint(0, 5))
async def display_date(num, loop):
end_time = loop.time() + 50.0
while True:
print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
if (loop.time() + 1.0) >= end_time:
break
await my_sleep_func()
loop = asyncio.get_event_loop()
asyncio.ensure_future(display_date(1, loop))
asyncio.ensure_future(display_date(2, loop))
loop.run_forever()
- Асинхронна функція
display_dateприймає число-індентифікатор та цикл подій. - Функція має безкінечний цикл, що переривається через 50 секунд. Але поки 50 секунд не минуло, вона друкує час і засинає на випадкову кількість секунд. Ключове слово
awaitвказує, що під час виконання функції, що стоїть після нього, можна перемкнутися на іншу асинхронну функцію (співпрограму). - Функції додаються до циклу подій за допомогою функції
ensure_future. - Запускається цикл подій.
Links
Що таке async/await, навіщо вони потрібні і як їх використовувати⚑
Ключове слово async використовується перед def, щоб показати, що функція є асинхронною (корутиною). Тобто, якщо визначити функцію async def f(): ... та викликатиме її як f() — повернеться корутина. Прийшла на зміну декоратору @asyncio.coroutine в Python 3.5+.
Ключове слово await вказує, що очікується завершення співпрограми. await може бути використане лише в співпрограмі. awaitable — все, що підтримує await, тобто корутини, asyncio.Futures, asyncio.Tasks, об'єкти з методом __await__. await прийшов на зміну yield from в Python 3.5+.
import asyncio
import aiohttp
urls = ['https://www.google.com', 'https://www.python.org']
async def call_url(session, url):
print(f'Run {url}')
async with session.get(url) as response:
data = await response.text()
print(f'{url}: {len(data)} bytes')
return data
async def main(urls):
async with aiohttp.ClientSession() as session:
tasks = [call_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
asyncio.run(main(urls)) # Run the event loop
Програма містить асинхронний метод. Під час виконання він повертає співпрограму, яка потім перебуває у стані очікування.
async/await необхідні для того, щоб не блокувати виконання потоку під час очікування асинхронної події. Конструкція async/await фактично перетворює функцію на корутину (співпрограму): вона призупиняє своє виконання під час await, очікує асинхронної події та продовжує роботу.
Історична еволюція корутин у Python
Перехід від генератор-базованих корутин до нативного синтаксису відбувся не за один крок:
- Python 3.4 - модуль
asyncioдодано до стандартної бібліотеки. Корутини визначалися як генератори, обгорнуті декоратором@asyncio.coroutine; передача керування - черезyield from. - Python 3.5 (PEP 492) - додано нативний синтаксис
async def/await, окремий від генераторів.await- заміна дляyield from, але семантично прив'язана до корутин, а не до загального протоколу ітерації. - Python 3.8 - декоратор
@asyncio.coroutineоголошено застарілим (deprecated) з нотаткою "will be removed in version 3.10". - Python 3.11 -
@asyncio.coroutineостаточно видалено (whatsnew 3.11: "Removed the@asyncio.coroutinedecorator enabling legacy generator-based coroutines to be compatible with async/await code").
Нативні корутини (async def) існують із Python 3.5 і весь цей час були окремою сутністю від генераторів на рівні API. У 3.11 видалили саме legacy-варіант із декоратором @asyncio.coroutine, а не нативну реалізацію. Внутрішні CPython-механізми (паузи, збереження стану кадру) історично спільні з генераторами, проте з точки зору API async def ніколи не була generator-based.
Що таке event loop (цикл подій) в asyncio і як він працює?⚑
Цикл подій є ядром кожної асинхронної програми. Цикли подій запускають асинхронні завдання та зворотні виклики, виконують мережеві операції вводу-виводу та запускають підпроцеси.
Event Loop - це механізм, який дозволяє координувати виконання асинхронних операцій у Python. Він дозволяє програмі взаємодіяти з багатьма завданнями, такими як ввід-вивід, мережеві запити, без блокування основного потоку виконання.
Event Loop - це як певний безкінечний цикл, який дозволяє розпізнати, чи настала певна подія операційної системи (наприклад, запис даних до сокету).
Event Loop працює на принципі опитування: він постійно перевіряє список подій та завдань, які очікують виконання, і обробляє їх послідовно. Коли подія стає доступною для обробки (наприклад, завершення мережевого запиту), Event Loop викликає відповідну функцію-зворотний виклик, яку визначили, для обробки результату.
Під капотом, Event Loop в asyncio працює на подібних принципах, як в традиційних механізмах вводу-виводу як Select, Poll та Epoll. Він встановлює "наглядачі" на різні асинхронні операції та очікує їхньої готовності.
З допомогою Select формується список файлових дескрипторів, за якими планується спостерігати. У клієнтському коді доведеться перевіряти всі передані дескриптори на наявність подій (і їх кількість обмежена 1024), що робить його повільним та незручним.
У випадку Poll та Epoll, Event Loop використовує більш ефективні механізми опитування, дозволяючи ефективно взаємодіяти з більшим числом подій.
uvloop як альтернатива стандартному loop
uvloop - реалізація event loop'а на C поверх libuv (тієї ж бібліотеки, що лежить в основі Node.js). Drop-in заміна стандартного asyncio loop'а:
За бенчмарками авторів uvloop (вимірювання на HTTP- і echo-сервері), пропускна здатність зростає у кілька разів порівняно зі стандартним asyncio loop'ом за рахунок реалізації на C і оптимізованої роботи з epoll/kqueue. Фактичний виграш залежить від характеру навантаження - на CPU-bound коді або частих переходах user/kernel ефект менший. Семантика для прикладного коду не змінюється: ті ж async def, await, Task, gather.
Обмеження: не підтримує Windows нативно (libuv підтримує, але прив'язки uvloop - ні; для Windows у asyncio з 3.8 за замовчуванням використовується ProactorEventLoop поверх IOCP, і uvloop там не потрібен). Тому в більшості продакшн-сценаріїв uvloop ставлять на Linux-серверах, тоді як локальна розробка на macOS/Windows може використовувати стандартний loop.
Що таке мультиплексування⚑
Мультиплексування в Linux — це важливий механізм управління потоками даних, який дозволяє ефективно обробляти кілька вводів/виводів (I/O) одночасно без блокування. Це особливо актуально для серверів, які повинні обробляти тисячі з'єднань, або програм, що взаємодіють із багатьма джерелами даних.
У Linux є кілька механізмів для мультиплексування:
select()- Старий і широко використовуваний системний виклик.
- Дозволяє програмі перевіряти кілька файлових дескрипторів для читання, запису або наявності помилок.
-
Має обмеження: максимальне число дескрипторів, що перевіряються, залежить від системної константи
FD_SETSIZE. -
poll() - Покращена версія
select(). - Використовує список дескрипторів, усуваючи обмеження на кількість файлових дескрипторів.
-
Вимагає більше ресурсів для великих наборів дескрипторів, тому менш ефективний у масштабних сценаріях.
-
epoll() - Сучасний механізм мультиплексування, доступний у Linux.
- Дуже ефективний для програм із великою кількістю з'єднань.
- Забезпечує асинхронне сповіщення про події і знижує витрати на масштабування.
- Складається з трьох основних функцій:
epoll_create()— створює "еполл-дескриптор".epoll_ctl()— додає або видаляє дескриптори до/з черги.epoll_wait()— очікує на події.
Мультиплексування дозволяє одній програмі працювати із багатьма джерелами даних одночасно, не блокуючи виконання інших завдань. Наприклад:
- Сервер може приймати одночасні з'єднання клієнтів, не чекаючи, поки кожен клієнт завершить обмін даними.
- Програми можуть одночасно читати/писати дані з декількох сокетів або файлів.
Приклад із epoll() Ось спрощений приклад використання epoll():
#include <sys/epoll.h>
#include <unistd.h>
#include <stdio.h>
#define MAX_EVENTS 10
int main() {
int epoll_fd = epoll_create1(0); // Створення epoll-дескриптора
struct epoll_event event, events[MAX_EVENTS];
// Налаштування дескриптора (наприклад, сокета) для еполла
event.events = EPOLLIN; // Очікування на читання
event.data.fd = 0; // Стандартний ввід (наприклад, для демонстрації)
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, 0, &event);
while (1) {
int num_events = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
for (int i = 0; i < num_events; i++) {
if (events[i].data.fd == 0) {
char buffer[100];
read(0, buffer, sizeof(buffer)); // Читання введених даних
printf("Введено: %s\n", buffer);
}
}
}
close(epoll_fd);
return 0;
}
Цей приклад показує, як epoll() може відстежувати події для стандартного вводу (або іншого файлового дескриптора) і ефективно обробляти їх.
Переваги мультиплексування в Linux
- Ефективність: дозволяє працювати з великою кількістю з'єднань.
- Гнучкість: підтримує різні типи дескрипторів (сокети, файли, пайпи тощо).
- Асинхронність: сприяє створенню неблокуючих програм.
Links
Проблема C10k⚑
Summary
C10k - задача обслуговування 10 000 одночасних з'єднань одним сервером, сформульована Деном Кегелом у 1999 році. У його консервативній оцінці (32-бітна система з 1 GB user-VM та 2 MB стека на потік) модель "потік на з'єднання" вичерпувала віртуальний адресний простір уже на ~512 потоках; на типовому 32-bit Linux з 3:1 user/kernel split межа була вищою (~1500 потоків), але порядок проблеми зберігався. Системні виклики
select()таpoll()додатково деградували черезO(n)-сканування переважно простоюючих файлових дескрипторів. Розв'язком стали дві сім'ї механізмів мультиплексування вводу-виводу: readiness-нотифікація (epoll,kqueue) та completion-нотифікація (IOCP,io_uring). На сучасному 64-бітному Linux обидві моделі - і потокова, і асинхронна - витримують 10 000+ з'єднань; C10k становить інтерес передусім як історичний кейс, з якого виросли сучасні архітектурні норми (event-driven вводу-виводу, single-process сервери з високою конкурентністю).
Внесок Дена Кегеля
У роботі c10k.html Кегель сформулював межі моделі "потік на з'єднання" та класифікував стратегії неблокуючого вводу-виводу. Точна оцінка з розділу про потокову модель:
"If each thread gets a 2MB stack [...], you run out of virtual memory at (2^30 / 2^21) = 512 threads on a 32 bit machine with 1GB user-accessible VM"
Обмеженням є не нестача оперативної пам'яті, а вичерпання віртуального адресного простору. Кегель узяв консервативні 1 GB user-VM; типовий 32-bit Linux мав 3:1 user/kernel split (3 GB user-space), що піднімало межу до ~1500 потоків - того ж порядку проблеми. На архітектурі x86-64 адресний простір користувача складає 128 TB, і це обмеження зникає.
Кегель також зафіксував ліміти системних викликів. Виклик select() обмежений константою FD_SETSIZE (зазвичай 1024) на рівні заголовка <sys/select.h>. Виклик poll() не має жорсткого ліміту, проте деградує на кількох тисячах файлових дескрипторів. Розділ про poll:
"it does get slow about a few thousand, since most of the file descriptors are idle at any one time, and scanning through thousands of file descriptors takes time"
Розбір поширених тверджень про потокову модель
Деякі формулювання, що часто наводяться при обговоренні теми, не витримують перевірки.
Перше - оцінка пам'яті виду "10 000 × 8 MB = 80 GB на стеки". Ця цифра відображає віртуальний адресний простір, а не резидентну оперативну пам'ять. Стек у Linux виділяється на вимогу (demand paging): фізичні сторінки по 4 KB виділяються лише тоді, коли стек реально до них доростає. Потік, заблокований на виклику recv(), фактично використовує лише кілька KB стека замість зарезервованих 8 MB. Резидентне споживання для 10 000 простоюючих потоків складає кілька сотень MB; навіть гіпотетичний worst-case з повністю заповненими стеками - одиниці GB, а не 80.
Друге - твердження про критичне навантаження від тисяч context switch'ів. Потік, заблокований у recv(), перебуває у стані not-runnable, і планувальник ОС його не обробляє. У IO-bound сценарії з 10 000 з'єднань більшість потоків постійно сплять у ядрі; планувальник перемикає лише той потік, чий файловий дескриптор перейшов у стан "готовий". Реальна вартість перемикань проявляється, коли значна кількість потоків одночасно перебуває у стані runnable (CPU-bound навантаження або синхронізаційні зриви), а не від великої кількості потоків як таких.
Третє - припущення, що асинхронна модель уникає переходів між user- та kernel-mode. Виклики read(), write() та epoll_wait() є системними викликами і виконують перехід у режим ядра. Перевага асинхронної моделі полягає не у відсутності mode switches, а у двох інших ефектах: один epoll_wait повертає готовність відразу для пачки дескрипторів замість одного блокуючого read() на потік; планувальник задач більше не виконується ядром - цю роль перебирає event loop у користувацькому просторі.
Механізми мультиплексування
select() і poll() об'єднує одна архітектурна риса: на кожному виклику повна множина дескрипторів передається між user- і kernel-space, і ядро сканує її лінійно. У select() це бітова мапа з фіксованим FD_SETSIZE, у poll() - масив pollfd-ів без жорсткого ліміту. Звідси й деградація на тисячах переважно простоюючих fd-ів.
epoll усуває обидві проблеми. Файлові дескриптори реєструються один раз через epoll_ctl(EPOLL_CTL_ADD, fd, ...); ядро тримає множину fd у червоно-чорному дереві зі складністю реєстрації O(log n). Виклик epoll_wait() повертає лише ті дескриптори, які перейшли у стан "готовий", - складність O(готових_подій) незалежно від загальної кількості зареєстрованих.
Системи мультиплексування поділяються на дві принципово різні моделі, які Кегель класифікує окремо. Readiness-нотифікація (паттерн Reactor) - ядро повідомляє про готовність файлового дескриптора до операції, а сам неблокуючий syscall виконується застосунком; результат повертається синхронно. До цього сімейства належать epoll на Linux та kqueue на FreeBSD і macOS. Completion-нотифікація (паттерн Proactor) - застосунок передає ядру операцію разом із буфером, а ядро самостійно її виконує і кладе готовий результат у completion-чергу; додатковий read або write після подачі вже не потрібен. До цього сімейства належать IOCP на Windows та io_uring у сучасному Linux. Обидві моделі вирішують ту саму задачу, але механізм їхньої роботи принципово відрізняється.
Реалізація у Python
Бібліотека asyncio у поєднанні з модулем selectors та системним викликом epoll є відповіддю на C10k у Python. Перевага асинхронної моделі над thread-per-connection на сучасному 64-бітному Linux полягає не у нездатності потоків масштабуватися (NPTL витримує 10 000+ потоків) і не у тому, що паралельне виконання можливе лише в асинхронній моделі (потоки забезпечують його так само). Реальні відмінності такі.
Низька вартість об'єкта-задачі. Порожня корутина у Python займає ~200 байт; разом із frame і локалами - близько 1-2 KB. Повна задача із транспортом, протоколом та буферами споживає десятки KB. Для порівняння, потік окрім 8 MB зарезервованого стека (резидентно - менше, див. вище) додає task_struct і kernel stack у ядрі - для 10 000 потоків це декілька сотень MB.
Перемикання у користувацькому просторі. Плануванням задач займається event loop, а не ядро (див. розбір вище), що знижує вартість кожного перемикання порівняно з kernel-mode scheduler.
Спрощена програмна модель. Один потік виконання означає відсутність race conditions між задачами; явні локи не потрібні, а стан зберігається у звичайних змінних.
import asyncio
async def handle(reader, writer):
data = await reader.read(1024) # non-blocking wait until epoll signals
writer.write(data)
await writer.drain()
writer.close()
async def main():
server = await asyncio.start_server(handle, "0.0.0.0", 8080)
async with server:
await server.serve_forever() # holds thousands of connections on a single thread
asyncio.run(main())
Актуальність C10k у сучасних умовах
У первинному формулюванні (обмеження операційної системи) проблема C10k сьогодні здебільшого не виникає. Сучасний ingress на nginx обслуговує сотні тисяч одночасних з'єднань без додаткових заходів. Поєднання NPTL, futex-ів та demand-paged стеків зняло обмеження моделі "потік на з'єднання", сформульовані у 1999 році. Архітектурні уроки C10k - event-driven I/O, мінімізація shared state, один потік виконання на ядро процесора - натомість стали стандартними практиками. На цих принципах побудовано nginx, Node.js та asyncio.
Наступним питанням після C10k є C10M - задача обслуговування 10 мільйонів з'єднань. На таких масштабах вузьким місцем стає вже не epoll, а весь мережевий стек ядра: алокація sk_buff на пакет, syscall-overhead, обробка переривань і softirq, лок-контеншн. Класичне формулювання Robert Graham - "the kernel isn't the solution, it's the problem". Рішення поділяються на три різні категорії за способом обходу цих витрат:
- kernel-bypass - повністю обминає мережевий стек ядра (
DPDK); - kernel fast-path - обробляє пакет у ядрі максимально рано, ще до алокації
sk_buff(XDP); - syscall amortization - зменшує кількість переходів user↔kernel через submission/completion-черги, залишаючись ядровим API (
io_uring).
Додатково застосовуються shared-nothing-архітектури з прив'язкою одного потоку до одного процесорного ядра без розділюваного стану. Для Python-розробника ця тема значною мірою екзотична, проте її розуміння корисне для проектування систем.
Links
Що використовує asyncio під капотом: selectors та epoll⚑
Summary
На Unix
asyncioevent loop (SelectorEventLoop) стоїть на модуліselectors, який мапиться на найефективніший доступний механізм readiness-нотифікацій:epollу Linux,kqueueу macOS/BSD. На Windows (з 3.8 за замовчуванням) працює інша модель -ProactorEventLoopповерх IOCP напряму через_overlapped, модульselectorsтут не задіяний (опційнийSelectorEventLoopна Windows використовує лишеSelectSelector). Це поєднання неблокуючого I/O з нотифікаціями від ядра дає найвищу пропускну здатність - жоден потік не блокується і немає busy-polling.
Чотири комбінації I/O, які теоретично можливі:
- blocking + sync - класичний
socket.recv(): потік стоїть, поки дані не прийдуть. - non-blocking + sync -
socket.setblocking(False)+ цикл, який сам опитує дескриптори (busy-polling, палить CPU). - blocking + async - блокуючий syscall, викликаний з-під асинхронного контексту: типова помилка "ти заблокував event loop" (наприклад, виклик синхронного
socket.recv()всередині корутини). У штатній архітектурі не має сенсу, але регулярно трапляється в проді як баг. - non-blocking + async - дескриптор у non-blocking режимі, а сповіщення про готовність приходить від ядра. Це те, що використовує
asyncio- максимальна пропускна здатність без вузьких місць.
Модуль selectors. У стандартній бібліотеці є selectors - тонкий об'єктний шар поверх системних викликів мультиплексування. Event loop не звертається до epoll напряму, а працює через абстракцію selectors.BaseSelector. Конкретна реалізація вибирається під ОС:
EpollSelector- Linux (epoll).KqueueSelector- macOS, BSD (kqueue).PollSelector- Unix-фолбек, де немаєepoll/kqueue(poll).SelectSelector- універсальний фолбек наselect; на Windows це єдиний доступний селектор. IOCP сюди не належить -ProactorEventLoopпрацює з ним позаselectors.DefaultSelector- alias на найефективніший доступний на поточній платформі.
import selectors
sel = selectors.DefaultSelector()
print(type(sel).__name__) # -> EpollSelector on Linux, KqueueSelector on macOS
Як це працює з epoll. Замість того щоб опитувати кожен сокет у циклі, event loop робить так:
- Реєструє файлові дескриптори в ядрі один раз через
epoll_ctl(EPOLL_CTL_ADD, fd, events). - Викликає
epoll_wait(timeout)- системний виклик блокує сам цикл (один потік) на час до timeout або до першої події. - Ядро будить виклик і повертає лише ті fd, на яких щось сталось (готові для читання/запису).
- Event loop підіймає відповідні корутини/колбеки і запускає їх до наступного
await.
import selectors, socket
sel = selectors.DefaultSelector()
server = socket.socket()
server.bind(("localhost", 0))
server.listen()
server.setblocking(False)
sel.register(server, selectors.EVENT_READ, data="accept")
while True:
events = sel.select(timeout=1.0) # epoll_wait under the hood
for key, mask in events:
# key.fileobj is ready, dispatch a callback
handle(key, mask)
Чому це найшвидше. Складаючи попарно властивості:
- Блокуючий sync I/O масштабується лише через потоки - кожен потік коштує пам'яті й контекст-свічів.
- Non-blocking sync без нотифікацій ядра - це busy loop, який вантажить CPU навіть коли немає подій.
asyncio+ epoll - один потік спить уepoll_waitпоки в системі нічого не відбувається, а коли ядро бачить подію - воно одразу пробуджує процес зі списком готових дескрипторів. CPU не витрачається на опитування, потоки не блокуються на окремих сокетах.
Обмеження старих механізмів (чому саме epoll):
select()- лінійний обхід усього набору fd на стороні ядра і userspace + жорсткий лімітFD_SETSIZE(зазвичай 1024).poll()- знімає ліміт, але все одно O(n) по кількості fd на кожен виклик.epoll()- O(log n) реєстрація черезepoll_ctl(ядро тримає множину fd у червоно-чорному дереві), O(готових_подій) на викликepoll_wait. Тримає 10k+ з'єднань без деградації (вирішення проблеми C10k).
Зв'язок з кодом asyncio. Стандартний SelectorEventLoop (Unix) тримає self._selector = selectors.DefaultSelector() і в кожній ітерації loop виконує приблизно event_list = self._selector.select(timeout) - саме тут процес "спить" в epoll, поки в системі немає роботи. На Windows з 3.8 за замовчуванням використовується ProactorEventLoop, який працює з IOCP напряму через _overlapped, повністю минаючи модуль selectors - інша модель ("ядро виконує операцію і повертає результат"), але семантика для прикладного коду та сама.
Links
Чому асинхронний код з await може виконуватись синхронно?⚑
await у коді НЕ запускає його конкурентно. Все, що робить await - віддає управління в event loop, щоб той мав можливість переключитися на сусідню корутину і, якщо там очікування закінчилося, продовжити виконання коду цієї сусідньої корутини.
Для конкурентного ("одночасного") запуску корутин (функцій, визначених async def) їх треба запускати не просто з await, а потрібно створювати завдання (asyncio.Task) — безпосередньо з asyncio.create_task(coro()) або за допомогою інших АРІ asyncio.
import asyncio
import time
async def delay(seconds: int) -> None:
print (f"delay ({seconds=}) started", flush=True)
await asyncio.sleep (seconds)
print (f"delay ({seconds=}) finished", flush=True)
async def main():
start_time = time.perf_counter()
await delay(1)
await delay(3)
await delay(2)
print(f"elapsed time: {time.perf_counter() - start_time:.1f} seconds")
asyncio.run(main()) # elapsed time: 6.0 seconds
Як запустити код конкурентно в asyncio?⚑
Існує 5 основних способів запустити код асинхронно:
- створення завдань з
asyncio.create_task(...)і потім їх очікування зawait asyncio.gather (...)asyncio.TaskGroupasyncio.as_completed(...)asyncio.wait(...)
Для чого використовується asyncio.create_task(...)?⚑
asyncio.create_task(...) використовується для запуску корутини як незалежного асинхронного завдання (task) у фоновому режимі. Це дозволяє корутині виконуватися паралельно з іншими частинами програми, не блокуючи поточний потік виконання.
Завдання не починає виконуватись відразу (тільки планується її виконання), а чекає, поки зустрінеться перший await, коли ми віддаємо управління з поточної асинхронної функції в event loop. Тільки тоді він отримає можливість запустити створені завдання.
Важливий моментом є те, що ми можемо запустити завдання, але не дочекатись його виконання. Щоб завдання точно повністю відпрацювало, треба явно чекати закінчення завдання з await task - важливо не забувати про це у такому сценарії запуску конкурентності.
Exception, що виникли у завданнях та необроблені в них не скасовують роботу інших завдань після виникнення виключення. Обробляти винятки потрібно на рядку await task.
Недоліки create_task
- деяка багатослівність - спочатку створити таски, потім почекати на їх виконання з
await - немає можливості обробляти результати завдань у міру їх виконання — у циклі з
awaitпорядок не за швидкістю виконання, а по порядку додавання завдань до списку
import asyncio
import time
async def delay(seconds: int) -> None:
print (f"delay ({seconds=}) started", flush=True)
await asyncio.sleep (seconds)
print (f"delay ({seconds=}) finished", flush=True)
async def main():
start_time = time.perf_counter()
first_task = asyncio.create_task(delay(1))
second_task = asyncio.create_task(delay(3))
third_task = asyncio.create_task(delay(2))
print("before first await", flush=True)
await first_task
print("after first_task", flush=True)
await second_task
print("after second_task", flush=True)
await third_task
print(f"elapsed time: {time.perf_counter() - start_time:.1f} seconds")
asyncio.run(main()) # elapsed time: 3.0 seconds
Task можна відміняти методом cancel().
import asyncio
import time
async def delay(seconds: int) -> None:
print(f"delay({seconds=}) started", flush=True)
await asyncio.sleep(seconds)
print(f"delay({seconds=}) finished", flush=True)
async def main():
start_time = time.perf_counter()
tasks = (
asyncio.create_task(delay(3), name="delay 3 sec"),
asyncio.create_task(delay(1), name="delay 1 sec"),
asyncio.create_task(delay(20000), name="delay 20000 sec")
)
for task in tasks:
if task.get_name() == "delay 20000 sec":
task.cancel()
else:
await task
print(f"elapsed time: {time.perf_counter() - start_time:.1f} seconds")
if __name__ == "__main__":
asyncio.run(main()) # elapsed time: 3.0 seconds, # ! not 20000
Для чого використовується asyncio.TaskGroup?⚑
asyncio.TaskGroup використовується для управління групою асинхронних задач, що виконуються одночасно. Ця структура дозволяє легко створювати, запускати і відстежувати кілька завдань, підтримуючи їх виконання в межах одного контексту, забезпечуючи автоматичне завершення всіх завдань, навіть якщо одне з них викликає помилку. Це новий механізм, введений у Python 3.11.
Всі задачі, що створені в межах одного TaskGroup, виконуються паралельно. Завдяки підтримці синтаксису async with, після виходу з контексту групи можна бути впевненим, що всі задачі або завершились, або були скасовані.
Якщо одна з задач у групі викидає необроблений виняток, інші задачі також зупиняють роботу - у них викликається asyncio.CancelledError.
TaskGroup зручно використовувати, коли треба виконати конкурентно кілька завдань, але якщо хоча б одне завершиться з винятком - зупинити всі інші
Це не повний аналог gather — оскільки gather вміє виконати всі завдання і повернути всі винятки, які виникли.
import asyncio
async def task(name, delay):
await asyncio.sleep(delay)
print(f'Task {name} completed after {delay} seconds')
async def main():
async with asyncio.TaskGroup() as tg:
tg.create_task(task('A', 1))
tg.create_task(task('B', 2))
tg.create_task(task('C', 3))
print('All tasks created')
print('All tasks completed')
asyncio.run(main())
В цьому прикладі три задачі (A, B і C) запускаються одночасно, і TaskGroup забезпечує, що всі вони завершаться перед тим, як main() завершиться. Прекрасний спосіб організації асинхронного коду, правда ж? Щось ще в цьому напрямку?
Для чого використовується gather()?⚑
gather() - функція, яка призначена для виконання асинхронних задач паралельно та збору їх результатів. Вона приймає кілька кілька асинхронних функцій, огортає їх в завдання, якщо це потрібно, та очікує завершення їх виконання, збираючи результати від кожної задачі. Ми отримуємо результат всіх awaitables у тому ж порядку, в якому вони були передані.
Якщо виникає виняток, gather() миттєво поверне його на рядку await gather, однак на інші завдання це не вплине, вони продовжують виконання, але важко дістати результати. Якщо ж скасувати gather(), всі його awaitables, які ще не завершили своє виконання, також будуть скасовані.
На практиці краще використовувати gather(*coros, return_exceptions=True). При такому підході винятки повертаються з gather у результатах - всі результати в порядку *coros.
Недоліки
- немає можливості обробляти результати завдань по мірі їх виконання
- документація позиціонує
TaskGroupяк сучасний спосіб створювати завдання та чекати їх повного виконання - хочаTaskGroupінакше працює з винятками
import asyncio
async def task1():
await asyncio.sleep(1)
return "Task 1 done"
async def task2():
await asyncio.sleep(2)
return "Task 2 done"
async def main():
results = await asyncio.gather(task1(), task2())
print(results)
asyncio.run(main()) # ['Task 1 done', 'Task 2 done']
Для чого використовується wait_for(), as_completed(), wait() ?⚑
wait_for() - приймає два аргументи: один awaitable та затримку в секундах. Дозволяє очікувати завершення конкретної асинхронної задачі з обмеженням у часі. Якщо awaitable — це корутина, вона автоматично огортається в завдання. Якщо задача не завершиться протягом вказаного таймауту, генерується виняток asyncio.TimeoutError.
import asyncio
async def my_task():
await asyncio.sleep(2)
return "Task done"
async def main():
try:
result = await asyncio.wait_for(my_task(), timeout=1)
print(result)
except asyncio.TimeoutError:
print("Task did not complete within the specified time.")
asyncio.run(main()) # Task did not complete within the specified time.
В поєднанні з gather() - щойно закінчується затримка, внутрішнє завдання скасовується. Всі завдання в gather() також скасовуються
try:
result_f, result_g = await asyncio.wait_for(
asyncio.gather(f(), g()),
timeout=5.0
)
except asyncio.TimeoutError:
print("oops took longer than 5s!")
as_completed() - приймає ітерований об'єкт (наприклад, список, кортеж, сет), та повертає асинхронний ітератор, який генерує asyncio.Futures в порядку завершення виконання корутин.
Дає можливість обробляти результати корутин у міру їх виконання, а також дозволяє обробити всі завдання, які можуть бути оброблені, навіть після винятку в одному із завдань.
import asyncio
async def task1():
await asyncio.sleep(2)
return "Task 1 done"
async def task2():
await asyncio.sleep(1)
return "Task 2 done"
async def main():
tasks = [task1(), task2()]
for task in asyncio.as_completed(tasks):
result = await task
print(result)
asyncio.run(main()) # Task 2 done, Task 1 done
wait() - функція, яка очікує завершення асинхронних задач та повертає кортеж, який містить два сети: задач, які завершили виконання, і ті, що ще в очікуванні. Тобто вона не повертає результати - відповідальність за обробку результату лежить на розробнику.
Можна передати затримку (timeout), після якої wait() припинить виконання. Але на відміну від gather(), з awaitables нічого не відбувається, коли затримка спливає. Функція просто завершує виконання та розподіляє завдання на виконані та ті, що ще в очікуванні.
Можна зробити так, аби wait() не чекав виконання всіх awaitables, за допомогою аргументу return_when. Автоматично цей аргумент приймає значення asyncio.ALL_COMPLETED. Можна змінити значення на asyncio.FIRST_EXCEPTION, яке очікує завершення всіх awaitables, якщо лише якесь з них не спровокує виняток. А от з asyncio.FIRST_COMPLETED функція завершує виконання одразу, коли якийсь awaitables завершив виконання.
import asyncio
async def task1():
await asyncio.sleep(2)
return "Task 1 done"
async def task2():
await asyncio.sleep(1)
return "Task 2 done"
async def main():
tasks = [asyncio.create_task(task1()), asyncio.create_task(task2())]
completed_tasks, pending_tasks = await asyncio.wait(tasks, timeout=1.5)
for task in completed_tasks:
print(task.result())
asyncio.run(main()) # Task 2 done
Links
Для чого використовується asyncio.to_thread⚑
asyncio.to_thread використовується в Python для виконання блокуючих IO операцій у фоновому потоці під час асинхронного виконання коду. Це дозволяє уникнути блокування основного асинхронного циклу подій під час виконання операцій, які займають багато часу або можуть заблокувати інші завдання.
Виклик asyncio.to_thread виконає передану функцію в іншому потоці з використанням concurrent.futures.ThreadPoolExecutor. Це дозволяє не блокувати асинхронний цикл подій під час очікування завершення блокуючої операції. asyncio.to_thread поверне корутину, яку можна await для отримання результату виконання функції.
import asyncio
import time
def blocking_task():
time.sleep(2)
return "Completed"
async def main():
print("Start blocking task")
result = await asyncio.to_thread(blocking_task)
print(result)
asyncio.run(main())
Робота з файловою системою через ThreadPool
Той самий принцип лежить в основі бібліотек async-роботи з файлами (aiofiles тощо). На Linux/macOS/Windows регулярні файли не підтримують неблокуючий режим: будь-який read/write блокує потік до завершення дискової операції. Тому aiofiles не виконує "async syscall" - він обгортає блокуючий виклик у ThreadPoolExecutor так само, як asyncio.to_thread. Event loop не блокується, але реальний syscall усе одно паркує допоміжний потік. Деталі - у files_and_io.md в розділі "В чому різниця між блокуючим і неблокуючим введенням-виведенням".
Якщо потрібно виконати обчислення у процесі (наприклад, для більш інтенсивних завдань), можна використовувати concurrent.futures.ProcessPoolExecutor разом із run_in_executor.
import asyncio
from concurrent.futures import ProcessPoolExecutor
def heavy_computation():
result = sum(i * i for i in range(10**6))
return result
async def main():
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool: # Use a ProcessPoolExecutor to avoid blocking the loop
result = await loop.run_in_executor(pool, heavy_computation)
print(result)
asyncio.run(main())
CPU-bound через to_thread: GIL contention
asyncio.to_thread часто використовують і для CPU-bound операцій (AES-шифрування, хешування, компресія) з метою не блокувати event loop. На малих обсягах це працює, але на масштабі вступає в дію механіка GIL:
- Через GIL у CPython лише один потік виконує Python-байткод одночасно.
- За замовчуванням інтерпретатор перемикає GIL приблизно кожні 5 мс (
sys.getswitchinterval()повертає0.005секунди починаючи з Python 3.2, раніше час вимірювали кількістю байткодів черезsys.setcheckinterval). - При багатьох одночасних
to_thread-викликах потоки конкурують з потоком, який виконує event loop, за GIL. Кожне перемикання потягує context switch на рівні OS-планувальника; чим більше runnable-потоків, тим менший слайс CPU отримує event loop.
Це не означає, що to_thread не можна використовувати для CPU-bound операцій. Один невеликий виклик (мікросекунди CPU-часу) - прийнятний: за час одного перемикання GIL операція вже завершиться. Але тисячі одночасних CPU-bound викликів через to_thread деградують латентність event loop, а виграш над синхронним рішенням може зникнути або стати від'ємним (накладні витрати на context switch перевищують економію від паралелізації, особливо коли GIL не дозволяє реальної паралельності).
Правильна евристика:
- IO-bound блокуючий виклик (файли, legacy-DB-драйвер, syscall'и):
asyncio.to_thread- канонічний інструмент. - CPU-bound операція, поодинокий виклик, обмежена кількість одночасних:
asyncio.to_threadприйнятний, поки розмірdefault_executorне перевищується (defaultThreadPoolExecutormax_workers-min(32, os.cpu_count() + 4)у Python 3.8-3.12; з Python 3.13 -min(32, (os.process_cpu_count() or 1) + 4)). - CPU-bound операція на масштабі (тисячі одночасних викликів):
concurrent.futures.ProcessPoolExecutorчерезloop.run_in_executor(pool, ...), щоб обчислення йшли поза GIL і поза процесом event loop.
Це межа, після якої перенесення в потік не масштабується; виміряти треба конкретно під своє навантаження, не довіряючи інтуїції.
Links
- Python docs:
sys.getswitchinterval()- default 0.005s з Python 3.2 - Python 3.2 What's New: New GIL - перехід від bytecode-counter до timeslice-based GIL switch (Antoine Pitrou's "newgil")
asyncio.create_subprocess_exec для зовнішніх процесів⚑
Summary
Коли тяжку роботу робить зовнішня програма (FFmpeg, ImageMagick,
pdftotext,git), правильний інструмент - неto_threadнавколоsubprocess.run, а нативний asyncio-API:asyncio.create_subprocess_exec(...)повертаєProcess-обʼєкт, який інтегрований з event loop через SIGCHLD + неблокуючі pipe'и. Event loop не паркує допоміжний потік - чекає завершення штатними механізмами OS.
Базовий патерн
import asyncio
async def transcode(src: str, dst: str) -> None:
proc = await asyncio.create_subprocess_exec(
"ffmpeg", "-i", src, "-c:v", "libx264", dst,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate() # Awaits process exit
if proc.returncode != 0:
raise RuntimeError(f"ffmpeg failed: {stderr.decode()}")
await proc.communicate() повертає кортеж (stdout_bytes, stderr_bytes) і чекає завершення процесу. На відміну від subprocess.run(), який блокує потік на waitpid(), asyncio-варіант реєструє SIGCHLD-обробник і повертає управління event loop'у; інші корутини виконуються паралельно з зовнішнім процесом.
exec vs shell
create_subprocess_exec(*args)- передає аргументи як список, без shell. Безпечний дефолт.create_subprocess_shell(cmd)- запускаєcmdчерез/bin/sh -c. Зручно для пайплайнів ("foo | bar"), але вразливо до shell-ін'єкції, якщо у команді є user-input. Не передавати неперевірені рядки.
# Safe
proc = await asyncio.create_subprocess_exec("ls", "-la", user_dir)
# Risky if user_dir contains "; rm -rf /"
proc = await asyncio.create_subprocess_shell(f"ls -la {user_dir}")
Стрімінг великих output'ів
communicate() буферизує весь stdout/stderr у пам'яті. Для багатогігабайтних output'ів - читати через proc.stdout.readline() / read(n) у циклі або писати у файл напряму через stdout=open("out.bin", "wb").
proc = await asyncio.create_subprocess_exec(
"ffmpeg", "-i", src, "-progress", "pipe:1", "-c:v", "libx264", dst,
stdout=asyncio.subprocess.PIPE,
)
async for line in proc.stdout: # Stream progress lines
if line.startswith(b"out_time="):
print(line.decode().strip())
await proc.wait()
create_subprocess_exec vs to_thread(subprocess.run, ...)
Технічно це працює, але:
subprocess.runблокує OS-потік наwaitpid()- той самий "трюк" зaiofiles-обгорткою над thread pool'ом. Потік ThreadPoolExecutor паркується на час життя зовнішнього процесу.create_subprocess_execробить це нативно через event loop без потоку: один SIGCHLD-обробник на всі активні subprocess'и.- На сотнях одночасних зовнішніх процесів
to_thread-підхід вичерпаєdefault_executor(max_workersmin(32, cpu+4)); asyncio-варіант масштабується скільки дозволить ОС (ulimit -n,kernel.pid_max).
Завершення
proc.terminate() шле SIGTERM, proc.kill() - SIGKILL. Канонічний guard з тайм-аутом:
try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30)
except asyncio.TimeoutError:
proc.kill()
await proc.wait() # Reap to avoid zombie
raise
Links
Що таке aiohttp⚑
aiohttp — це асинхронна бібліотека Python для роботи з HTTP, яка забезпечує клієнтську та серверну функціональність. Підходить для побудови асинхронних додатків, таких як скраперів, REST API серверів та інших мережевих сервісів.
Основні характеристики та можливості aiohttp
- Використовує асинхронний ввід/вивід на базі
asyncio, що дозволяє ефективно обробляти одночасні запити. - Підтримує створення асинхронних HTTP-клієнтів для виконання запитів, отримання відповідей, роботи з JSON, заголовками та файлами.
- Дозволяє створювати легковажні та швидкі асинхронні веб-сервери з налаштованими маршрутами та обробниками.
- Підтримує WebSocket для побудови двонаправленого зв’язку між клієнтом і сервером.
- Працює з сучасним синтаксисом Python (
async/await), що робить код більш читабельним і простим. - Забезпечує гнучке управління сесіями, кукі, редиректами та тайм-аутами.
- Підтримує middlewares для обробки запитів/відповідей на різних етапах.
import aiohttp
import asyncio
async def fetch(url): # Asynchronous function to fetch data from a URL
async with aiohttp.ClientSession() as session: # Create a client session
async with session.get(url) as response: # Perform GET request
return await response.text() # Await and return response text
async def main(): # Main coroutine to execute the fetch function
url = "https://example.com"
html = await fetch(url) # Fetch the URL content
print(html) # Print the response
asyncio.run(main()) # Run the main coroutine
Типові помилки при роботі з asyncio⚑
- Спроба виконання корутин шляхом їхнього виклику. При такому виклику корутини її тіло не виконається. Замість цього буде створено об'єкт корутини. Потім можна зачекати завершення роботи цього об'єкта в середовищі виконання asyncio, тобто в циклі подій. Запустити цикл подій для виконання корутини можна, скориставшись функцією asyncio.run().
- Корутині не дозволяють виконатися в циклі подій. Якщо виконання не було заплановано в циклі подій, виникне помилка під час виконання.
- Використання низькорівневого API модуля Asyncio - низькорівневий API призначений в першу чергу для творців фреймворків і бібліотек.
- Занадто ранній вихід із головної корутини. Якщо завершиться головна корутина, програма також завершиться, навіть якщо інші корутини ще не завершили своє виконання. Для уникнення цього, можна застосовувати
asyncio.wait(), щоб дочекатися завершення усіх задач перед завершенням програми. - Гонки стану (Race Conditions) - виникають, коли дві або більше корутини намагаються одночасно змінювати спільні дані без належного контролю.
- Взаємне блокування (Deadlocks) - ситуації, коли корутини чекають на ресурси, які утримують одна одну, і ні одна не може продовжити виконання.
- Невірне використання примітивів синхронізації. Неправильне використання
asyncio.Lock,asyncio.Eventта інших примітивів може призвести до неправильної синхронізації корутин. - Неправильна обробка винятків - неуспішна обробка винятків у корутинах може призвести до непередбачуваної поведінки програми.
Чому код не виконується паралельно⚑
import asyncio
import random
async def fetch(i):
await asyncio.sleep(random.random())
print(f"Fetched {i}")
async def main():
for i in range(5):
await fetch(i)
asyncio.run(main())
Пояснення
Тут використовується послідовний цикл з await. Кожен виклик fetch(i) запускається і чекає завершення перед тим, як перейти до наступного. Тобто фактично ми отримуємо послідовне виконання: спочатку fetch(0), потім fetch(1) і так далі.
Асинхронність є, але конкурентності немає — ми не запускаємо кілька задач одночасно.
Паралельне виконання
async def main():
tasks = [asyncio.create_task(fetch(i)) for i in range(5)]
await asyncio.gather(*tasks)
Щоб функції виконувались одночасно, їх потрібно перетворити на завдання: asyncio.create_task() реєструє корутину в планувальнику та повертає об'єкт завдання, який почне виконуватися "у фоні".
await asyncio.gather(...) вже не блокує виконання крок за кроком — він очікує на завершення всіх завдань і дозволяє їм працювати паралельно на рівні подієвого циклу.
Скільки потоків та процесів працює під час асинхронного виконання коду⚑
У асинхронному програмуванні Python використовується один процес, який має один основний потік виконання. Цей основний потік взаємодіє з event loop для управління асинхронними задачами, не створюючи додаткових потоків чи процесів. Такий підхід спрощує управління асинхронним кодом і уникнення проблем, пов'язаних зі синхронізацією ресурсів у багатопроцесових чи багатопотокових програмах.
eventloop дуже швидкий, і він більшу частину часу чекає на syscall (системний виклик), тому немає змісту заводити багато потоків з eventloop в кожному. Ningx та node.js також мають один eventloop. Але в будь-якій асинхронній системі окремо є Thread Pool, в який складаються завдання, які виконуються не миттєво.
Як event loop визначає, якій корутині передати управління?⚑
Між event loop і селекторами (select/poll/epoll) є прямий зв'язок - саме через них loop вирішує, кому передати управління далі.
Послідовність:
- Корутина робить
await- призупиняється, повертає управління в event loop. Якщо цеawaitна I/O (наприклад,await socket.recv()), корутина прив'язується до файлового дескриптора, який ще не готовий. - Event loop реєструє цей дескриптор у селекторі (
selectors.DefaultSelector()) - підписується на події "готовий до читання/запису". - Event loop викликає
selector.select(timeout)і чекає сигналу: "які з зареєстрованих дескрипторів готові". - Селектор повертає події - наприклад, сокет №1 готовий до читання → корутина, що чекала на нього, може бути відновлена.
- Event loop ставить цю корутину в чергу готових задач. У
Futureкожної корутини виставляється результат - він вважається завершеним. - Event loop бере з черги першу готову задачу і продовжує її з місця останнього
await.
Важливо:
- Event loop сам нічого не знає про "майбутнє" корутин - він лише реагує на сигнали селекторів.
- Корутина не вибирається "просто так" - лише якщо вона була додана в чергу (
call_soon()) або селектор повідомив про готовність ресурсу (сокет, таймер).
Чому асинхронний сервіс може деградувати при великих обсягах даних?⚑
Summary
Async-сервіс деградує через завантаження всього в пам'ять, блокуючі SQL-запити, CPU-обробку в event loop, нестачу з'єднань і відсутність стрімінгу.
Асинхронність не захищає від типових проблем - це лише інша модель виконання.
1. Нестача оперативної пам'яті
Виклик типу await session.execute(select(...)) або await cursor.fetchall() завантажує весь результат у RAM. Якщо це сотні тисяч рядків - отримуємо зростання споживання пам'яті, свап, OOM, довгі паузи GC.
2. Блокування на стороні БД
Важка вибірка з JOIN, GROUP BY, ORDER BY під навантаженням змушує базу:
- використовувати тимчасові таблиці;
- блокувати ресурси;
- довго тримати з'єднання відкритим.
Це знижує продуктивність не тільки поточного запиту, а й усієї БД.
3. Перевантаження event loop
Async ефективний для I/O-bound (багато дрібних запитів). Якщо в корутині важка обробка результатів у Python (CPU-bound) або довге утримання управління без await - всі інші корутини чекають.
4. Серіалізація великих об'ємів
Pydantic-моделі (особливо до v2) роблять рекурсивну валідацію і серіалізацію. Повернення 10 000 об'єктів через FastAPI:
- валідація кожного поля;
- перетворення
datetime,Decimal,UUIDу JSON; - виклик
.dict()/.json()для кожного об'єкта.
Це сильно вантажить CPU й уповільнює відповідь.
5. Немає стрімінгу даних
Замість fetchall() - використовувати курсор з ітерацією, або chunked-вивантаження через LIMIT + OFFSET чи keyset-pagination.
Які проблеми є у асинхронного коду?⚑
1. Складність дебагу і трасування
Стек викликів у async-коді часто фрагментований. Винятки можуть "губитися", особливо якщо не await-ити корутини. Забутий await дає "висячі" задачі без помилок.
2. Не можна використовувати блокуючий код
Async погано працює з блокуючими викликами: time.sleep(), requests.get(), звичайний open(). Вони блокують event loop - і весь застосунок "зависає". Для блокуючих операцій - asyncio.to_thread() або loop.run_in_executor().
3. Сумісність зі сторонніми бібліотеками
Не всі бібліотеки підтримують asyncio: для HTTP потрібен aiohttp/httpx, не requests; для SQL - asyncpg, aiomysql, databases, SQLModel. Інколи доводиться обгортати синхронні дзвінки в executor - це сповільнює і ускладнює код.
4. Обмеження GIL
Асинхронність не обходить GIL. Вона не прискорює CPU-bound задачі. Для них потрібен multiprocessing або ProcessPoolExecutor.
5. Складнощі при тестуванні
Async-функції запускаються через asyncio.run() або тестові фреймворки (pytest-asyncio).
6. Перевантаження архітектури
Передчасний перехід на async збільшує техборг. Async часто вимагає переписати весь ланцюжок викликів - він "вірусний".
Проблеми з логуванням в асинхронному коді⚑
Стандартний logging без урахування особливостей asyncio може давати кілька проблем:
1. Блокування event loop
logging.info(), logging.error() виконуються синхронно навіть в async-коді. При інтенсивному логуванні запис у файл/мережу блокує event loop і дає затримки.
2. Втрата контексту
При одночасному логуванні з декількох корутин повідомлення можуть переплітатися - рядки з різних задач змішуються в логах, особливо в StreamHandler чи файлі.
3. Немає підтримки await
Стандартний logging не підтримує await. Записи в файл/мережу - синхронні.
Рішення:
- Асинхронні логери -
aiologgerз async-сумісними методами. QueueHandler+ окремий потік для запису - щоб не блокувати event loop.- Контекст у записах -
task_id,user_id,correlation_idдля відстеження. - Лінива підстановка форматтера:
logging.info("User %s logged in", user_id)замість f-string.
ContextVar для request-scoped стану⚑
Summary
contextvars.ContextVar- стандартний механізм Python для request-scoped стану в асинхронних застосунках. На відміну відthreading.local, ContextVar коректно копіюється при створенні нової таски і не "змішує" значення між запитами, що виконуються в одному потоці event loop.
Обмеження threading.local в asyncio
threading.local прив'язує значення до OS-потоку. У asyncio один потік event loop одночасно обслуговує тисячі задач (корутин). Виставлене з однієї задачі значення видно всім іншим задачам, що виконуються тим самим потоком, - це не request-scoping, а global state з ілюзією ізоляції.
ContextVar (PEP 567, з Python 3.7) натомість прив'язує значення до поточного контексту виконання. При створенні нової asyncio.Task контекст копіюється: задача отримує знімок поточних значень ContextVar, але її подальші зміни не видно іншим задачам.
Принцип роботи
import asyncio
from contextvars import ContextVar
tenant_id_ctx: ContextVar[str | None] = ContextVar("tenant_id", default=None)
async def report() -> str:
return tenant_id_ctx.get() or "<none>"
async def per_tenant_handler(tenant: str) -> None:
tenant_id_ctx.set(tenant)
print(await report()) # tenant
async def main() -> None:
await asyncio.gather(
per_tenant_handler("A"),
per_tenant_handler("B"),
)
print(await report()) # <none> - main context unchanged
Кожен виклик gather створює нову Task для корутини. Кожна Task отримує копію контексту; set() всередині однієї Task не змінює контекст в інших Task або у викликача.
.set(value) повертає Token, через який можна скасувати зміну (.reset(token)) - корисно для middleware-патернів, де треба явно відновити попереднє значення після завершення обробки.
Типове застосування
Поширений use case у вебзастосунках - request-scoped метадані: tenant_id, user_id, request_id/correlation_id, locale, trace context. Middleware кладе значення в ContextVar на вході запиту; нижчі шари (repository, logger, metrics) читають без передачі через сигнатури всіх функцій.
from collections.abc import Awaitable, Callable
from fastapi import Request, Response
@app.middleware("http")
async def tenant_middleware(
request: Request,
call_next: Callable[[Request], Awaitable[Response]],
) -> Response:
tenant_id = extract_tenant_id(request)
token = tenant_id_ctx.set(tenant_id)
try:
return await call_next(request)
finally:
tenant_id_ctx.reset(token)
Trade-off: implicit dependency
ContextVar створює неявну залежність: код читає значення, яке хтось виставив "десь раніше". Це робить тестування і повторне використання важчим - service, що читає tenant_id з ContextVar, неможливо викликати з cron-задачі без налаштування контексту.
Канонічне правило (узгоджене з Service Layer патерном у architecture/architecture_patterns.md): service приймає бізнес-параметри (tenant_id, user_id) явно через сигнатуру. ContextVar - тільки для cross-cutting метаданих, які не є частиною бізнес-контракту: tracing-ідентифікатори, locale для повідомлень про помилки, debug-прапорці.
Сумісність з потоками і to_thread
asyncio.to_thread копіює поточний контекст у потік: значення ContextVar, виставлені в корутині, видно функції, що виконується через to_thread. Зміни всередині to_thread не повертаються назад у корутину - копія залишається у потоковому контексті.
Links
- PEP 567 - Context Variables - дизайн, motivation, семантика копіювання при створенні Task
- Python docs:
contextvars- API і приклади