Tuesday, October 8, 2024

Python Asyncio - How to cancel all remaining tasks in gather if one fails

If one task in a gather raises an exception, the others are still allowed to continue running.

Well, that's not exactly what I need. I want to distinguish between errors that are fatal and need to cancel all remaining tasks, and errors that are not and instead should be logged while allowing other tasks to continue.

Here is my failed attempt to implement this:

from asyncio import gather, get_event_loop, sleep

class ErrorThatShouldCancelOtherTasks(Exception):
    pass

async def my_sleep(secs):
    await sleep(secs)
    if secs == 5:
        raise ErrorThatShouldCancelOtherTasks('5 is forbidden!')
    print(f'Slept for {secs}secs.')

async def main():
    try:
        sleepers = gather(*[my_sleep(secs) for secs in [2, 5, 7]])
        await sleepers
    except ErrorThatShouldCancelOtherTasks:
        print('Fatal error; cancelling')
        sleepers.cancel()
    finally:
        await sleep(5)

get_event_loop().run_until_complete(main())

(the await sleep in the finally block is there to keep the interpreter from exiting immediately, which would itself cancel all tasks)

Oddly, calling cancel on the gather does not actually cancel it!

PS C:\Users\m> .\AppData\Local\Programs\Python\Python368\python.exe .\wtf.py
Slept for 2secs.
Fatal error; cancelling
Slept for 7secs.

I am very surprised by this behavior since it seems to be contradictory to the documentation [1], which states:

asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)

Return a future aggregating results from the given coroutine objects or futures.

(...)

Cancellation: if the outer Future is cancelled, all children (that have not completed yet) are also cancelled. (...)

What am I missing here? How to cancel the remaining tasks?

Came here from this issue. In Python 3.11+, a gather can be replaced by a TaskGroup, which has those cancelling semantics. - Alex Povel
[+35] [2019-11-27 16:00:42] user4815162342 [ACCEPTED]

The problem with your implementation is that it calls sleepers.cancel() after sleepers has already raised. Technically the future returned by gather() is in a completed state, so cancelling it is necessarily a no-op.

To correct the code, you just need to cancel the children yourself instead of trusting gather's future to do it. Of course, coroutines are not themselves cancelable, so you need to convert them to tasks first (which gather would do anyway, so you're doing no extra work). For example:

async def main():
    tasks = [asyncio.ensure_future(my_sleep(secs))
             for secs in [2, 5, 7]]
    try:
        await asyncio.gather(*tasks)
    except ErrorThatShouldCancelOtherTasks:
        print('Fatal error; cancelling')
        for t in tasks:
            t.cancel()
    finally:
        await asyncio.sleep(5)

I am very surprised by this behavior since it seems to be contradictory to the documentation[...]

The initial stumbling block with gather is that it doesn't really run tasks, it's just a helper to wait for them to finish. For this reason gather doesn't bother to cancel the remaining tasks if some of them fail with an exception - it just abandons the wait and propagates the exception, leaving the remaining tasks to proceed in the background. This was reported as a bug [1], but wasn't fixed for backward compatibility and because the behavior is documented and unchanged from the beginning.

But here we have another wart: the documentation explicitly promises being able to cancel the returned future. Your code does exactly that and it doesn't work, without it being obvious why (at least it took me a while to figure it out, and it required reading the source [2]). It turns out that the contract of Future [3] actually prevents this from working. By the time you call cancel(), the future returned by gather has already completed, and cancelling a completed future is meaningless: it is just a no-op. (The reason is that a completed future has a well-defined result that could have been observed by outside code. Cancelling it would change its result, which is not allowed.)

In other words, the documentation is not wrong, because cancelling would have worked if you had performed it prior to await sleepers having completed. However, it is misleading, because it appears to allow cancelling gather() in the important use case of one of its awaitables raising, but in reality it doesn't.
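For contrast, a minimal sketch showing that the documented cancellation does work as long as the gather future is still pending, i.e. when cancel() is called before the await has completed:

```python
import asyncio

async def main():
    sleepers = asyncio.gather(asyncio.sleep(10), asyncio.sleep(10))
    # The gather future is still pending here, so cancel() takes effect
    # and propagates to both children.
    sleepers.cancel()
    try:
        await sleepers
    except asyncio.CancelledError:
        print('cancelled, as the documentation promises')

asyncio.run(main())
```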

Problems like this that pop up when using gather are the reason why many people eagerly await (no pun intended) trio-style nurseries in asyncio [4] (edit: these were added [5] many years later in Python 3.11).

[1] https://bugs.python.org/issue31452
[2] https://github.com/python/cpython/blob/bea33f5e1db6e4a554919a82894f44568576e979/Lib/asyncio/tasks.py#L702
[3] https://docs.python.org/3/library/asyncio-future.html#asyncio.Future
[4] https://twitter.com/1st1/status/1028032168327565312?lang=en
[5] https://docs.python.org/3/library/asyncio-task.html#task-groups

One more problem to be resolved is that if one coro raises a non-fatal error, then a fatal error raised later by another coro will be ignored; I suppose the answer is to wrap the implementation of my_sleep in an (otherwise ill-advised) try: ... except Exception as err: log_exception_and_ignore_it(err)? - user4385532
(1) @gaazkam If your requirement is to keep going, that's probably the simplest way to do it. There is also the option of wrapping gather in a loop, catching except Exception and pruning the tasks that raised (and logging their exceptions), but that just ends up being much more code than what you've proposed, and for no real gain. - user4815162342
One thing I needed to do to avoid raising cancellation errors from all the task.cancel() calls was to follow up the for t in tasks loop by awaiting another gather statement, i.e. await asyncio.gather(*tasks, return_exceptions=True)

You can create your own custom gather function

This cancels all its children when any exception occurs:

import asyncio

async def gather(*tasks, **kwargs):
    tasks = [ task if isinstance(task, asyncio.Task) else asyncio.create_task(task)
              for task in tasks ]
    try:
        return await asyncio.gather(*tasks, **kwargs)
    except BaseException as e:
        for task in tasks:
            task.cancel()
        raise e


# If a() or b() raises an exception, both are immediately cancelled
a_result, b_result = await gather(a(), b())

(2) Better to write simply raise rather than raise e, as the former will preserve the original stack trace.

What you can do with Python 3.10 (and, probably, earlier versions) is use asyncio.wait. It takes an iterable of awaitables and a condition as to when to return; when the condition is met, it returns two sets of tasks: completed ones and pending ones. You can have it return on the first exception and then cancel the pending tasks one by one:

async def my_task(x):
    try: 
        ...
    except RecoverableError as e:
        ...


tasks = [asyncio.create_task(my_task(x)) for x in xs]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
for p in pending:
    p.cancel()

And you can wrap your tasks in a try-except that re-raises the fatal exceptions and handles the non-fatal ones. It's not gather, but it looks like it does what you want.
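Putting those pieces together, a sketch under the assumption that FatalError and RecoverableError are the application's own exception types (the task body here is invented for illustration):

```python
import asyncio

class FatalError(Exception):
    pass

class RecoverableError(Exception):
    pass

async def my_task(x):
    await asyncio.sleep(x / 10)
    if x == 3:
        raise FatalError(f'{x} is fatal')
    if x == 2:
        raise RecoverableError(f'{x} is recoverable')
    return x

async def guarded(x):
    try:
        return await my_task(x)
    except RecoverableError as e:
        # non-fatal: log it and let the other tasks continue
        print(f'logged and ignored: {e}')
    # FatalError is not caught here, so it trips FIRST_EXCEPTION below

async def main():
    tasks = [asyncio.create_task(guarded(x)) for x in [1, 2, 3, 9]]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    for p in pending:
        p.cancel()
    for d in done:
        if d.exception():
            print(f'fatal error stopped the batch: {d.exception()!r}')

asyncio.run(main())
```

Only the fatal error ends the wait: the recoverable one is swallowed inside guarded, so its task completes normally, while the still-pending task gets cancelled.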

https://docs.python.org/3/library/asyncio-task.html#id9

source: https://stackoverflow.com