What does asyncio.create_task() do?
It submits the coroutine to run "in the background", i.e.
concurrently with the current task and all other tasks, switching
between them at await
points. It returns an awaitable handle called a "task" which you can also use to cancel the execution of the coroutine.
It's one of the central primitives of asyncio, the asyncio equivalent
of starting a thread. (In the same analogy, awaiting the task with await
is the equivalent of joining a thread.) create_task
does exactly that: submit it to the event loop for execution concurrently with other tasks, the point of switching being any await
. asyncio.gather
is not the only way to achieve concurrency
in asyncio. It's just a utility function that makes it easier to wait
for a number of coroutines to all complete, and submit them to the event
loop at the same time. create_task
does just the submitting, it should have probably been called start_coroutine.
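For instance, a minimal sketch of the thread analogy (the fetch_data coroutine and its sleep are placeholders, not from the original):

import asyncio

async def fetch_data():
    # stand-in for real I/O work
    await asyncio.sleep(1)
    return "payload"

async def main():
    # "start the thread": the coroutine begins running at the next await point
    task = asyncio.create_task(fetch_data())
    # ...do other work here while fetch_data runs concurrently...
    # "join the thread": suspend until the task finishes and get its result
    result = await task
    print(result)

asyncio.run(main())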
What is a task?
It's an asyncio construct that tracks execution of a coroutine in a concrete event loop. When you call create_task, you submit a coroutine for execution and receive back a handle. You can await this handle when you actually need the result, or you can never await it if you don't care about the result. This handle is the task, and it inherits from Future, which makes it awaitable and also provides the lower-level callback-based interface, such as add_done_callback.

The await expression suspends the containing coroutine until the awaited awaitable produces a result, which pauses that coroutine's progress. But await is also what yields control back to the event loop, so it is necessary inside a coroutine for other coroutines to make progress.
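As a small sketch of working with the handle (the compute coroutine is a placeholder):

import asyncio

async def compute():
    await asyncio.sleep(1)
    return 42

async def main():
    task = asyncio.create_task(compute())
    # lower-level, callback-based interface inherited from Future
    task.add_done_callback(lambda t: print('callback saw result:', t.result()))
    # await suspends main here, letting the event loop run other coroutines,
    # until the task produces its result
    result = await task
    print(result)

asyncio.run(main())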
Asynchronous requests with the default executor (standard settings):

import asyncio
import requests

async def main():
    loop = asyncio.get_event_loop()
    futures = [
        loop.run_in_executor(
            None,
            requests.get,
            'http://example.org/'
        )
        for i in range(20)
    ]
    for response in await asyncio.gather(*futures):
        pass

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
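The output below comes from a simpler two-coroutine create_task demo rather than the requests code above; a minimal sketch that would produce it (the sleeper coroutine is a placeholder):

import asyncio

async def sleeper(seconds):
    print(f'sleeping for {seconds} second(s)')
    await asyncio.sleep(seconds)
    print(f'done sleeping for {seconds} second(s)')

async def main():
    # both tasks start immediately and run concurrently
    task1 = asyncio.create_task(sleeper(1))
    task2 = asyncio.create_task(sleeper(2))
    await task1
    await task2

asyncio.run(main())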
Output:
sleeping for 1 second(s)
sleeping for 2 second(s)
done sleeping for 1 second(s)
done sleeping for 2 second(s)
asyncio.all_tasks(loop=None)
Return a set of not yet finished Task objects run by the loop. If loop is None, get_running_loop() is used to get the current loop.
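A quick sketch of how this can be used, for instance to wait for whatever is still pending (the worker coroutine is a placeholder):

import asyncio

async def worker(n):
    await asyncio.sleep(n)

async def main():
    for n in (1, 2, 3):
        asyncio.create_task(worker(n))
    # the three worker tasks plus the current (main) task are still unfinished
    pending = asyncio.all_tasks()
    print(len(pending))  # 4
    # wait for everything except the current task
    await asyncio.gather(*(t for t in pending if t is not asyncio.current_task()))

asyncio.run(main())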
The stepwise curve indicates that some requests are being executed in parallel. However, the curve is still asymptotically linear. The reason we see this step pattern is that the default executor has a small internal pool of threads (five here; the exact default size depends on the Python version and the number of CPUs). While a handful of requests can be executed in parallel, any remaining requests have to wait for a thread to become available.
# Asynchronous requests with larger thread pool
import asyncio
import concurrent.futures
import requests

async def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        loop = asyncio.get_event_loop()
        futures = [
            loop.run_in_executor(
                executor,
                requests.get,
                'http://example.org/'
            )
            for i in range(20)
        ]
        for response in await asyncio.gather(*futures):
            pass

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Python httpx stream data
For larger downloads, we can stream the response so that the entire body is not loaded into memory at once. For streaming, we use the httpx.stream method.
sync_stream.py

#!/usr/bin/python

import httpx

url = 'https://download.freebsd.org/ftp/releases/amd64/amd64/ISO-IMAGES/12.0/FreeBSD-12.0-RELEASE-amd64-mini-memstick.img'

with open('FreeBSD-12.0-RELEASE-amd64-mini-memstick.img', 'wb') as f:
    with httpx.stream('GET', url) as r:
        for chunk in r.iter_bytes():
            f.write(chunk)
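For an asynchronous counterpart, a sketch using httpx's AsyncClient.stream and aiter_bytes (same URL and file name as above):

#!/usr/bin/python

import asyncio
import httpx

url = 'https://download.freebsd.org/ftp/releases/amd64/amd64/ISO-IMAGES/12.0/FreeBSD-12.0-RELEASE-amd64-mini-memstick.img'

async def main():
    async with httpx.AsyncClient() as client:
        # stream the body chunk by chunk instead of loading it all into memory
        async with client.stream('GET', url) as r:
            with open('FreeBSD-12.0-RELEASE-amd64-mini-memstick.img', 'wb') as f:
                async for chunk in r.aiter_bytes():
                    f.write(chunk)

asyncio.run(main())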
Python httpx multiple asynchronous GET requests
import httpx
import asyncio

async def get_async(url):
    async with httpx.AsyncClient() as client:
        return await client.get(url)

urls = ["http://webcode.me", "https://httpbin.org/get"]

async def launch():
    resps = await asyncio.gather(*map(get_async, urls))
    data = [resp.text for resp in resps]
    for html in data:
        print(html)

asyncio.run(launch())
Python httpx asynchronous POST request
import httpx
import asyncio

async def main():
    data = {'name': 'John Doe', 'occupation': 'gardener'}

    async with httpx.AsyncClient() as client:
        r = await client.post('https://httpbin.org/post', data=data)
        # for json: r = await client.post('https://httpbin.org/post', json=data)
        print(r.text)

asyncio.run(main())
Beware, however, that the with statement closes the client on exiting the context manager, so the session (and its connection pool) does not persist across uses. To keep the session open across requests, create the client once and close it explicitly when you are done:

session = httpx.AsyncClient()

await asyncio.gather(*[call_url(session) for x in range(i)])
...
await session.aclose()
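A fuller sketch of that pattern, assuming a hypothetical call_url helper and a fixed number of requests:

import asyncio
import httpx

async def call_url(session):
    # reuses the session's connection pool instead of opening a new client per request
    r = await session.get('https://httpbin.org/get')
    return r.status_code

async def main():
    session = httpx.AsyncClient()
    try:
        results = await asyncio.gather(*[call_url(session) for _ in range(5)])
        print(results)
    finally:
        # close the client explicitly since no `async with` block does it for us
        await session.aclose()

asyncio.run(main())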
async | Used to mark the functions that will run asynchronously → These functions are called coroutines.
await | Used to run a coroutine once an asynchronous event loop is already running → await can only be used inside a coroutine → Coroutines must be called with await, otherwise there will be a RuntimeWarning (mentioning tracemalloc) about a coroutine that was never awaited.
asyncio.run() | Used to start an asynchronous event loop from a normal program → asyncio.run() cannot be called in a nested fashion; use await instead → asyncio.run() also cannot be used if you are running the Python file in a Jupyter Notebook, because Jupyter Notebook already has a running asynchronous event loop; you have to use await. (More on this in the Running the Code section.)
asyncio.create_task() | Used to schedule a coroutine for execution → Does not need to be awaited immediately → Lets you line tasks up without waiting for each one to finish before starting the next.
asyncio.gather() | Used to run the scheduled tasks and collect their results → Needs to be awaited → This is vital to the asynchronous program, because it lets the event loop know which tasks it can pick up while others are waiting.
import asyncio
import time
import requests_async

async def asynchronous(urls):
    tasks = []
    for url in urls:
        task = asyncio.create_task(requests_async.get(url))
        tasks.append(task)
    responses = await asyncio.gather(*tasks)
    for response in responses:
        print(response.json())

starttime = time.time()
asyncio.run(asynchronous(urls))
print(time.time() - starttime)
Ordering
Tasks can finish in any order, so if you collect responses as they complete they may come back out of order (asyncio.gather() itself returns results in the order the tasks were passed in). If ordering matters, another option is to create your own responses list, indexed by position, and have each task fill in its own slot rather than relying on the value returned by asyncio.gather().
async def asynchronous_ordered(urls):
    responses = [None] * len(urls)  # create our own responses list
    tasks = []
    for i in range(len(urls)):
        url = urls[i]
        task = asyncio.create_task(fetch(url, responses, i))
        tasks.append(task)
    await asyncio.gather(*tasks)  # responses is not set from gather's return value
    for response in responses:
        print(response.json())

async def fetch(url, responses, i):
    response = await requests_async.get(url)
    responses[i] = response  # fill up the responses list by position
Batching
Sometimes running too many requests concurrently causes timeout errors in the resource you are calling. This is when you need to create tasks in batches and gather them separately to avoid the issue. Find the batch_size that best fits your code by experimenting with a smaller portion of requests. Requests that take longer to process (long server delay) are more likely to cause errors than others. In my own experience with NLMatic's engine, MongoDB had timeout errors whenever I ran batches of size greater than 10.
async def asynchronous_ordered_batched(urls, batch_size=10):
    responses = [None] * len(urls)
    kiterations = (len(urls) + batch_size - 1) // batch_size  # number of batches, rounded up
    for k in range(kiterations):
        tasks = []
        m = min((k + 1) * batch_size, len(urls))
        for i in range(k * batch_size, m):
            url = urls[i]
            task = asyncio.create_task(fetch(url, responses, i))
            tasks.append(task)
        await asyncio.gather(*tasks)  # wait for this batch before starting the next
    for response in responses:
        print(response.json())
*****
Sample asyncio.as_completed code:
async with ClientSession() as session:
    tasks = {self.fetch(session, url) for url in self.urls}
    for task in asyncio.as_completed(tasks):
        raw_data = await asyncio.shield(task)
        data = self.extract_data(*raw_data)
        await self.store_data(data)
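Note that asyncio.as_completed yields results in completion order rather than submission order, and asyncio.shield is presumably used here so that cancelling the surrounding coroutine does not also cancel the in-flight fetch.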
Sample asyncio.gather code:
async with ClientSession() as session:
    tasks = {self.fetch(session, url) for url in self.urls}
    results = await asyncio.gather(*tasks)
    for result in results:
        data = self.extract_data(*result)
        await self.store_data(data)