asyncio.gather [1] and asyncio.wait [2] seem to have similar uses: I have a bunch of async things that I want to execute/wait for (not necessarily waiting for one to finish before the next one starts).
Since Python 3.11 there is yet another similar feature, asyncio.TaskGroup [3].
They use a different syntax and differ in some details, but it seems very un-pythonic to me to have several functions with such a huge overlap in functionality.
What am I missing?
[1] https://docs.python.org/3/library/asyncio-task.html#asyncio.gather
[2] https://docs.python.org/3/library/asyncio-task.html#asyncio.wait
[3] https://docs.python.org/3/library/asyncio-task.html#asyncio.TaskGroup
Although similar in general cases ("run and get results for many tasks"), each function has some specific functionality for other cases (see also TaskGroup for Python 3.11+ below):

asyncio.gather() [1]

Returns a Future instance, allowing high-level grouping of tasks:

    import asyncio
    from pprint import pprint
    import random

    async def coro(tag):
        print(">", tag)
        await asyncio.sleep(random.uniform(1, 3))
        print("<", tag)
        return tag

    async def main():
        # gather() needs a running event loop, so the groups are built inside main().
        group1 = asyncio.gather(*[coro("group 1.{}".format(i)) for i in range(1, 6)])
        group2 = asyncio.gather(*[coro("group 2.{}".format(i)) for i in range(1, 4)])
        group3 = asyncio.gather(*[coro("group 3.{}".format(i)) for i in range(1, 10)])
        all_groups = asyncio.gather(group1, group2, group3)
        return await all_groups

    results = asyncio.run(main())
    pprint(results)

All tasks in a group can be cancelled by calling group2.cancel() or even all_groups.cancel(). See also .gather(..., return_exceptions=True).
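
The cancellation behaviour described above can be sketched as follows. This is only an illustration: worker() and cancel_group() are hypothetical names, and the coroutines are wrapped in tasks purely so that their state can be inspected afterwards.

```python
import asyncio


async def worker(tag, delay):
    # Hypothetical stand-in for real work: sleep, then return the tag.
    await asyncio.sleep(delay)
    return tag


async def cancel_group():
    # Wrap the coroutines in tasks so their state can be checked later.
    tasks = [asyncio.create_task(worker(i, 1.0)) for i in range(3)]
    group = asyncio.gather(*tasks)

    await asyncio.sleep(0.05)  # give the tasks a moment to start
    group.cancel()             # cancels every child that is still running

    try:
        await group
    except asyncio.CancelledError:
        pass

    return [t.cancelled() for t in tasks]


if __name__ == "__main__":
    print(asyncio.run(cancel_group()))
```

Calling cancel() on the object returned by gather() is enough here; none of the child tasks has to be cancelled by hand.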

asyncio.wait() [2]

Supports stopping the wait after the first task is done, or after a specified timeout, allowing lower-level precision of operations:

    import asyncio
    import random

    async def coro(tag):
        print(">", tag)
        await asyncio.sleep(random.uniform(0.5, 5))
        print("<", tag)
        return tag

    async def main():
        # wait() needs Tasks/Futures; bare coroutines are rejected since Python 3.11.
        tasks = [asyncio.create_task(coro(i)) for i in range(1, 11)]

        print("Get first result:")
        finished, unfinished = await asyncio.wait(
            tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in finished:
            print(task.result())
        print("unfinished:", len(unfinished))

        print("Get more results in 2 seconds:")
        finished2, unfinished2 = await asyncio.wait(unfinished, timeout=2)
        for task in finished2:
            print(task.result())
        print("unfinished2:", len(unfinished2))

        print("Get all other results:")
        if unfinished2:  # wait() raises ValueError on an empty set
            finished3, unfinished3 = await asyncio.wait(unfinished2)
            for task in finished3:
                print(task.result())

    asyncio.run(main())

TaskGroup [3] (Python 3.11+)

Update: Python 3.11 introduces TaskGroups [4], which can "automatically" await more than one task without gather() or wait():

    # Python 3.11+ ONLY!
    async def main():
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(some_coro(...))
            task2 = tg.create_task(another_coro(...))
        print("Both tasks have completed now.")

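
As a rough, runnable illustration of the same idea (Python 3.11+ only), the sketch below reads results from the tasks after the block exits and shows what happens when one task fails. The fetch(), collect() and fail_fast() names are hypothetical and not part of asyncio.

```python
import asyncio


async def fetch(tag, delay, fail=False):
    # Hypothetical worker: sleeps, then either returns its tag or raises.
    await asyncio.sleep(delay)
    if fail:
        raise ValueError(tag)
    return tag


async def collect():
    # The async with block exits only after both tasks have finished.
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(fetch("a", 0.02))
        t2 = tg.create_task(fetch("b", 0.01))
    return [t1.result(), t2.result()]


async def fail_fast():
    try:
        async with asyncio.TaskGroup() as tg:
            slow = tg.create_task(fetch("slow", 1.0))
            tg.create_task(fetch("boom", 0.01, fail=True))
    except ExceptionGroup as eg:
        # The failure cancels the sibling task; errors arrive as a group.
        return [repr(e) for e in eg.exceptions], slow.cancelled()


if __name__ == "__main__":
    print(asyncio.run(collect()))
    print(asyncio.run(fail_fast()))
```

Unlike gather(), a failing task here cancels its siblings, and the errors are reported together as an ExceptionGroup once the block exits.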
[1] https://docs.python.org/3/library/asyncio-task.html#asyncio.gather
[2] https://docs.python.org/3/library/asyncio-task.html#asyncio.wait
[3] https://docs.python.org/3/library/asyncio-task.html#task-groups
[4] https://docs.python.org/3/library/asyncio-task.html#task-groups
A very important distinction, which is easy to miss, is the default behavior of these two functions when it comes to exceptions.

I'll use this example to simulate a coroutine that will sometimes raise exceptions:

    import asyncio

    async def a_flaky_tsk(i):
        await asyncio.sleep(i)  # bit of fuzz to simulate a real-world example
        if i % 2 == 0:
            print(i, "ok")
        else:
            print(i, "crashed!")
            raise ValueError

    async def main():
        coros = [a_flaky_tsk(i) for i in range(10)]
        await asyncio.gather(*coros)

    asyncio.run(main())

outputs -

    0 ok
    1 crashed!
    Traceback (most recent call last):
      File "/Users/dev/PycharmProjects/trading/xxx.py", line 20, in <module>
        asyncio.run(main())
      File "/Users/dev/.pyenv/versions/3.8.2/lib/python3.8/asyncio/runners.py", line 43, in run
        return loop.run_until_complete(main)
      File "/Users/dev/.pyenv/versions/3.8.2/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
        return future.result()
      File "/Users/dev/PycharmProjects/trading/xxx.py", line 17, in main
        await asyncio.gather(*coros)
      File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
        raise ValueError
    ValueError

As you can see, the coros after index 1 never got to execute. The Future returned by gather() is done at that point (unlike with wait()) and the program terminates. If you could keep the program alive, though, the other coroutines would still have a chance to run:

    async def main():
        coros = [a_flaky_tsk(i) for i in range(10)]
        await asyncio.gather(*coros)

    if __name__ == '__main__':
        loop = asyncio.new_event_loop()
        loop.create_task(main())
        loop.run_forever()

    # 0 ok
    # 1 crashed!
    # Task exception was never retrieved
    # ....
    # 2 ok
    # 3 crashed!
    # 4 ok
    # 5 crashed!
    # 6 ok
    # 7 crashed!
    # 8 ok
    # 9 crashed!

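But await asyncio.wait(coros) continues to execute tasks even if some of them fail (the Future returned by wait() is not done, unlike gather()). On Python 3.11+, wrap each coroutine in asyncio.create_task() first, since wait() no longer accepts bare coroutines.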

    0 ok
    1 crashed!
    2 ok
    3 crashed!
    4 ok
    5 crashed!
    6 ok
    7 crashed!
    8 ok
    9 crashed!
    Task exception was never retrieved
    future: <Task finished name='Task-10' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
    Traceback (most recent call last):
      File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
        raise ValueError
    ValueError
    Task exception was never retrieved
    future: <Task finished name='Task-8' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
    Traceback (most recent call last):
      File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
        raise ValueError
    ValueError
    Task exception was never retrieved
    future: <Task finished name='Task-2' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
    Traceback (most recent call last):
      File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
        raise ValueError
    ValueError
    Task exception was never retrieved
    future: <Task finished name='Task-9' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
    Traceback (most recent call last):
      File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
        raise ValueError
    ValueError
    Task exception was never retrieved
    future: <Task finished name='Task-3' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
    Traceback (most recent call last):
      File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
        raise ValueError
    ValueError

Of course, this behavior can be changed for both by using

    asyncio.gather(..., return_exceptions=True)

or

    asyncio.wait([...], return_when=asyncio.FIRST_EXCEPTION)

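
A small sketch of the FIRST_EXCEPTION option, under the assumption of a hypothetical job() coroutine whose odd inputs fail. It shows that wait() returns as soon as one task raises and hands back the unfinished tasks instead of cancelling them.

```python
import asyncio


async def job(i):
    # Hypothetical job: odd inputs fail, even inputs succeed.
    await asyncio.sleep(0.1 * i)
    if i % 2:
        raise ValueError(i)
    return i


async def stop_on_first_error():
    tasks = [asyncio.create_task(job(i)) for i in range(4)]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_EXCEPTION)

    # wait() does not cancel anything; cleaning up is the caller's job.
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    # Reading exception() marks the error as retrieved.
    errors = [repr(t.exception()) for t in done if t.exception() is not None]
    return len(done), len(pending), errors


if __name__ == "__main__":
    print(asyncio.run(stop_on_first_error()))
```

With these delays, job(0) has already finished when job(1) raises, so two tasks are done and two are still pending at that point.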
But it doesn't end here!

Notice the "Task exception was never retrieved" messages in the logs above.

asyncio.wait() won't re-raise exceptions from the child tasks until you await them individually. (The stack traces in the logs are just messages; they cannot be caught!)

    tasks = [asyncio.create_task(c) for c in coros]  # wait() needs Tasks on Python 3.11+
    done, pending = await asyncio.wait(tasks)
    for tsk in done:
        try:
            await tsk
        except Exception as e:
            print("I caught:", repr(e))

Output -
0 ok
1 crashed!
2 ok
3 crashed!
4 ok
5 crashed!
6 ok
7 crashed!
8 ok
9 crashed!
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()
On the other hand, to catch exceptions with asyncio.gather(), you must do this:

    results = await asyncio.gather(*coros, return_exceptions=True)
    for result_or_exc in results:
        if isinstance(result_or_exc, Exception):
            print("I caught:", repr(result_or_exc))

(Same output as before)
Task exception was never retrieved error until I came across this post. Thanks a lot for the great explanation. - Saurav Kumar
Task exception was never retrieved is shown when there are no references left to the task object (right before it is destroyed). Python notifies you about the exception in the task because you will never be able to gain access to it later. - Den Avrondo
---
asyncio.wait is more low-level than asyncio.gather.

As the name suggests, asyncio.gather mainly focuses on gathering the results. It waits on a bunch of futures and returns their results in the given order.

asyncio.wait just waits on the futures. Instead of giving you the results directly, it gives you the done and pending tasks, and you have to collect the values manually.

Moreover, with wait you can specify whether to wait for all futures to finish or just the first one.
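
To make the difference in return values concrete, here is a minimal sketch. The square() coroutine and its delays are assumptions chosen so that tasks complete in the reverse of their input order.

```python
import asyncio


async def square(n, delay):
    # Hypothetical computation with an artificial delay.
    await asyncio.sleep(delay)
    return n * n


async def compare():
    # Later inputs finish first: completion order is the reverse of input order.
    specs = [(1, 0.03), (2, 0.02), (3, 0.01)]

    # gather() returns plain results, in the order the awaitables were passed.
    gathered = await asyncio.gather(*(square(n, d) for n, d in specs))

    # wait() returns two sets of Task objects; results are read one by one.
    tasks = [asyncio.create_task(square(n, d)) for n, d in specs]
    done, pending = await asyncio.wait(tasks)
    waited = sorted(t.result() for t in done)  # sets carry no order

    return gathered, waited, len(pending)


if __name__ == "__main__":
    print(asyncio.run(compare()))
```

If the order matters with wait(), one option is to keep the original task list and read result() from it rather than from the done set.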
asyncio.wait has a parameter called return_when, which you can use to control when the event loop should yield back to you. asyncio.gather does not have such a parameter; the event loop only gets back to you when all tasks have finished/failed. Read the official docs here: docs.python.org/3/library/asyncio-task.html#asyncio.wait - ospider
return_when for asyncio.wait is already available in Python 3.5.9! See here: docs.python.org/3.5/library/asyncio-task.html#asyncio.wait - e.d.n.a
python -m timeit "print('hello')" gives 36.6 usec per loop, so 10000000000000 print('hello') will take 11.6 years to complete for just the print() function - Karol Zlot
I also noticed that you can provide a group of coroutines in wait() by simply specifying the list:

    result = loop.run_until_complete(asyncio.wait([
        say('first hello', 2),
        say('second hello', 1),
        say('third hello', 4)
    ]))

Whereas grouping in gather() is done by just specifying multiple coroutines:

    result = loop.run_until_complete(asyncio.gather(
        say('first hello', 2),
        say('second hello', 1),
        say('third hello', 4)
    ))
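
Note that from Python 3.11 wait() rejects bare coroutine objects, so the list form only works with tasks or futures. A hedged sketch of both calling conventions on a current Python follows; this say() is a hypothetical stand-in for the one used above, with shortened delays.

```python
import asyncio


async def say(text, delay):
    # Hypothetical version of the say() coroutine used above.
    await asyncio.sleep(delay)
    return text


async def main():
    calls = [("first hello", 0.02), ("second hello", 0.01), ("third hello", 0.03)]

    # wait(): one iterable argument, containing tasks rather than coroutines.
    tasks = [asyncio.create_task(say(text, delay)) for text, delay in calls]
    done, _ = await asyncio.wait(tasks)
    from_wait = {t.result() for t in done}

    # gather(): awaitables as separate positional arguments (note the *).
    from_gather = await asyncio.gather(*(say(text, delay) for text, delay in calls))

    return from_wait, from_gather


if __name__ == "__main__":
    print(asyncio.run(main()))
```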
gather(), e.g.: asyncio.gather(*task_list) - tehfink
await right-away! group = asyncio.gather(*aws) returns an awaitable/future for the group directly, which represents all the combined tasks. The tasks can run soon after the asyncio.gather call, e.g. when there is an await for something else (like asyncio.sleep) or when accessing the future (like group.done()). You only need to use await group when you want to make sure the tasks are done or cancelled and to collect all the results. - e.d.n.a
---
In addition to all the previous answers, I would like to describe the different behavior of gather() and wait() when they are cancelled.

Gather() [1] cancellation

If gather() is cancelled, all submitted awaitables (that have not completed yet) are also cancelled.

Wait() [2] cancellation

If the wait()ing task is cancelled, it simply throws a CancelledError and the waited tasks remain intact.

Simple example:

    import asyncio

    async def task(arg):
        await asyncio.sleep(5)
        return arg

    async def cancel_waiting_task(work_task, waiting_task):
        await asyncio.sleep(2)
        waiting_task.cancel()
        try:
            await waiting_task
            print("Waiting done")
        except asyncio.CancelledError:
            print("Waiting task cancelled")

        try:
            res = await work_task
            print(f"Work result: {res}")
        except asyncio.CancelledError:
            print("Work task cancelled")

    async def main():
        print("asyncio.wait()")
        work_task = asyncio.create_task(task("done"))
        waiting = asyncio.create_task(asyncio.wait({work_task}))
        await cancel_waiting_task(work_task, waiting)

        print("----------------")

        print("asyncio.gather()")
        work_task = asyncio.create_task(task("done"))
        waiting = asyncio.gather(work_task)
        await cancel_waiting_task(work_task, waiting)

    asyncio.run(main())

Output:
asyncio.wait()
Waiting task cancelled
Work result: done
----------------
asyncio.gather()
Waiting task cancelled
Work task cancelled
Application example

Sometimes it becomes necessary to combine wait() and gather() functionality. For example, we want to wait for the completion of at least one task and cancel the remaining pending tasks after that, and if the waiting itself was cancelled, then also cancel all pending tasks.

As real examples, let's say we have a disconnect event and a work task. We want to wait for the results of the work task, but cancel it if the connection is lost. Or we make several parallel requests and, upon completion of at least one response, cancel all the others.

It could be done this way:

    import asyncio
    from typing import Optional, Tuple, Set

    async def wait_any(
        tasks: Set[asyncio.Future], *, timeout: Optional[int] = None,
    ) -> Tuple[Set[asyncio.Future], Set[asyncio.Future]]:
        tasks_to_cancel: Set[asyncio.Future] = set()
        try:
            done, tasks_to_cancel = await asyncio.wait(
                tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
            )
            return done, tasks_to_cancel
        except asyncio.CancelledError:
            tasks_to_cancel = tasks
            raise
        finally:
            for task in tasks_to_cancel:
                task.cancel()

    async def task():
        await asyncio.sleep(5)

    async def cancel_waiting_task(work_task, waiting_task):
        await asyncio.sleep(2)
        waiting_task.cancel()
        try:
            await waiting_task
            print("Waiting done")
        except asyncio.CancelledError:
            print("Waiting task cancelled")

        try:
            res = await work_task
            print(f"Work result: {res}")
        except asyncio.CancelledError:
            print("Work task cancelled")

    async def check_tasks(waiting_task, working_task, waiting_conn_lost_task):
        try:
            await waiting_task
            print("waiting is done")
        except asyncio.CancelledError:
            print("waiting is cancelled")

        try:
            await waiting_conn_lost_task
            print("connection is lost")
        except asyncio.CancelledError:
            print("waiting connection lost is cancelled")

        try:
            await working_task
            print("work is done")
        except asyncio.CancelledError:
            print("work is cancelled")

    async def work_done_case():
        working_task = asyncio.create_task(task())
        connection_lost_event = asyncio.Event()
        waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait())
        waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task}))
        await check_tasks(waiting_task, working_task, waiting_conn_lost_task)

    async def conn_lost_case():
        working_task = asyncio.create_task(task())
        connection_lost_event = asyncio.Event()
        waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait())
        waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task}))
        await asyncio.sleep(2)
        connection_lost_event.set()  # <---
        await check_tasks(waiting_task, working_task, waiting_conn_lost_task)

    async def cancel_waiting_case():
        working_task = asyncio.create_task(task())
        connection_lost_event = asyncio.Event()
        waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait())
        waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task}))
        await asyncio.sleep(2)
        waiting_task.cancel()  # <---
        await check_tasks(waiting_task, working_task, waiting_conn_lost_task)

    async def main():
        print("Work done")
        print("-------------------")
        await work_done_case()
        print("\nConnection lost")
        print("-------------------")
        await conn_lost_case()
        print("\nCancel waiting")
        print("-------------------")
        await cancel_waiting_case()

    asyncio.run(main())

Output:
Work done
-------------------
waiting is done
waiting connection lost is cancelled
work is done
Connection lost
-------------------
waiting is done
connection is lost
work is cancelled
Cancel waiting
-------------------
waiting is cancelled
waiting connection lost is cancelled
work is cancelled
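
The other scenario mentioned above, several parallel requests where the first response wins, could look roughly like this. It is a sketch only: request() is a hypothetical coroutine in which a sleep stands in for network latency, and first_response() is an illustrative name.

```python
import asyncio


async def request(server, delay):
    # Hypothetical request: the sleep stands in for network latency.
    await asyncio.sleep(delay)
    return f"response from {server}"


async def first_response(delays):
    tasks = {asyncio.create_task(request(name, d)) for name, d in delays.items()}
    try:
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        return done.pop().result()
    finally:
        # Runs on success, on error, and when this coroutine is cancelled.
        for t in tasks:
            t.cancel()  # has no effect on tasks that already finished


if __name__ == "__main__":
    print(asyncio.run(first_response({"a": 0.3, "b": 0.01, "c": 0.2})))
```

Putting the cancellation in a finally block covers both requirements at once: the losers are cancelled after the first response, and everything is cancelled if the waiting coroutine itself is cancelled.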
[1] https://docs.python.org/3/library/asyncio-task.html#running-tasks-concurrently
[2] https://docs.python.org/3/library/asyncio-task.html#asyncio.wait
asyncio.gather() code, If the code that creates those three groups is contained within a function body, you can get rid of the loop = asyncio.get_event_loop() and refactor the code adding an await to the asyncio.gather(group1, group2, group3) making it slightly simpler, and all the lines related with the loop variables will no longer be needed - Yassine Nacif
demo.py file and execute it from the command line using python demo.py - Udi
run_until_complete causes issues when there already is an async loop running. What alternative is there to await for the tasks to finish synchronously? - theberzi
python demo.py. this is with python3.11.2 - stephendwolff