Tuesday, September 12, 2023

Parallel requests/tasks with Python and asyncio

 What does asyncio.create_task() do?

It submits the coroutine to run "in the background", i.e. concurrently with the current task and all other tasks, switching between them at await points. It returns an awaitable handle called a "task" which you can also use to cancel the execution of the coroutine.

It's one of the central primitives of asyncio: the asyncio equivalent of starting a thread. (In the same analogy, awaiting the task is the equivalent of joining a thread.) create_task submits the coroutine to the event loop for execution concurrently with other tasks; control can switch between tasks at any await. asyncio.gather is not the only way to achieve concurrency in asyncio. It's just a utility that submits a number of awaitables to the event loop at the same time and makes it easy to wait for all of them to complete. create_task does just the submitting; it could arguably have been called start_coroutine.
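A minimal sketch of that behaviour (delay here is just a placeholder coroutine, not a library function): the coroutine starts running as soon as the task is created, and awaiting the handle later only collects the result.

import asyncio

async def delay(n):
    await asyncio.sleep(n)
    return n

async def main():
    # create_task starts delay(1) immediately, in the background
    task = asyncio.create_task(delay(1))
    # do other work while the task runs
    await asyncio.sleep(0.5)
    # await the handle only when the result is actually needed
    result = await task
    print(result)

asyncio.run(main())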

What is a task?

It's an asyncio construct that tracks execution of a coroutine in a concrete event loop. When you call create_task, you submit a coroutine for execution and receive back a handle. You can await this handle when you actually need the result, or never await it if you don't care about the result. This handle is the task, and it inherits from Future, which makes it awaitable and also provides the lower-level callback-based interface, such as add_done_callback. The await expression suspends the containing coroutine until the awaited awaitable produces a result; the coroutine makes no progress while it waits. But await is also what yields control back to the event loop so that other coroutines can run.
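A small sketch of the two interfaces a task exposes, assuming a throwaway work() coroutine: awaiting it, and attaching a done-callback through the Future API.

import asyncio

async def work():
    await asyncio.sleep(1)
    return 42

async def main():
    task = asyncio.create_task(work())
    # Future-style, callback-based interface
    task.add_done_callback(lambda t: print("done:", t.result()))
    # awaitable interface: suspends main() until the task finishes,
    # letting other tasks run in the meantime
    value = await task
    print(value)

asyncio.run(main())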

 Blocking requests with the default executor:

import asyncio
import requests

async def main():
    loop = asyncio.get_running_loop()
    futures = [
        loop.run_in_executor(
            None, 
            requests.get, 
            'http://example.org/'
        )
        for i in range(20)
    ]
    for response in await asyncio.gather(*futures):
        pass

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

# Scheduling tasks on an event loop directly
import asyncio
from typing import Any

async def delay(n) -> None:
    print(f"sleeping for {n} second(s)")
    await asyncio.sleep(n)
    print(f"done sleeping for {n} second(s)")


loop: asyncio.AbstractEventLoop = asyncio.get_event_loop()
t1: asyncio.Task[None] = loop.create_task(delay(1))
t2: asyncio.Task[None] = loop.create_task(delay(2))
loop.run_until_complete(t1)

pending: set[asyncio.Task[Any]] = asyncio.all_tasks(loop=loop)
group: asyncio.Future[list[Any]] = asyncio.gather(*pending)
loop.run_until_complete(group)
loop.close()

# with asyncio.run
async def main():
    t1: asyncio.Task[None] = asyncio.create_task(delay(1))
    t2: asyncio.Task[None] = asyncio.create_task(delay(2))
    await t2

asyncio.run(main())

 Output:

sleeping for 1 second(s)
sleeping for 2 second(s)
done sleeping for 1 second(s)
done sleeping for 2 second(s)

asyncio.all_tasks(loop=None)
Return a set of not yet finished Task objects run by the loop. If loop is None, get_running_loop() is used for getting current loop.
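A quick illustration (with made-up background() coroutines): collect the still-pending tasks from inside a running program and wait for all of them except the current one.

import asyncio

async def background(n):
    await asyncio.sleep(n)

async def main():
    asyncio.create_task(background(1))
    asyncio.create_task(background(2))
    # every task that has not finished yet, including main() itself
    pending = asyncio.all_tasks()
    print(len(pending))  # 3
    # wait for everything except the current task
    await asyncio.gather(*(t for t in pending if t is not asyncio.current_task()))

asyncio.run(main())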

If you plot the request completion times for the example above, you get a stepwise curve: some requests execute in parallel, but total time still grows roughly linearly with the number of requests. The step pattern appears because the default executor has a limited internal pool of worker threads (in current Python the default executor is a ThreadPoolExecutor with min(32, os.cpu_count() + 4) workers). Only that many requests can run at once; any remaining requests have to wait for a thread to become available.

# Asynchronous requests with larger thread pool
import asyncio
import concurrent.futures
import requests

async def main():

    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:

        loop = asyncio.get_running_loop()
        futures = [
            loop.run_in_executor(
                executor, 
                requests.get, 
                'http://example.org/'
            )
            for i in range(20)
        ]
        for response in await asyncio.gather(*futures):
            pass

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
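
On Python 3.9+, the same blocking-call-in-a-thread pattern can also be written with asyncio.to_thread, which runs the callable in the default executor; a minimal sketch reusing the example.org URL from above:

import asyncio
import requests

async def main():
    # each requests.get runs in a worker thread of the default executor
    responses = await asyncio.gather(
        *(asyncio.to_thread(requests.get, 'http://example.org/') for _ in range(20))
    )
    print(len(responses))

asyncio.run(main())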
Python httpx stream data

For larger downloads, we can stream responses so that the entire response body is not loaded into memory at once. For streaming, we use the httpx.stream method.

sync_stream.py

#!/usr/bin/python

import httpx

url = 'https://download.freebsd.org/ftp/releases/amd64/amd64/ISO-IMAGES/12.0/FreeBSD-12.0-RELEASE-amd64-mini-memstick.img'

with open('FreeBSD-12.0-RELEASE-amd64-mini-memstick.img', 'wb') as f:

    with httpx.stream('GET', url) as r:

        for chunk in r.iter_bytes():
            f.write(chunk)
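
The same download can be done asynchronously with AsyncClient.stream and aiter_bytes; a sketch along the same lines (the file writes themselves are still blocking here, which is acceptable for a demo):

import asyncio
import httpx

url = 'https://download.freebsd.org/ftp/releases/amd64/amd64/ISO-IMAGES/12.0/FreeBSD-12.0-RELEASE-amd64-mini-memstick.img'

async def main():
    with open('FreeBSD-12.0-RELEASE-amd64-mini-memstick.img', 'wb') as f:
        async with httpx.AsyncClient() as client:
            # stream the response body chunk by chunk
            async with client.stream('GET', url) as r:
                async for chunk in r.aiter_bytes():
                    f.write(chunk)

asyncio.run(main())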

Python httpx multiple asynchronous GET requests

import httpx
import asyncio

async def get_async(url):
    async with httpx.AsyncClient() as client:
        return await client.get(url)

urls = ["http://webcode.me", "https://httpbin.org/get"]

async def launch():
    resps = await asyncio.gather(*map(get_async, urls))
    data = [resp.text for resp in resps]
    
    for html in data:
        print(html)

asyncio.run(launch()) 

Python httpx asynchronous POST request

import httpx
import asyncio

async def main():

    data = {'name': 'John Doe', 'occupation': 'gardener'}

    async with httpx.AsyncClient() as client:
        r = await client.post('https://httpbin.org/post', data=data)
        # for json: r = await client.post('https://httpbin.org/post', json=data)
    print(r.text)

asyncio.run(main())
 
Beware, however, that the async with statement closes the client when the context manager exits, so the connection pool is not reused across separate blocks. To keep a persistent session, create the client once, reuse it for all calls, and close it explicitly when done:

session = httpx.AsyncClient()
await asyncio.gather(*[call_url(session) for x in range(i)])
...
await session.aclose()
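
A fuller sketch of that persistent-client pattern, where call_url is a stand-in coroutine and webcode.me is just an example URL from earlier:

import asyncio
import httpx

async def call_url(session, url):
    r = await session.get(url)
    return r.status_code

async def main():
    session = httpx.AsyncClient()
    try:
        # the same client (and its connection pool) is reused for every request
        codes = await asyncio.gather(*[call_url(session, "http://webcode.me") for _ in range(5)])
        print(codes)
    finally:
        await session.aclose()

asyncio.run(main())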

async Used to mark a function as asynchronous
→ Functions defined with async def are called coroutines.

await Used to run a coroutine once an asynchronous event loop is already running
→ await can only be used inside a coroutine
→ A coroutine that is never awaited does not run, and Python emits a "coroutine ... was never awaited" RuntimeWarning (with a hint about enabling tracemalloc to locate it).

asyncio.run() Used to start an asynchronous event loop from a normal (synchronous) program
→ Cannot be called in a nested fashion; inside an already running loop, use await instead.
→ Cannot be used in a Jupyter Notebook, because the notebook already runs an event loop; use await there as well.

asyncio.create_task() Used to schedule a coroutine for execution
→ Does not need to be awaited
→ Lets you start several coroutines without waiting for each one to finish first.

asyncio.gather() Used to wait for a group of scheduled tasks (or other awaitables) to complete
→ Needs to be awaited
→ This is vital to the asynchronous program: while one task waits on I/O, the event loop can pick up another. A small example tying these pieces together follows this list.
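
Here is that small example, using all four pieces together with a placeholder delay coroutine like the one earlier in the post:

import asyncio

async def delay(n):          # async def defines a coroutine
    await asyncio.sleep(n)   # await suspends it and lets other tasks run
    return n

async def main():
    # create_task schedules both coroutines on the event loop
    t1 = asyncio.create_task(delay(1))
    t2 = asyncio.create_task(delay(2))
    # gather waits for both tasks; total time is ~2s, not 3s
    results = await asyncio.gather(t1, t2)
    print(results)  # [1, 2]

asyncio.run(main())          # asyncio.run starts the event loop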

import asyncio
import time

import requests_async  # third-party requests-async package

async def asynchronous(urls):
    tasks = []
    for url in urls:
        task = asyncio.create_task(requests_async.get(url))
        tasks.append(task)
    responses = await asyncio.gather(*tasks)
    for response in responses:
        print(response.json())

starttime = time.time()
asyncio.run(asynchronous(urls))  # urls: a list of URL strings defined elsewhere
print(time.time() - starttime)

Ordering

Tasks complete in whatever order the responses come back, so collecting results as they finish (for example with asyncio.as_completed or callbacks) can leave them out of order. (asyncio.gather itself returns results in the order of the awaitables you pass it.) If ordering matters, one robust option is to create your own responses list and fill it in by index, rather than relying on the output of asyncio.gather():

async def asynchronous_ordered(urls):
    responses = [None] * len(urls)  # create own responses list
    tasks = []
    for i in range(len(urls)):
        url = urls[i]
        task = asyncio.create_task(fetch(url, responses, i))
        tasks.append(task)
    await asyncio.gather(*tasks)  # responses is not set to equal this
    for response in responses:
        print(response.json())

async def fetch(url, responses, i):
    response = await requests_async.get(url)
    responses[i] = response  # fill up responses list

Batching

Sometimes running too many requests concurrently causes timeout errors in the resource you are calling. In that case, create tasks in batches and gather each batch separately. Find the batch_size that best fits your code by experimenting with a smaller portion of the requests. Requests that take longer to process (long server delay) are more likely to cause errors than others. In my own experience with NLMatics' engine, MongoDB had timeout errors whenever I ran batches of size greater than 10.

async def asynchronous_ordered_batched(urls, batch_size=10):
    responses = [None] * len(urls)
    kiterations = int(len(urls) / batch_size) + 1
    for k in range(0, kiterations):
        tasks = []
        m = min((k + 1) * batch_size, len(urls))
        for i in range(k * batch_size, m):
            url = urls[i]
            task = asyncio.create_task(fetch(url, responses, i))
            tasks.append(task)
        await asyncio.gather(*tasks)
    for response in responses:
        print(response.json())
 *****

Sample asyncio.as_completed code (a method-body fragment; ClientSession is aiohttp's, and fetch, extract_data and store_data are methods of the surrounding class):

async with ClientSession() as session:
    tasks = {self.fetch(session, url) for url in self.urls}
    for task in asyncio.as_completed(tasks):
        raw_data = await asyncio.shield(task)
        data = self.extract_data(*raw_data)
        await self.store_data(data)

Sample asyncio.gather code:

async with ClientSession() as session:
    tasks = {self.fetch(session, url) for url in self.urls}
    results = await asyncio.gather(*tasks)
for result in results:
    data = self.extract_data(*result)
    await self.store_data(data)
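
The practical difference: as_completed hands back results as each task finishes (the shield() call above presumably protects a task from being cancelled along with the surrounding operation), while gather waits for all of them and returns results in input order. A self-contained sketch of as_completed with plain coroutines, no HTTP involved:

import asyncio

async def delay(n):
    await asyncio.sleep(n)
    return n

async def main():
    tasks = [asyncio.create_task(delay(n)) for n in (3, 1, 2)]
    # results arrive in completion order: 1, 2, 3
    for fut in asyncio.as_completed(tasks):
        print(await fut)

asyncio.run(main())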