Friday, October 11, 2024

Python Webscrape tr-ex.me

import requests
from bs4 import BeautifulSoup

user_agent = 'Mozilla/5 (Solaris 10) Gecko'
headers = {'User-Agent': user_agent}
# values = {'s' : sys.argv[1] }  # (would also need: import sys)

word = 'masă'

# Example result URL:
# https://tr-ex.me/translation/romanian-english/casă?p=1&page=1&tm=ptable_exact&translation=&h=110d823af35d34cae60fd423dd67762a&target_filter

url = f'https://tr-ex.me/translation/romanian-english/{word}'
response = requests.get(url, headers=headers)
pool = BeautifulSoup(response.text, 'html.parser')
# print(pool)

if pool.find('span', attrs={'class': 'context-not-found-text'}):
    print("Word not found in trex!")
    # exit()
else:
    print("Automatic word extractions:")
    wordresults = pool.find('div', attrs={'class': 'translations-wrapper'}).find_all('div', attrs={'class': 'translation-wrapper'})
    # 'result' rather than 'word', to avoid shadowing the query term above
    for result in wordresults:
        print(result.find('a', attrs={'class': 'translation'}).find('span', attrs={'class': 'text'}).text)

print("Related 2-word phrases")
for bigrams in pool.find_all('div', attrs={'class': 'context-examples'}):
    for bigram in bigrams.find_all('a', attrs={'class': 'context-example'}):
        print(bigram.text)
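
If the site's markup changes, the chained find() calls above will raise an AttributeError on None. A hedged, more defensive variant of the extraction loop (same class names assumed):

wrapper = pool.find('div', attrs={'class': 'translations-wrapper'})
if wrapper is None:
    print("No translations block found; the page layout may have changed.")
else:
    for result in wrapper.find_all('div', attrs={'class': 'translation-wrapper'}):
        link = result.find('a', attrs={'class': 'translation'})
        text = link.find('span', attrs={'class': 'text'}) if link else None
        if text is not None:
            print(text.text)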


Tuesday, October 8, 2024

Quart Async Example

Asynchronous Programming - One of the key features of Quart is its support for asynchronous programming. Asynchronous programming allows us to handle multiple requests at the same time and provide faster response times.

import asyncio

from quart import Quart, render_template

app = Quart(__name__)

async def get_items():
    # Simulate a long-running task
    await asyncio.sleep(5)
    return ['item1', 'item2', 'item3']

@app.route('/items')
async def items():
    items = await get_items()
    return await render_template('items.html', items=items)

if __name__ == '__main__':
    app.run()

We define an async function called get_items that simulates a long-running task by sleeping for 5 seconds. We then define a route ('/items') that calls this async function and renders our items.html template with the list of items.

By using asynchronous programming, we can handle other requests while the get_items function is running. This allows us to provide faster response times and improve the overall performance of our web application.
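
To see that benefit concretely, here is a minimal standalone sketch (outside of Quart) showing that two of the simulated 5-second tasks overlap instead of running back to back:

import asyncio
import time

async def get_items():  # same simulated 5-second task as above
    await asyncio.sleep(5)
    return ['item1', 'item2', 'item3']

async def demo():
    start = time.perf_counter()
    await asyncio.gather(get_items(), get_items())  # both run concurrently
    print(f"elapsed: {time.perf_counter() - start:.1f}s")  # ~5s, not ~10s

asyncio.run(demo())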

In Quart, we can also use asynchronous libraries and modules to handle tasks such as database queries and network requests. Here’s an example of how to use an asynchronous database driver (aiosqlite) in Quart:

import aiosqlite

async def get_items():
    async with aiosqlite.connect('mydatabase.db') as db:
        cursor = await db.execute('SELECT * FROM items')
        rows = await cursor.fetchall()
    return [row[0] for row in rows]

We use the aiosqlite module to connect to a SQLite database and retrieve a list of items from a table called items.
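
A standalone sketch of wiring that coroutine into a route (it assumes mydatabase.db already contains an items table with a name column):

import aiosqlite
from quart import Quart

app = Quart(__name__)

@app.route('/items-async')
async def items_async():
    async with aiosqlite.connect('mydatabase.db') as db:
        cursor = await db.execute('SELECT name FROM items')
        rows = await cursor.fetchall()
    return {'items': [row[0] for row in rows]}  # Quart turns a dict into JSON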

Quart’s support for asynchronous programming makes it easy to build high-performance web applications in Python. In the next section, we will look at how to integrate with databases in Quart.

Integrating with Databases

Quart makes it easy to integrate with databases and store data in our web application. Here’s an example of how to use SQLite with Quart:

import sqlite3
from quart import Quart, g, jsonify, request

app = Quart(__name__)

DATABASE = 'mydatabase.db'

def get_db():
    if 'db' not in g:
        g.db = sqlite3.connect(DATABASE)
        g.db.row_factory = sqlite3.Row
    return g.db

@app.route('/api/items')
async def get_items():
    # Note: sqlite3 is a blocking driver, so these calls briefly stall the
    # event loop; for fully asynchronous access, use aiosqlite as shown earlier.
    db = get_db()
    cursor = db.execute('SELECT * FROM items')
    rows = cursor.fetchall()
    items = [dict(row) for row in rows]
    return jsonify(items)  # jsonify is synchronous in Quart, as in Flask

@app.route('/api/items', methods=['POST'])
async def add_item():
    data = await request.get_json()
    db = get_db()
    db.execute('INSERT INTO items (name) VALUES (?)', [data['name']])
    db.commit()
    response_data = {'message': 'Item added successfully', 'item': data['name']}
    response = jsonify(response_data)  # jsonify is synchronous in Quart
    response.status_code = 201
    return response

if __name__ == '__main__':
    app.run()

In this example, we define a function called get_db that connects to our SQLite database and returns a database connection object. We also define two routes - one to retrieve a list of items from the database and one to add a new item to the database.

By using the get_db function, we can ensure that we have a database connection available for each request. We can also use the sqlite3.Row row factory so that rows can be converted to dictionaries, which makes it easy to turn our database results into JSON.
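
One detail the example leaves out is closing the connection after each request. A hedged sketch, mirroring the usual Flask teardown pattern (it assumes the get_db/g setup above):

@app.teardown_appcontext
def close_db(exc):
    # Pop the connection off g (if any) and close it once the request is done
    db = g.pop('db', None)
    if db is not None:
        db.close()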

In the add_item route, we use the request.get_json function to extract the data from the request body and insert it into the items table in our database.

Quart supports other databases such as PostgreSQL and MySQL, and can also integrate with ORMs such as SQLAlchemy.

Deploy a Quart web application with authentication and authorization

In many web applications, we need to add authentication and authorization to restrict access to certain parts of our application. In Quart, we can use third-party Flask libraries such as Flask-Login and Flask-Principal (loaded via Quart's quart.flask_patch compatibility shim) to add authentication and authorization.

Example with Flask-Login in Quart:

import quart.flask_patch  # enables Flask extensions under Quart; must be imported first

from quart import Quart, render_template, request, redirect, url_for
from flask_login import LoginManager, UserMixin, login_required, login_user, logout_user

app = Quart(__name__)
app.secret_key = 'mysecretkey'

login_manager = LoginManager()
login_manager.init_app(app)

class User(UserMixin):
    def __init__(self, id):
        self.id = id

    def __repr__(self):
        return f'<User {self.id}>'

@login_manager.user_loader
def load_user(user_id):
    return User(user_id)

@app.route('/')
async def index():
    return await render_template('index.html')

@app.route('/login', methods=['GET', 'POST'])
async def login():
    if request.method == 'POST':
        form = await request.form  # request.form must be awaited in Quart
        user_id = form['user_id']
        user = User(user_id)
        login_user(user)
        return redirect(url_for('dashboard'))
    return await render_template('login.html')

@app.route('/dashboard')
@login_required
async def dashboard():
    return await render_template('dashboard.html')

@app.route('/logout')
@login_required
async def logout():
    logout_user()
    return redirect(url_for('index'))

if __name__ == '__main__':
    app.run()

In this example, we define a User class that inherits from UserMixin, which provides default implementations for some methods required by Flask-Login. We also define routes for the login page, dashboard page, and logout page.

By using the @login_required decorator, we can restrict access to the dashboard and logout routes to only authenticated users. We can also use the login_user and logout_user functions to handle user authentication.

In the login route, we retrieve the user ID from the request form and create a User object. We then use the login_user function to authenticate the user and redirect them to the dashboard page.

Flask-Principal is another library that provides more fine-grained control over access to resources. It allows us to define roles and permissions for users and restrict access to certain routes or resources based on those roles and permissions.
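
A rough sketch of that idea (untested under Quart; like Flask-Login, Flask-Principal would need the quart.flask_patch shim, and the 'admin' role and template here are illustrative):

import quart.flask_patch  # must precede any Flask extension import
from flask_principal import Principal, Permission, RoleNeed

principals = Principal(app)  # 'app' as defined in the example above
admin_permission = Permission(RoleNeed('admin'))

@app.route('/admin')
@admin_permission.require(http_exception=403)
async def admin_panel():
    return await render_template('admin.html')  # hypothetical template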

In this way, we can add authentication and authorization to our Quart web application and provide more secure access to our resources.

Deploying the Web Application

After we have developed our Quart web application, we need to deploy it to a production environment. There are many ways to deploy a Quart application, including using a web server such as Nginx or Apache, or deploying to a Platform as a Service (PaaS) provider such as Heroku or AWS Elastic Beanstalk.

One common approach to deploying a Quart application is to serve it with an ASGI server. Hypercorn is the server the Quart documentation itself uses, but any ASGI server will do; here's an example using Uvicorn:

import uvicorn
from myapp import app

if __name__ == '__main__':
    uvicorn.run(app, host='0.0.0.0', port=5000)

In this example, we import the uvicorn server and our app object from our Quart application. We then call the uvicorn.run function with our app object and specify the host and port to run the server on.

By serving the application with an ASGI server, we can take advantage of Quart's asynchronous programming features and provide a high-performance web application.
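
Hypercorn can also be run programmatically in much the same way; a sketch assuming the same myapp module layout as the Uvicorn example:

import asyncio
from hypercorn.asyncio import serve
from hypercorn.config import Config
from myapp import app

config = Config()
config.bind = ['0.0.0.0:5000']

if __name__ == '__main__':
    asyncio.run(serve(app, config))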

Another approach to deploying a Quart application is to use Docker. Docker allows us to package our application and its dependencies into a container, which can be easily deployed to any platform that supports Docker.

Dockerfile for a Quart application:

FROM python:3.9-alpine

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

# The quart CLI reads QUART_APP to locate the application; this assumes the
# app object lives in myapp.py, as in the earlier example
ENV QUART_APP=myapp:app

CMD ["quart", "run", "--host", "0.0.0.0"]

In this example, we use a Python 3.9 base image and install the dependencies specified in our requirements.txt file. We then copy our application code into the container and specify the command to run the Quart server.
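
Note that quart run starts the development server. For a production image you would typically hand the app to an ASGI server instead; a hedged variant of the final line, assuming hypercorn is listed in requirements.txt and the app object lives in myapp.py:

CMD ["hypercorn", "--bind", "0.0.0.0:5000", "myapp:app"]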

By using Docker, we can easily deploy our Quart application to any platform that supports Docker, such as Kubernetes or AWS Elastic Beanstalk.

In conclusion, there are many ways to deploy a Quart web application, and the choice depends on factors such as performance, scalability, and ease of deployment. By serving the application with an ASGI server or packaging it with Docker, we can deploy our Quart application to production and provide a high-performance, reliable web application.

source: https://pythonic.rapellys.biz

Python asyncio.gather vs asyncio.wait (vs asyncio.TaskGroup)

[ https://stackoverflow.com/questions/42231161/asyncio-gather-vs-asyncio-wait-vs-asyncio-taskgroup ]

asyncio.gather [1] and asyncio.wait [2] seem to have similar uses: I have a bunch of async things that I want to execute/wait for (not necessarily waiting for one to finish before the next one starts).

Since Python 3.11 there is yet another similar feature, asyncio.TaskGroup [3].

They use a different syntax, and differ in some details, but it seems very un-pythonic to me to have several functions that have such a huge overlap in functionality.

What am I missing?

Although similar in general cases ("run and get results for many tasks"), each function has some specific functionality for other cases (and see also TaskGroup for Python 3.11+ below):

asyncio.gather() [1]

Returns a Future instance, allowing high level grouping of tasks:

import asyncio
from pprint import pprint

import random


async def coro(tag):
    print(">", tag)
    await asyncio.sleep(random.uniform(1, 3))
    print("<", tag)
    return tag


loop = asyncio.get_event_loop()

group1 = asyncio.gather(*[coro("group 1.{}".format(i)) for i in range(1, 6)])
group2 = asyncio.gather(*[coro("group 2.{}".format(i)) for i in range(1, 4)])
group3 = asyncio.gather(*[coro("group 3.{}".format(i)) for i in range(1, 10)])

all_groups = asyncio.gather(group1, group2, group3)

results = loop.run_until_complete(all_groups)

loop.close()

pprint(results)

All tasks in a group can be cancelled by calling group2.cancel() or even all_groups.cancel(). See also .gather(..., return_exceptions=True).

asyncio.wait() [2]

Supports waiting to be stopped after the first task is done, or after a specified timeout, allowing lower level precision of operations:

import asyncio
import random


async def coro(tag):
    print(">", tag)
    await asyncio.sleep(random.uniform(0.5, 5))
    print("<", tag)
    return tag


loop = asyncio.get_event_loop()

tasks = [coro(i) for i in range(1, 11)]

print("Get first result:")
finished, unfinished = loop.run_until_complete(
    asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))

for task in finished:
    print(task.result())
print("unfinished:", len(unfinished))

print("Get more results in 2 seconds:")
finished2, unfinished2 = loop.run_until_complete(
    asyncio.wait(unfinished, timeout=2))

for task in finished2:
    print(task.result())
print("unfinished2:", len(unfinished2))

print("Get all other results:")
finished3, unfinished3 = loop.run_until_complete(asyncio.wait(unfinished2))

for task in finished3:
    print(task.result())

loop.close()

TaskGroup [3] (Python 3.11+)

Update: Python 3.11 introduces TaskGroups [4], which can "automatically" await more than one task without gather() or wait():

# Python 3.11+ ONLY!
async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(some_coro(...))
        task2 = tg.create_task(another_coro(...))
    print("Both tasks have completed now.")
[1] https://docs.python.org/3/library/asyncio-task.html#asyncio.gather
[2] https://docs.python.org/3/library/asyncio-task.html#asyncio.wait
[3] https://docs.python.org/3/library/asyncio-task.html#task-groups
[4] https://docs.python.org/3/library/asyncio-task.html#task-groups
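
One behavioral note: unlike gather() with default arguments, a TaskGroup cancels the remaining tasks as soon as one of them fails, and the failures are re-raised together as an ExceptionGroup. A small sketch (failing_coro is hypothetical):

# Python 3.11+ ONLY!
async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(asyncio.sleep(10))  # cancelled when a sibling fails
            tg.create_task(failing_coro())     # hypothetical: raises ValueError
    except* ValueError as eg:
        print("caught:", eg.exceptions)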

(19) "The single asterisk form ( *args ) is used to pass a non-keyworded, variable-length argument list, and the double asterisk form is used to pass a keyworded, variable-length argument list" - laycat
(3) In the asyncio.gather() code, if the code that creates those three groups is contained within a function body, you can get rid of loop = asyncio.get_event_loop() and refactor the code, adding an await to the asyncio.gather(group1, group2, group3), making it slightly simpler; all the lines related to the loop variable will no longer be needed - Yassine Nacif
Dear Udi, in your asyncio.gather example, whenever I run in my computer, I get the "RuntimeError: This event loop is already running", even though the results are properly evaluated. How can one solve this RuntimeError? Shouldn't we use the loop object? - Philipe Riskalla Leal
@PhilipeRiskallaLeal: copy the text into a demo.py file and execute it from the command line using python demo.py - Udi
run_until_complete causes issues when there already is an async loop running. What alternative is there to await for the tasks to finish synchronously? - theberzi
I get "RuntimeError: This event loop is already running" when copying the asyncio.wait example (but not the asyncio.gather example) into a file and running it with python demo.py. This is with Python 3.11.2 - stephendwolff
[+104] [2020-08-23 09:01:44] Dev Aggarwal

A very important distinction, which is easy to miss, is the default behavior of these two functions, when it comes to exceptions.


I'll use this example to simulate a coroutine that sometimes raises exceptions:

import asyncio
import random


async def a_flaky_tsk(i):
    await asyncio.sleep(i)  # bit of fuzz to simulate a real-world example

    if i % 2 == 0:
        print(i, "ok")
    else:
        print(i, "crashed!")
        raise ValueError

coros = [a_flaky_tsk(i) for i in range(10)]

await asyncio.gather(*coros) outputs -

0 ok
1 crashed!
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 20, in <module>
    asyncio.run(main())
  File "/Users/dev/.pyenv/versions/3.8.2/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/Users/dev/.pyenv/versions/3.8.2/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 17, in main
    await asyncio.gather(*coros)
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError

As you can see, the coros after index 1 never got to execute. The Future returned by gather() is already done at that point (unlike with wait()) and the program terminates, but if you keep the program alive, the other coroutines still get a chance to run:

async def main():
    coros = [a_flaky_tsk(i) for i in range(10)]
    await asyncio.gather(*coros)
    

if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    loop.create_task(main())
    loop.run_forever()

# 0 ok
# 1 crashed!
# Task exception was never retrieved
#  ....
# 2 ok
# 3 crashed!
# 4 ok
# 5 crashed!
# 6 ok
# 7 crashed!
# 8 ok
# 9 crashed!

But await asyncio.wait(coros) continues to execute the tasks even if some of them fail (the Future returned by wait() is not done at that point, unlike gather()'s):

0 ok
1 crashed!
2 ok
3 crashed!
4 ok
5 crashed!
6 ok
7 crashed!
8 ok
9 crashed!
Task exception was never retrieved
future: <Task finished name='Task-10' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-8' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-9' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError

Of course, this behavior can be changed for both by using -

asyncio.gather(..., return_exceptions=True)

or,

asyncio.wait([...], return_when=asyncio.FIRST_EXCEPTION)


But it doesn't end here!

Notice: Task exception was never retrieved in the logs above.

asyncio.wait() won't re-raise exceptions from the child tasks until you await them individually. (The stack traces in the logs are just messages; they cannot be caught!)

done, pending = await asyncio.wait(coros)
for tsk in done:
    try:
        await tsk
    except Exception as e:
        print("I caught:", repr(e))

Output -

0 ok
1 crashed!
2 ok
3 crashed!
4 ok
5 crashed!
6 ok
7 crashed!
8 ok
9 crashed!
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()

On the other hand, to catch exceptions with asyncio.gather(), you must -

results = await asyncio.gather(*coros, return_exceptions=True)
for result_or_exc in results:
    if isinstance(result_or_exc, Exception):
        print("I caught:", repr(result_or_exc))

(Same output as before)


(13) I never understood Task exception was never retrieved error until I came across this post. Thanks a lot for great explanation.. - Saurav Kumar
(2) @SauravKumar me too! Heck, this is so helpful!! - pepoluan
To help someone understand: Task exception was never retrieved is shown when there are no references left to the task object (right before it is destroyed). Python notifies you about the exception in the task because you will never be able to gain access to it later. - Den Avrondo

---

asyncio.wait is more low level than asyncio.gather.

As the name suggests, asyncio.gather mainly focuses on gathering the results. It waits on a bunch of futures and returns their results in a given order.

asyncio.wait just waits on the futures. And instead of giving you the results directly, it gives done and pending tasks. You have to manually collect the values.

Moreover, with wait() you can specify whether to return when all futures finish, when the first one completes, or on the first exception, via the return_when parameter.
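
A minimal sketch of the difference (the coroutine names are illustrative):

import asyncio

async def double(x):
    await asyncio.sleep(0.1)
    return x * 2

async def main():
    # gather: results come back directly, in the order the awaitables were passed
    results = await asyncio.gather(double(1), double(2), double(3))
    print(results)  # [2, 4, 6]

    # wait: you get done/pending task sets and collect the results yourself
    tasks = [asyncio.create_task(double(x)) for x in (1, 2, 3)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
    print(sorted(t.result() for t in done))  # [2, 4, 6] (done is an unordered set)

asyncio.run(main())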


(9) @Kingname ..wat - Matt Joiner
(2) do you mean that asyncio.gather will have to wait for all of them to complete, while asyncio.wait will return to you the current status of each one (pending or not)? Reading your answer is not clear to me - EigenFool
(2) @EigenFool As of Python 3.9, asyncio.wait has a parameter called return_when, which you can use to control when the event loop should yield back to you. asyncio.gather does not have such parameter, the event loop only get back to you when all tasks have finished/failed. Read the official docs here: docs.python.org/3/library/asyncio-task.html#asyncio.wait - ospider
(3) @ospider The parameter called return_when for asyncio.wait is already available in Python 3.5.9! See here: docs.python.org/3.5/library/asyncio-task.html#asyncio.wait - e.d.n.a
(2) @Kingname python -m timeit "print('hello')" gives 36.6 usec per loop, so 10000000000000 print('hello') will take 11.6 years to complete for just print() function - Karol Zlot
[+23] [2018-01-30 16:31:07] Johny Ebanat

I also noticed that you can provide a group of coroutines in wait() by simply specifying the list:

result=loop.run_until_complete(asyncio.wait([
        say('first hello', 2),
        say('second hello', 1),
        say('third hello', 4)
    ]))

Whereas grouping in gather() is done by just specifying multiple coroutines:

result=loop.run_until_complete(asyncio.gather(
        say('first hello', 2),
        say('second hello', 1),
        say('third hello', 4)
    ))
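
One caveat worth adding: passing bare coroutines to asyncio.wait() has raised a DeprecationWarning since Python 3.8, and Python 3.11 rejects them outright; the modern equivalent wraps them in tasks first (reusing say and loop from above):

tasks = [loop.create_task(say('first hello', 2)),
         loop.create_task(say('second hello', 1)),
         loop.create_task(say('third hello', 4))]
result = loop.run_until_complete(asyncio.wait(tasks))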

(36) Lists can also be used with gather(), e.g.: asyncio.gather(*task_list) - tehfink
(5) So can generators - Jab
(2) How can you use this gather without blocking the rest of the script? - thebeancounter
(2) Awesome. Thanks for the dramatically easier to read example. - Yablargo
(5) @thebeancounter You don't need to await right-away! group = asyncio.gather(*aws) returns an awaitable/future for the group directly, which represents all the combined tasks. The tasks can run soon after the asyncio.gather-call, e.g. when there is an await for something else (like asyncio.sleep) or when accessing the future (like group.done()). You only need to use await group, when you want to make sure the tasks are done or cancelled and to collect all the results. - e.d.n.a

---

In addition to all the previous answers, I would like to describe the different behavior of gather() and wait() when they are cancelled.

Gather() [1] cancellation

If gather() is cancelled, all submitted awaitables (that have not completed yet) are also cancelled.

Wait() [2] cancellation

If the wait()ing task is cancelled, it simply throws a CancelledError and the waited tasks remain intact.

Simple example:

import asyncio


async def task(arg):
    await asyncio.sleep(5)
    return arg


async def cancel_waiting_task(work_task, waiting_task):
    await asyncio.sleep(2)
    waiting_task.cancel()
    try:
        await waiting_task
        print("Waiting done")
    except asyncio.CancelledError:
        print("Waiting task cancelled")

    try:
        res = await work_task
        print(f"Work result: {res}")
    except asyncio.CancelledError:
        print("Work task cancelled")


async def main():
    print("asyncio.wait()")
    work_task = asyncio.create_task(task("done"))
    waiting = asyncio.create_task(asyncio.wait({work_task}))
    await cancel_waiting_task(work_task, waiting)

    print("----------------")
    print("asyncio.gather()")
    work_task = asyncio.create_task(task("done"))
    waiting = asyncio.gather(work_task)
    await cancel_waiting_task(work_task, waiting)


asyncio.run(main())

Output:

asyncio.wait()
Waiting task cancelled
Work result: done
----------------
asyncio.gather()
Waiting task cancelled
Work task cancelled

Application example

Sometimes it becomes necessary to combine wait() and gather() functionality. For example, we want to wait for the completion of at least one task and then cancel the remaining pending tasks; and if the waiting itself is cancelled, we also want to cancel all pending tasks.

As real examples: say we have a disconnect event and a work task, and we want to wait for the result of the work task but cancel it if the connection is lost. Or we make several parallel requests, and upon completion of at least one response, cancel all the others.

It could be done this way:

import asyncio
from typing import Optional, Tuple, Set


async def wait_any(
        tasks: Set[asyncio.Future], *, timeout: Optional[int] = None,
) -> Tuple[Set[asyncio.Future], Set[asyncio.Future]]:
    tasks_to_cancel: Set[asyncio.Future] = set()
    try:
        done, tasks_to_cancel = await asyncio.wait(
            tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
        )
        return done, tasks_to_cancel
    except asyncio.CancelledError:
        tasks_to_cancel = tasks
        raise
    finally:
        for task in tasks_to_cancel:
            task.cancel()


async def task():
    await asyncio.sleep(5)


async def cancel_waiting_task(work_task, waiting_task):
    await asyncio.sleep(2)
    waiting_task.cancel()
    try:
        await waiting_task
        print("Waiting done")
    except asyncio.CancelledError:
        print("Waiting task cancelled")

    try:
        res = await work_task
        print(f"Work result: {res}")
    except asyncio.CancelledError:
        print("Work task cancelled")


async def check_tasks(waiting_task, working_task, waiting_conn_lost_task):
    try:
        await waiting_task
        print("waiting is done")
    except asyncio.CancelledError:
        print("waiting is cancelled")

    try:
        await waiting_conn_lost_task
        print("connection is lost")
    except asyncio.CancelledError:
        print("waiting connection lost is cancelled")

    try:
        await working_task
        print("work is done")
    except asyncio.CancelledError:
        print("work is cancelled")


async def work_done_case():
    working_task = asyncio.create_task(task())
    connection_lost_event = asyncio.Event()
    waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait())
    waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task}))
    await check_tasks(waiting_task, working_task, waiting_conn_lost_task)


async def conn_lost_case():
    working_task = asyncio.create_task(task())
    connection_lost_event = asyncio.Event()
    waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait())
    waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task}))
    await asyncio.sleep(2)
    connection_lost_event.set()  # <---
    await check_tasks(waiting_task, working_task, waiting_conn_lost_task)


async def cancel_waiting_case():
    working_task = asyncio.create_task(task())
    connection_lost_event = asyncio.Event()
    waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait())
    waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task}))
    await asyncio.sleep(2)
    waiting_task.cancel()  # <---
    await check_tasks(waiting_task, working_task, waiting_conn_lost_task)


async def main():
    print("Work done")
    print("-------------------")
    await work_done_case()
    print("\nConnection lost")
    print("-------------------")
    await conn_lost_case()
    print("\nCancel waiting")
    print("-------------------")
    await cancel_waiting_case()


asyncio.run(main())

Output:

Work done
-------------------
waiting is done
waiting connection lost is cancelled
work is done

Connection lost
-------------------
waiting is done
connection is lost
work is cancelled

Cancel waiting
-------------------
waiting is cancelled
waiting connection lost is cancelled
work is cancelled
[1] https://docs.python.org/3/library/asyncio-task.html#running-tasks-concurrently
[2] https://docs.python.org/3/library/asyncio-task.html#asyncio.wait

Python Asyncio - How to cancel all remaining tasks in tasks gather if one fails

In case one task of gather raises an exception, the others are still allowed to continue.

Well, that's not exactly what I need. I want to distinguish between errors that are fatal and need to cancel all remaining tasks, and errors that are not and instead should be logged while allowing other tasks to continue.

Here is my failed attempt to implement this:

from asyncio import gather, get_event_loop, sleep

class ErrorThatShouldCancelOtherTasks(Exception):
    pass

async def my_sleep(secs):
    await sleep(secs)
    if secs == 5:
        raise ErrorThatShouldCancelOtherTasks('5 is forbidden!')
    print(f'Slept for {secs}secs.')

async def main():
    try:
        sleepers = gather(*[my_sleep(secs) for secs in [2, 5, 7]])
        await sleepers
    except ErrorThatShouldCancelOtherTasks:
        print('Fatal error; cancelling')
        sleepers.cancel()
    finally:
        await sleep(5)

get_event_loop().run_until_complete(main())

(the finally await sleep here is to prevent the interpreter from closing immediately, which would on its own cancel all tasks)

Oddly, calling cancel on the gather does not actually cancel it!

PS C:\Users\m> .\AppData\Local\Programs\Python\Python368\python.exe .\wtf.py
Slept for 2secs.
Fatal error; cancelling
Slept for 7secs.

I am very surprised by this behavior since it seems to be contradictory to the documentation [1], which states:

asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)

Return a future aggregating results from the given coroutine objects or futures.

(...)

Cancellation: if the outer Future is cancelled, all children (that have not completed yet) are also cancelled. (...)

What am I missing here? How to cancel the remaining tasks?

Came here from this issue. In Python 3.11+, a gather can be replaced by a TaskGroup, which has those cancelling semantics. - Alex Povel
[+35] [2019-11-27 16:00:42] user4815162342 [ACCEPTED]

The problem with your implementation is that it calls sleepers.cancel() after sleepers has already raised. Technically the future returned by gather() is in a completed state, so its cancellation must be a no-op.

To correct the code, you just need to cancel the children yourself instead of trusting gather's future to do it. Of course, coroutines are not themselves cancelable, so you need to convert them to tasks first (which gather would do anyway, so you're doing no extra work). For example:

async def main():
    tasks = [asyncio.ensure_future(my_sleep(secs))
             for secs in [2, 5, 7]]
    try:
        await asyncio.gather(*tasks)
    except ErrorThatShouldCancelOtherTasks:
        print('Fatal error; cancelling')
        for t in tasks:
            t.cancel()
    finally:
        await sleep(5)

I am very surprised by this behavior since it seems to be contradictory to the documentation[...]

The initial stumbling block with gather is that it doesn't really run tasks, it's just a helper to wait for them to finish. For this reason gather doesn't bother to cancel the remaining tasks if some of them fail with an exception - it just abandons the wait and propagates the exception, leaving the remaining tasks to proceed in the background. This was reported as a bug [1], but wasn't fixed for backward compatibility and because the behavior is documented and unchanged from the beginning.

But here we have another wart: the documentation explicitly promises being able to cancel the returned future. Your code does exactly that and it doesn't work, without it being obvious why (at least it took me a while to figure it out, and required reading the source [2]). It turns out that the contract of Future [3] actually prevents this from working. By the time you call cancel(), the future returned by gather has already completed, and cancelling a completed future is meaningless; it is just a no-op. (The reason is that a completed future has a well-defined result that could have been observed by outside code. Cancelling it would change its result, which is not allowed.)
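
The completed-future behavior is easy to verify in isolation; a tiny sketch:

import asyncio

async def demo():
    fut = asyncio.get_running_loop().create_future()
    fut.set_result(42)   # the future is now done
    print(fut.cancel())  # False: cancelling a completed future is a no-op
    print(fut.result())  # 42, unchanged

asyncio.run(demo())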

In other words, the documentation is not wrong, because canceling would have worked if you had performed it prior to await sleepers having completed. However, it's misleading, because it appears to allow canceling gather() in this important use case of one of its awaitable raising, but in reality doesn't.

Problems like this that pop up when using gather are reason why many people eagerly await (no pun intended) trio-style nurseries in asyncio [4] (edit: added [5] many years later in Python 3.11.)

[1] https://bugs.python.org/issue31452
[2] https://github.com/python/cpython/blob/bea33f5e1db6e4a554919a82894f44568576e979/Lib/asyncio/tasks.py#L702
[3] https://docs.python.org/3/library/asyncio-future.html#asyncio.Future
[4] https://twitter.com/1st1/status/1028032168327565312?lang=en
[5] https://docs.python.org/3/library/asyncio-task.html#task-groups

One more problem to be resolved is that if one coro raises a non-fatal error, then a fatal error raised later by another coro will be ignored; I suppose the answer is to wrap the implementation of my_sleep in an (otherwise ill-advised) try: ... except Exception as err: log_exception_and_ignore_it(err)? - user4385532
(1) @gaazkam If your requirement is to keep going, that's probably the simplest way to do it. There is also the option of wrapping gather in a loop, catching except Exception and pruning the tasks that raised (and logging their exceptions), but that just ends up being much more code than what you've proposed, and for no real gain. - user4815162342
One thing I needed to do to avoid raising cancellation errors from all the task.cancel() calls was to follow up the for t in tasks loop by awaiting another gather statement, i.e. await asyncio.gather(*tasks, return_exceptions=True)

You can create your own custom gather-function

This cancels all its children when any exception occurs:

import asyncio

async def gather(*tasks, **kwargs):
    tasks = [ task if isinstance(task, asyncio.Task) else asyncio.create_task(task)
              for task in tasks ]
    try:
        return await asyncio.gather(*tasks, **kwargs)
    except BaseException as e:
        for task in tasks:
            task.cancel()
        raise e


# If a() or b() raises an exception, both are immediately cancelled
a_result, b_result = await gather(a(), b())

(2) Better to write simply raise rather than raise e, as the former will preserve the original stack trace.
What you can do with Python 3.10 (and, probably, earlier versions) is use asyncio.wait. It takes an iterable of awaitables and a condition as to when to return, and when the condition is met, it returns two sets of tasks: completed ones and pending ones. You can have it return on the first exception and then cancel the pending tasks one by one:

async def my_task(x):
    try: 
        ...
    except RecoverableError as e:
        ...


tasks = [asyncio.create_task(my_task(x)) for x in xs]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
for p in pending:
    p.cancel()

And you can wrap your tasks in try-except re-raising the fatal exceptions and processing not-fatal ones otherwise. It's not gather, but it looks like it does what you want.
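
A hedged sketch of that wrapping idea (FatalError, RecoverableError, and the task body are illustrative names):

import asyncio

class FatalError(Exception): pass
class RecoverableError(Exception): pass

async def guarded(coro):
    try:
        return await coro
    except RecoverableError as e:
        print('logged and ignored:', e)  # non-fatal: log it, let the others run
    # FatalError (and anything else) propagates, triggering FIRST_EXCEPTION

async def my_task(x):
    await asyncio.sleep(0.1 * (x + 1))
    if x == 3:
        raise FatalError('3 is forbidden!')
    if x % 2:
        raise RecoverableError(f'{x} is odd')
    return x

async def main():
    tasks = [asyncio.create_task(guarded(my_task(x))) for x in range(5)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    for p in pending:
        p.cancel()
    for d in done:
        if d.exception():
            print('fatal:', d.exception())

asyncio.run(main())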

https://docs.python.org/3/library/asyncio-task.html#id9

source: https://stackoverflow.com