In case one task of gather
raises an exception, the others are still allowed to continue.
Well, that's not exactly what I need. I want to distinguish between errors that are fatal and need to cancel all remaining tasks, and errors that are not and instead should be logged while allowing other tasks to continue.
Here is my failed attempt to implement this:
from asyncio import gather, get_event_loop, sleep
class ErrorThatShouldCancelOtherTasks(Exception):
pass
async def my_sleep(secs):
await sleep(secs)
if secs == 5:
raise ErrorThatShouldCancelOtherTasks('5 is forbidden!')
print(f'Slept for {secs}secs.')
async def main():
try:
sleepers = gather(*[my_sleep(secs) for secs in [2, 5, 7]])
await sleepers
except ErrorThatShouldCancelOtherTasks:
print('Fatal error; cancelling')
sleepers.cancel()
finally:
await sleep(5)
get_event_loop().run_until_complete(main())
(the finally await sleep
here is to prevent the interpreter from closing immediately, which would on its own cancel all tasks)
Oddly, calling cancel
on the gather
does not actually cancel it!
PS C:\Users\m> .\AppData\Local\Programs\Python\Python368\python.exe .\wtf.py
Slept for 2secs.
Fatal error; cancelling
Slept for 7secs.
I am very surprised by this behavior since it seems to be contradictory to the documentation [1], which states:
asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)
Return a future aggregating results from the given coroutine objects or futures.
(...)
Cancellation: if the outer Future is cancelled, all children (that have not completed yet) are also cancelled. (...)
What am I missing here? How to cancel the remaining tasks?
The problem with your implementation is that it calls sleepers.cancel()
after sleepers
has already raised. Technically the future returned by gather()
is in a completed state, so its cancellation must be no-op.
To correct the code, you just need to cancel the children yourself instead of trusting gather
's future to do it. Of course, coroutines are not themselves cancelable, so you need to convert them to tasks first (which gather
would do anyway, so you're doing no extra work). For example:
async def main():
tasks = [asyncio.ensure_future(my_sleep(secs))
for secs in [2, 5, 7]]
try:
await asyncio.gather(*tasks)
except ErrorThatShouldCancelOtherTasks:
print('Fatal error; cancelling')
for t in tasks:
t.cancel()
finally:
await sleep(5)
I am very surprised by this behavior since it seems to be contradictory to the documentation[...]
The initial stumbling block with gather
is that it doesn't really run tasks, it's just a helper to wait for them to finish. For this reason gather
doesn't bother to cancel the remaining tasks if some of them fails with
an exception - it just abandons the wait and propagates the exception,
leaving the remaining tasks to proceed in the background. This was
reported as a bug
[1], but wasn't fixed for backward
compatibility and because the behavior is documented and unchanged from
the beginning. But here we have another wart: the documentation
explicitly promises being able to cancel the returned future. Your code
does exactly that and that doesn't work, without it being obvious why
(at least it took me a while to figure it out, and required reading the
source
[2]). It turns out that the contract of
Future
[3] actually prevents this from working. By the time you call cancel()
, the future returned by gather
has already completed,
and cancelling a completed future is meaningless, it is just no-op.
(The reason is that a completed future has a well-defined result that
could have been observed by outside code. Cancelling it would change its
result, which is not allowed.)
In other words, the documentation is not wrong, because canceling would have worked if you had performed it prior to await sleepers
having completed. However, it's misleading, because it appears to allow canceling gather()
in this important use case of one of its awaitable raising, but in reality doesn't.
Problems like this that pop up when using gather
are reason why many people eagerly await (no pun intended) trio-style nurseries
in asyncio
[4] (edit:
added
[5] many years later in Python 3.11.)
[2] https://github.com/python/cpython/blob/bea33f5e1db6e4a554919a82894f44568576e979/Lib/asyncio/tasks.py#L702
[3] https://docs.python.org/3/library/asyncio-future.html#asyncio.Future
[4] https://twitter.com/1st1/status/1028032168327565312?lang=en
[5] https://docs.python.org/3/library/asyncio-task.html#task-groups
my_sleep
in a (otherwise ill-advised) try: ... except Exception as err: log_exception_and_ignore_it(err)
? - user4385532
gather
in a loop, catching except Exception
and pruning the tasks that raised (and logging their exceptions), but
that just ends up being much more code than what you've proposed, and
for no real gain. - user4815162342
task.cancel()
calls was to follow up the for t in tasks
loop by awaiting another gather statement, i.e. await asyncio.gather(*tasks, return_exceptions=True)
You can create your own custom gather-function
This cancels all its children when any exception occurs:
import asyncio
async def gather(*tasks, **kwargs):
tasks = [ task if isinstance(task, asyncio.Task) else asyncio.create_task(task)
for task in tasks ]
try:
return await asyncio.gather(*tasks, **kwargs)
except BaseException as e:
for task in tasks:
task.cancel()
raise e
# If a() or b() raises an exception, both are immediately cancelled
a_result, b_result = await gather(a(), b())
raise
rather than raise e
, add the former will preserve the original stack trace. asyncio.wait
.
It takes an iterable of awaitables and a condition as to when to
return, and when the condition is met, it returns two sets of tasks:
completed ones and pending ones. You can have it return on the first
exception and then cancel the pending tasks one by one:
async def my_task(x):
try:
...
except RecoverableError as e:
...
tasks = [asyncio.crate_task(my_task(x)) for x in xs]
done, pending = await asyncio.wait(taksk, return_when=asyncio.FIRST_EXCEPTION)
for p in pending:
p.cancel()
And you can wrap your tasks in try-except re-raising the fatal exceptions and processing not-fatal ones otherwise. It's not gather
, but it looks like it does what you want.
https://docs.python.org/3/library/asyncio-task.html#id9
source: https://stackoverflow.com
gather
can be replaced by aTaskGroup
, which has those cancelling semantics. - Alex Povel