Асинхронные возможности Python
Асинхронность:
- Прямая (параллелизм) — в языке нет
- С последовательной активацией (обратные вызовы функций, callbacks) — полностью моделируется имеющимся синтаксисом
- Сопрограммная (внутри сопрограммы есть синхронные участки, выполнение их произвольно) — вот!
Модель
Предполагается, что весь предлагаемый код вы запускаете и смотрите на результат; без этого понять намного сложнее, if even possible ☺
Как работает yield from (повторение)
На время yield from код генератора task() логически не исполняется, можно считать, что на это время его замещает subr()
Ловля return из генератора с помощью yield from (два способа)
Оператор return в генераторе откладывает свой параметр в поле .value исключения StopIteration
А в конструкции yield from … это значение приезжает прямо так!
Передача параметра в генератор с помощью .send() (повторение):
Особенность: самый первый .send() должен быть генератор.send(None) (или, что то же самое, next(генератор), потому что в синтаксисе нет способа передать какое-то значение в начало генератора, а не в yield.
initial — это параметр генератор-функции, он передаётся в момент создания генератора, а не при его проходе
Мы договорились считать этот первый next() запуском генератора.
Куда происходит .send() в случае yield from?
Ничего неожиданного: .send() попадает в тот итератор, который сейчас yield-ит
Внимательно посмотрим, куда что send-илось…
Асинхронность как произвольное исполнение частей кода между yield-ами
Понятие синхронного фрагмента — непрерывно выполняемого кода между yield-ами (а также стартом и return-ом)
Понятие образующего цикла (main loop)
Тот же пример, но с двумя асинхронно выполняющимися задачами:
1 def subr(n): 2 x = yield f"({n}) Wait for x" 3 y = yield f"({n}) Wait for y ({x=})" 4 return x * y 5 6 def task(n): 7 while True: 8 value = yield from subr(n) 9 _ = yield f"[{n}]: {value}" 10 11 cores = task(0), task(1) 12 print(next(cores[0]), next(cores[1]), sep="\n") 13 for i in range(20): 14 print(cores[not i % 3].send(i))
Здесь из образующего цикла поступает поток целых чисел, subr() их попарно умножает, а две задачи складывают эти произведения
Очередное число попадает в subr() выбранной задачи, а выбор задач делает образующий цикл
Синхронные фрагменты из task[0] выполняются в два раза чаще синхронных фрагментов из task[1]
- Можно попробовать разобраться, что с чем складывалось…
- Более сложный пример: три конечных задачи с разным количеством синхронных фрагментов
1 from random import randint 2 3 def subr(): 4 x = yield 5 y = yield 6 return x * y 7 8 def task(num): 9 res = 0 10 for i in range(num): 11 res += yield from subr() 12 return res 13 14 def loop(*tasks): 15 queue, result = list(tasks), [] 16 print("Start:", *queue, sep="\n\t") 17 for task in tasks: 18 next(task) 19 while queue: 20 task = queue.pop(0) 21 try: 22 task.send(randint(1, 9)) 23 except StopIteration as ret: 24 result.append((task, ret.value)) 25 else: 26 queue.append(task) 27 return result 28 29 print("Done:", *loop(task(10), task(3), task(5)), sep="\n\t")
- Образующий цикл вынесен в отдельную функцию и стал сложнее. В нём генерируется непрерывный поток случайных целых и отдаётся поштучно на обработку очередному заданию. Если задание закончилось, запоминается его результат, а если нет — ставится в конец очереди.
Для реализации этой логики пришлось снова «вытащить» явную обработку StopIteration
Значения, возвращаемые yield, при этом не используются вообще: yield служит только для разметки синхронных фрагментов
Если ещё усложнить логику образующего цикла, мы сможем управлять его поведением с помощью возвращаемых yield значений:
1 from random import randint, choice 2 from string import ascii_uppercase 3 from collections import deque 4 5 def subr(): 6 return (yield int) * (yield str) 7 8 def task(num): 9 res = "" 10 for i in range(num): 11 res += yield from subr() 12 return res 13 14 def loop(*tasks): 15 queue, result = deque((task, None) for task in tasks), [] 16 print("Start:", *queue, sep="\n\t") 17 while queue: 18 task, request = queue.popleft() 19 if request is int: 20 data = randint(1, 4) 21 elif request is str: 22 data = choice(ascii_uppercase) 23 else: 24 data = request 25 try: 26 request = task.send(data) 27 except StopIteration as ret: 28 result.append((task, ret.value)) 29 task.close() 30 else: 31 queue.append((task, request)) 32 return result 33 34 print("Done:", *loop(task(10), task(3), task(5)), sep="\n\t")
subr() возвращает тип параметра, который она хотела бы получить в следующем yield-е
Этот тип хранится в очереди вместе с заданием, чей subr() запросил данный параметр
- Образующий цикл генерирует параметр сообразно типу
А ещё мы храним очередь в очереди, а не в списке, не надо привыкать к плохому!
Можно и дальше усложнять, но и так уже непросто!
Ещё модели
Цикл событий: образующий цикл получает откуда-то «события», определяет, кто их должен обрабатывать и вызывает функции-обработчики с параметром обработчик(событие) (возможно, не функции, а генераторы обработчик.send(событие), не слишком важно).
- Цикл обратных вызовов (callback-ов): частный случай того же самого: каждый обработчик «регистрируется» — по заранее определённому протоколу указывает, в каких случаях его надо вызывать (это и есть событие), а образующий цикл при наступлении события вызывает все обработчики, которые на нём зарегистрировались (опять-таки, можно организовать в виде функций, а можно в виде генераторов)
Цикл с future: унификация управления образующим циклом
future — это генератор из двух синхронных сегментов
Настройка и yield в образующий цикл
return возвращаемого значения
Кроме того, в future есть поле готовности / результата
- Алгоритм работы:
- Генератор-сервис заводит неготовую фьючу в данном образующем цикле
Генератор-пользователь делает yield from фьюча
- Фьюча выпадает в образующий цикл
- В какой-то момент генератор-сервис выставляет в фьюче готовность / результат
- На этом основании образующий цикл возвращает управление фьюче (во второй сегмент)
- А та возвращает значение генератору-пользователю
- Фактически это частный случай обратных вызовов
- … более сложная логика (например, приоритизация событий) …
Синтаксис Async
async def + return ≈ генератор — задание сопрограммы
await ≈ yield from — вызов сопрограммы
async def + yield — это именно то, чем кажется: генераторы, про которые сразу известно, что они асинхронные:
Их можно проходить async for (причём в конструкторах вида [… async for i in асинхронный-гененратор …] тоже)
Перепишем предыдущий пример на async
@types.coroutine: низкоуровневая сопрограмма, которая может делать и return значение, и yield, то есть напрямую обращаться к образующему циклу
1 from random import randint, choice 2 from string import ascii_uppercase 3 from types import coroutine 4 from collections import deque 5 6 @coroutine 7 def subr(): 8 return (yield int) * (yield str) 9 10 async def task(num): 11 res = "" 12 for i in range(num): 13 res += await subr() 14 return res 15 16 def loop(*tasks): 17 queue, result = deque((task, None) for task in tasks), [] 18 print("Start:", *queue, sep="\n\t") 19 while queue: 20 task, request = queue.popleft() 21 if request is int: 22 data = randint(1, 4) 23 elif request is str: 24 data = choice(ascii_uppercase) 25 else: 26 data = request 27 try: 28 request = task.send(data) 29 except StopIteration as ret: 30 result.append((task, ret.value)) 31 task.close() 32 else: 33 queue.append((task, request)) 34 return result 35 36 print("Done:", *loop(task(10), task(3), task(5)), sep="\n\t")
Наш subr() использует прямое управление образующим циклом с помощью yield
В готовом инструментарии это практически никогда не нужно: и образующий цикл, и инструменты управления им должны входить в такой инструментарий
Asyncio
- Самое сложное — это логика образующего цикла
Самое ненужное — это логика образующего цикла (достаточно знать, как он работает, а не что делает)
⇒
- Запрограммируем образующий цикл заранее, насуём туда инструментов
Упростим протокол управления до одного понятия — Future
- Обмажем протокол верхним уровнем (задания, события, очереди и т. п.)
До такой степени, что ни одна из наших сопрограмм не делает yield (если это не асинхронный генератор)
(asyncio specific) обмажем огромным количеством применений IRL
Основные понятия:
Mainloop — образующий цикл. Полностью под капотом, мы его не видим.
Task — асинхронное, сопрограмма, поставленная в управление образующим циклом
1 import asyncio 2 from time import strftime 3 4 async def late(delay, msg): 5 await asyncio.sleep(delay) 6 print(msg) 7 8 async def main(): 9 print(f"> {strftime('%X')}") 10 await late(1, "One") 11 print(f"> {strftime('%X')}") 12 await late(2, "Two") 13 print(f"> {strftime('%X')}") 14 15 task3 = asyncio.create_task(late(3, "Three")) 16 task4 = asyncio.create_task(late(4, "Four")) 17 await(task3) 18 print(f"> {strftime('%X')}") 19 await(task4) 20 print(f"> {strftime('%X')}") 21 22 asyncio.run(main())
asyncio.run(main()) — запуск «приложения» main() в образующем цикле asyncio()
- «приложение» asyncio — корутина, который заполняет очередь mainloop-а и немножко командует им
Если просто написать await — корутина «просто запустится», в чём асинхроннотсь, непонятно (даже если она и выходила в mainloop)
В примере первая корутина спит секунду, а вторая — после этого ещё две
Если написать create_task(корутина), корутина регистрируется в mainloop-е, а возвращется нечто вроде фьючи — задание
await(здадание) запускает его
В примере ещё две корутины планируются одновременно, первая из них спит три секунды, а вторая — четыре, так что отрабатывает через секунду после первой
asyncio.sleep(тайм-аут) — это команда mainlop-у «верни мне управление после тайм-аута»
- Чуть ли не единственная команда asyncio-шному mainlop-у на поверхности
Gather — атомарная операция create_task() / await над несколькими корутинами
- Тут всё понятно, запустились, повисели сколько сказано, завершились
(Python 3.11) группы заданий —запуск в виде контекстного менеджера
1 import asyncio 2 3 async def late(delay, msg): 4 await asyncio.sleep(delay) 5 print(msg) 6 return delay 7 8 async def main(): 9 async with asyncio.TaskGroup() as tg: 10 tg.create_task(late(3, "A")) 11 tg.create_task(late(1, "B")) 12 tg.create_task(late(2, "C")) 13 print("Done") 14 15 asyncio.run(main())
Asyncio — это:
Асинхронное выполнение фрагментов кода между yield-ами; порядок определяется образующим циклом и намеренно недетерминорован
⇒ возможны ситуации гонок, взаимоблокировки и прочие прелести; необходима синхронизация
- Один поток вычислений
⇒ нет ситуаций одновременного атомарного доступа к ресурсу (неатомарный одновременный доступ, то есть длящийся более одного фрагмента, разумеется, есть; требуются семафоры и mutex-ы)
⇒ нельзя надолго (особенно — неопределённо надолго) оставаться в одном фрагменте (висеть в горячем цикле, синхронном вводе и т. п.): пока сопрограмма не передала управление образующему циклу, никакие другие задания не выполняются
- Простая модель асинхронности с полностью скрытым образующим циклом и фьючей в качестве инструмента синхронизации
Высокоуровневое API (введение)
Собственно фьюча, но она считается низкоуровневым примитивом (The rule of thumb is to never expose Future objects in user-facing APIs)
Высокоуровневый — например, события
1 async def waiter(name, event): 2 print(f'{name} waits for {event}…') 3 await event.wait() 4 print(f'…{name} got it!') 5 6 async def eventer(wait, event): 7 print(f"Emitting {event} in {wait} seconds") 8 await asyncio.sleep(wait) 9 print(f"Emitting {event}…") 10 event.set() 11 12 async def main(): 13 event = asyncio.Event() 14 await asyncio.gather( 15 waiter("One", event), 16 waiter("Two", event), 17 eventer(1, event)) 18 19 asyncio.run(main())
…или барьеры
- …
1 async def ham(queue, size): 2 for i in range(size): 3 await asyncio.sleep(1) 4 res = await queue.get() 5 print(f"\tGot {res}") 6 7 async def spam(wait, queue): 8 for i in range(6): 9 await asyncio.sleep(wait) 10 val = f"{wait}:{i}" 11 await queue.put(val) 12 print(f"Put {val}") 13 14 async def main(): 15 queue = asyncio.Queue() 16 await asyncio.gather( 17 ham(queue, 12), 18 spam(0.4, queue), 19 spam(1.6, queue)) 20 21 asyncio.run(main())
- Есть и приоритетные очереди
И толстый-толстый слой шоколада!
Параллелизм (внешний, следите за тредобезопасностью или не используйте треды)
Изменение логики работы mainloop (aka Policies)
- Сеть (I/O, IPC и всё остальное), сигналы
Потоки (над этим всем)
В частности, TCP сервер (аналог netcat -l)
1 import asyncio 2 3 async def echo(reader, writer): 4 while data := await reader.readline(): 5 writer.write(data.swapcase()) 6 writer.close() 7 await writer.wait_closed() 8 9 async def main(): 10 server = await asyncio.start_server(echo, '0.0.0.0', 1337) 11 async with server: 12 await server.serve_forever() 13 14 asyncio.run(main())
- Может принимать произвольное количество соединений
Ещё раз: это не параллелизм!
- ⇒ (ещё раз) не висеть внутри сопрограммы в горячих циклах / синхронном вводе / whatever alike
Вброс/перехват исключений (по аналогии с generator.throw()
- …
Дикая туча модулей на основе asyncio
Д/З
Попробовать прочитать всю документацию и прощёлкать всё, до чего дотянетесь.
EJudge: MobQueue 'В очередь!'
(Фактически это упражнение). Написать две сопрограммы — sender(queue, pattern) и reader(queue, number), которые работают так:
sender(queue, pattern) складывает в объект queue типа asyncio-queue.html все строки из последовательности pattern (гарантируется, что pattern — это последовательность строк). Когда последовательность заканчивается, sender ставит в очередь None и завершается
reader(queue, number) читает из очереди все объекты до тех пор, пока там не встретится number объектов None (это будет означать, что все sender-ы отработали). reader() должен возвращать объект типа Counter, в котором подсчитано количество вхождений всех строк из очереди (но не None).
1 import asyncio 2 import string 3 4 async def main(n): 5 queue = asyncio.Queue(4) 6 alp = string.ascii_lowercase 7 senders = [sender(queue, alp[len(alp) * i // n: len(alp) * (i + 1) // n]) for i in range(n)] 8 res = await asyncio.gather(reader(queue, n), *senders) 9 print(", ".join(f"{key}:{val}" for key, val in sorted(res[0].items()))) 10 asyncio.run(main(6))
a:1, b:1, c:1, d:1, e:1, f:1, g:1, h:1, i:1, j:1, k:1, l:1, m:1, n:1, o:1, p:1, q:1, r:1, s:1, t:1, u:1, v:1, w:1, x:1, y:1, z:1
EJudge: BarrierSync 'К барьеру!'
Написать сопрограмму serial(number, barrier), принимающую два параметра — некоторый номер number и объект barrier типа Barrier.
В тестах создаётся один барьер bar определённого размера num и запускается num заданий видаserial(i, bar), где i — некоторое произвольное целое
Каждая сопрограмма должна однократно выводить number.
Задание serial(m, b) выводит m раньше, чем serial(n, b) выводит n, если m < n
- Таким образом в результате номера выводятся в порядке неубывания.
0 0 2 2 4 4 6 6 8 8
EJudge: FilterQueue 'Вас здесь не стояло!'
Напишите класс FilterQueue со следующими свойствами:
Это потомок asyncio.Queue
В экземпляре класса атрибут очередь.window содержит первый элемент очереди, или None, если очередь пуста (просмотр очередь.window не влияет на состояние очереди)
С помощью операции фильтр in очередь можно определить, присутствуют ли в очереди такие элементы, что выражение фильтр(элемент) истинно
Метод .later() синхронно переставляет первый элемент очереди в её конец, или вызывает исключение asyncio.QueueEmpty, если очередь пуста
Метод .get() содержит необязательный параметр фильтр. Вызов очередь.get(фильтр) работает так:
Если в очереди нет элементов, на которых фильтр(элемент) истинно, работает как обычный .get().
Если в очереди есть элементы, на которых фильтр(элемент) истинно, переставляет первый элемент очереди в её конец до тех пор, пока фильтр(элемент) не истинно, а затем выполняет обычный .get().
Разрешается воспользоваться внутренним представлением Queue; код Queue можно посмотреть тут
1 async def putter(n, queue): 2 for i in range(n): 3 await queue.put(i) 4 5 async def getter(n, queue, filter): 6 for i in range(n): 7 await asyncio.sleep(0.1) 8 yield await queue.get(filter) 9 10 async def main(): 11 queue = FilterQueue(10) 12 asyncio.create_task(putter(20, queue)) 13 async for res in getter(20, queue, lambda n: n % 2): 14 print(res) 15 16 asyncio.run(main())
1 3 5 7 9 11 13 15 17 4 19 12 6 16 8 14 0 10 2 18