Showing posts with label Quart. Show all posts

Tuesday, October 8, 2024

Quart Async Example

Asynchronous Programming - One of the key features of Quart is its support for asynchronous programming. Asynchronous programming allows us to handle multiple requests at the same time and provide faster response times.

import asyncio

from quart import Quart, render_template

app = Quart(__name__)

async def get_items():
    # Simulate a long-running task
    await asyncio.sleep(5)
    return ['item1', 'item2', 'item3']

@app.route('/items')
async def items():
    items = await get_items()
    return await render_template('items.html', items=items)

if __name__ == '__main__':
    app.run()

We define an async function called get_items that simulates a long-running task by sleeping for 5 seconds. We then define a route ('/items') that calls this async function and renders our items.html template with the list of items.

By using asynchronous programming, we can handle other requests while the get_items function is running. This allows us to provide faster response times and improve the overall performance of our web application.
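The speed-up is easy to demonstrate outside a web server. In this standalone sketch (not from the original post; the delays are shortened to 0.2 seconds to keep it quick), two simulated slow tasks awaited concurrently finish in roughly the time of one:

```python
import asyncio
import time

async def get_items(delay: float) -> list[str]:
    # Simulate a slow I/O operation, e.g. a database query
    await asyncio.sleep(delay)
    return ["item1", "item2", "item3"]

async def main() -> float:
    start = time.monotonic()
    # Awaited concurrently, the two tasks overlap: total ~0.2s, not ~0.4s
    await asyncio.gather(get_items(0.2), get_items(0.2))
    return time.monotonic() - start

elapsed = asyncio.run(main())
print(f"elapsed: {elapsed:.2f}s")
```

This is the same mechanism Quart relies on: while one request awaits, the event loop is free to serve others.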

In Quart, we can also use asynchronous libraries and modules to handle tasks such as database queries and network requests. Here’s an example of how to use an asynchronous database driver (aiosqlite) in Quart:

import aiosqlite

async def get_items():
    async with aiosqlite.connect('mydatabase.db') as db:
        cursor = await db.execute('SELECT * FROM items')
        rows = await cursor.fetchall()
    return [row[0] for row in rows]

We use the aiosqlite module to connect to a SQLite database and retrieve a list of items from a table called items.

Quart’s support for asynchronous programming makes it easy to build high-performance web applications in Python. In the next section, we will look at how to integrate with databases in Quart.

Integrating with Databases

Quart makes it easy to integrate with databases and store data in our web application. Here’s an example of how to use SQLite with Quart:

import sqlite3
from quart import Quart, g, jsonify, request

app = Quart(__name__)

DATABASE = 'mydatabase.db'

def get_db():
    if 'db' not in g:
        g.db = sqlite3.connect(DATABASE)
        g.db.row_factory = sqlite3.Row
    return g.db

@app.route('/api/items')
async def get_items():
    db = get_db()
    cursor = db.execute('SELECT * FROM items')
    rows = cursor.fetchall()
    items = [dict(row) for row in rows]
    return jsonify(items)

@app.route('/api/items', methods=['POST'])
async def add_item():
    data = await request.get_json()
    db = get_db()
    db.execute('INSERT INTO items (name) VALUES (?)', [data['name']])
    db.commit()
    response_data = {'message': 'Item added successfully', 'item': data['name']}
    response = jsonify(response_data)
    response.status_code = 201
    return response

if __name__ == '__main__':
    app.run()

In this example, we define a function called get_db that connects to our SQLite database and returns a database connection object. We also define two routes - one to retrieve a list of items from the database and one to add a new item to the database.

By using the get_db function, we can ensure that we have a database connection available for each request. We can also use the sqlite3.Row factory to return rows that behave like dictionaries, which makes it easy to convert our database results to JSON. Keep in mind that sqlite3 is a blocking driver, so slow queries will block the event loop inside async routes; the aiosqlite approach shown earlier avoids this.

In the add_item route, we use the request.get_json function to extract the data from the request body and insert it into the items table in our database.

Quart supports other databases such as PostgreSQL and MySQL, and can also integrate with ORMs such as SQLAlchemy.

Deploy a Quart web application with authentication and authorization

In many web applications, we need to add authentication and authorization to restrict access to certain parts of our application. In Quart, we can reuse Flask extensions such as Flask-Login and Flask-Principal to add authentication and authorization (these extensions target Flask's API, so in practice they are loaded through the quart-flask-patch compatibility package).

Example with Flask-Login in Quart:

from quart import Quart, render_template, request, redirect, url_for
from flask_login import LoginManager, UserMixin, login_required, login_user, logout_user

app = Quart(__name__)
app.secret_key = 'mysecretkey'

login_manager = LoginManager()
login_manager.init_app(app)

class User(UserMixin):
    def __init__(self, id):
        self.id = id

    def __repr__(self):
        return f'<User {self.id}>'

@login_manager.user_loader
def load_user(user_id):
    return User(user_id)

@app.route('/')
async def index():
    return await render_template('index.html')

@app.route('/login', methods=['GET', 'POST'])
async def login():
    if request.method == 'POST':
        user_id = (await request.form)['user_id']
        user = User(user_id)
        login_user(user)
        return redirect(url_for('dashboard'))
    return await render_template('login.html')

@app.route('/dashboard')
@login_required
async def dashboard():
    return await render_template('dashboard.html')

@app.route('/logout')
@login_required
async def logout():
    logout_user()
    return redirect(url_for('index'))

if __name__ == '__main__':
    app.run()

In this example, we define a User class that inherits from UserMixin, which provides default implementations for some methods required by Flask-Login. We also define routes for the login page, dashboard page, and logout page.

By using the @login_required decorator, we can restrict access to the dashboard and logout routes to only authenticated users. We can also use the login_user and logout_user functions to handle user authentication.

In the login route, we retrieve the user ID from the request form and create a User object. We then use the login_user function to authenticate the user and redirect them to the dashboard page.

Flask-Principal is another library that provides more fine-grained control over access to resources. It allows us to define roles and permissions for users and restrict access to certain routes or resources based on those roles and permissions.

In this way, we can add authentication and authorization to our Quart web application and provide more secure access to our resources.

Deploying the Web Application

After we have developed our Quart web application, we need to deploy it to a production environment. There are many ways to deploy a Quart application, including using a web server such as Nginx or Apache, or deploying to a Platform as a Service (PaaS) provider such as Heroku or AWS Elastic Beanstalk.

One common approach to deploying a Quart application is to run it under an ASGI server. Hypercorn is the server Quart uses by default, and Uvicorn is another popular choice. Here’s an example of how to serve a Quart application with Uvicorn:

import uvicorn
from myapp import app

if __name__ == '__main__':
    uvicorn.run(app, host='0.0.0.0', port=5000)

In this example, we import the uvicorn server and our app object from our Quart application. We then call the uvicorn.run function with our app object and specify the host and port to run the server on.

By running the application under an ASGI server, we can take advantage of Quart’s asynchronous programming features and provide a high-performance web application.

Another approach to deploying a Quart application is to use Docker. Docker allows us to package our application and its dependencies into a container, which can be easily deployed to any platform that supports Docker.

Dockerfile for a Quart application:

FROM python:3.9-alpine

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

CMD ["quart", "run", "--host", "0.0.0.0"]

In this example, we use a Python 3.9 base image and install the dependencies specified in our requirements.txt file. We then copy our application code into the container and specify the command to run Quart's built-in development server. For production you would typically run an ASGI server instead, for example CMD ["hypercorn", "app:app", "--bind", "0.0.0.0:5000"] (adjusting the module:app path to your project).

By using Docker, we can easily deploy our Quart application to any platform that supports Docker, such as Kubernetes or AWS Elastic Beanstalk.

In conclusion, there are many ways to deploy a Quart web application, and the choice depends on factors such as performance, scalability, and ease of deployment. By using the Quart-ASGI server or Docker, we can deploy our Quart application to production and provide a high-performance, reliable web application.

source: https://pythonic.rapellys.biz

Tuesday, March 5, 2024

Barebone Quart API from Sqlite DB

import asyncio
import sqlite3
from quart import Quart

app = Quart(__name__)

async def connect_db():
    conn = sqlite3.connect("your_database.db")
    conn.row_factory = sqlite3.Row
    return conn

async def fetch_data(column1):
    conn = await connect_db()
    try:
        # SQL placeholders (?) bind values, not identifiers, so the column
        # name must be validated and interpolated, not passed as a parameter
        cursor = conn.execute(f"SELECT {column1}, column2 FROM your_table")
        data = cursor.fetchall()
    finally:
        conn.close()
    return [dict(row) for row in data]

@app.route("/api/data/<column_name>")
async def get_data(column_name):
    data = await fetch_data(column_name)
    return data  # Return the list of dictionaries directly

if __name__ == "__main__":
    app.run(debug=True)

Here, the get_data function directly returns the list of dictionaries obtained from the database, and Quart automatically handles the serialization to JSON format before sending the response.

However, using jsonify offers some advantages:

  • Consistency: It provides a consistent way to handle different data structures, ensuring they are all converted to JSON format correctly.
  • Customization: It allows for additional options like specifying status codes, setting custom headers, or customizing the JSON serialization if needed.

Therefore, while jsonify isn't strictly required in this case, it can improve code clarity and maintainability, especially for larger projects or when dealing with diverse data structures.

or with jsonify (not necessary)

import asyncio
import sqlite3
from quart import Quart, jsonify

app = Quart(__name__)

async def connect_db():
    conn = sqlite3.connect("your_database.db")
    conn.row_factory = sqlite3.Row
    return conn

async def fetch_data(column1):
    conn = await connect_db()
    try:
        # Column names cannot be bound with ?; validate column1 before interpolating
        cursor = conn.execute(f"SELECT {column1}, column2 FROM your_table")
        data = cursor.fetchall()
    finally:
        conn.close()
    return [dict(row) for row in data]

@app.route("/api/data/<column_name>")
async def get_data(column_name):
    data = await fetch_data(column_name)
    return jsonify(data)

if __name__ == "__main__":
    app.run(debug=True)

How to Use:

Now, you would access the API by providing the first column name as part of the URL. For example:

  • /api/data/name (assuming the first column is named "name")
  • /api/data/age (assuming the first column is named "age")
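One caveat: SQL placeholders (?) bind values, not identifiers, so a column name taken from the URL cannot be passed as a query parameter; it has to be validated and then interpolated. A standalone sketch of the whitelist approach (ALLOWED_COLUMNS, build_query, and your_table are hypothetical names):

```python
ALLOWED_COLUMNS = {"name", "age"}  # hypothetical whitelist of real column names

def build_query(column: str) -> str:
    # Placeholders cannot substitute identifiers, so reject anything
    # that is not explicitly whitelisted before interpolating it
    if column not in ALLOWED_COLUMNS:
        raise ValueError(f"unknown column: {column}")
    return f"SELECT {column}, column2 FROM your_table"

print(build_query("name"))  # SELECT name, column2 FROM your_table
```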

Tuesday, November 14, 2023

Server Send Events Stream SSE + Javascript

import asyncio

from quart import abort, make_response, request, Quart

app = Quart(__name__)

async def send_events():
    # Placeholder event generator -- replace with your own data source
    while True:
        yield b"data: ping\n\n"
        await asyncio.sleep(1)

@app.get("/sse")
async def sse():
    if "text/event-stream" not in request.accept_mimetypes:
        abort(400)
    response = await make_response(
        send_events(),
        {
            'Content-Type': 'text/event-stream',
            'Cache-Control': 'no-cache',
            'Transfer-Encoding': 'chunked',
        },
    )
    response.timeout = None  # Disable the timeout for this streaming response
    return response
 
<script>
        document.addEventListener('DOMContentLoaded', (event) => {
            const eventSource = new EventSource('/sse');
            eventSource.onmessage = function(event) {
                console.log('New message:', event.data);
                // You can update the DOM here
            };
            eventSource.onopen = function(event) {
                console.log('Connection to server opened.');
            };
            eventSource.onerror = function(event) {
                console.error('EventSource failed.');
            };
            // To close the connection when the window is closed
            window.onbeforeunload = () => {
                eventSource.close();
            };
        });
    </script>
 
With parameters:
from quart import Quart, request, abort, make_response
from dataclasses import dataclass

app = Quart(__name__)

@dataclass
class ServerSentEvent:
    data: str
    event: str | None = None
    id: int | None = None
    retry: int | None = None

    def encode(self) -> bytes:
        message = f"data: {self.data}"
        if self.event is not None:
            message = f"{message}\nevent: {self.event}"
        if self.id is not None:
            message = f"{message}\nid: {self.id}"
        if self.retry is not None:
            message = f"{message}\nretry: {self.retry}"
        message = f"{message}\n\n"
        return message.encode('utf-8')

@app.get("/chat-updates")
async def chat_updates():
    if "text/event-stream" not in request.accept_mimetypes:
        abort(400)

    friends = request.args.get('friends', default='false') == 'true'
    format = request.args.get('format', default='simple')

    async def send_events():
        while True:
            # Your logic to get updates, possibly filtering based on the query parameters
            data = ...  # Replace with your data retrieval logic
            event = ServerSentEvent(data)
            yield event.encode()

    response = await make_response(
        send_events(),
        {
            'Content-Type': 'text/event-stream',
            'Cache-Control': 'no-cache',
            'Transfer-Encoding': 'chunked',
        },
    )
    response.timeout = None
    return response

# Run the app
if __name__ == "__main__":
    app.run()
Javascript:

<script>
const params = new URLSearchParams({
    friends: 'true',
    format: 'detailed'
});

const url = `https://my-server/chat-updates?${params}`;
const eventSource = new EventSource(url);

eventSource.onmessage = function(event) {
    // Handle incoming messages
    console.log(event.data);
};

eventSource.onerror = function(error) {
    // Handle any errors that occur
    console.error("EventSource failed:", error);
};
</script>
 
// server.js
const http = require('http');
const es = require('event-stream');

// Create a HTTP server
const server = http.createServer((req, res) => {
    // Check if the request path is /stream
    if (req.url === '/stream') {
        // Set the response headers
        res.writeHead(200, {
            'Content-Type': 'text/event-stream',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive'
        });
        // Create a counter variable
        let counter = 0;
        // Create an interval function that sends an event every second
        const interval = setInterval(() => {
            // Increment the counter
            counter++;
            // Create an event object with name, data, and id properties
            const event = {
                name: 'message',
                data: `Hello, this is message number ${counter}`,
                id: counter
            };
            // Convert the event object to a string
            const eventString = `event: ${event.name}\ndata: ${event.data}\nid: ${event.id}\n\n`;
            // Write the event string to the response stream
            res.write(eventString);
            // End the response stream after 10 events
            if (counter === 10) {
                clearInterval(interval);
                res.end();
            }
        }, 1000);
    } else {
        // Handle other requests
        res.writeHead(404);
        res.end('Not found');
    }
});

// Listen on port 3000
server.listen(3000, () => {
    console.log('Server listening on port 3000');
});
 
// client.js
// Fetch the event stream from the server
fetch('/stream')
    .then(response => {
        // Get the readable stream from the response body
        const stream = response.body;
        // Get the reader from the stream
        const reader = stream.getReader();
        // Define a function to read each chunk
        const readChunk = () => {
            // Read a chunk from the reader
            reader.read()
                .then(({ value, done }) => {
                    // Check if the stream is done
                    if (done) {
                        console.log('Stream finished');
                        return;
                    }
                    // Convert the chunk value to a string
                    const chunkString = new TextDecoder().decode(value);
                    console.log(chunkString);
                    // Read the next chunk
                    readChunk();
                })
                .catch(error => {
                    console.error(error);
                });
        };
        // Start reading the first chunk
        readChunk();
    })
    .catch(error => {
        console.error(error);
    });
  
 
from flask import Flask, Response, render_template
import itertools
import time

app = Flask(__name__)

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/connect')
def publish_hello():
    def stream():
        for idx in itertools.count():
            msg = f"data: <p>This is {idx}.</p>\n\n"
            yield msg
            time.sleep(1)
    return Response(stream(), mimetype="text/event-stream")

Htmx

<!DOCTYPE html>
<html>
<head>
<script src="https://unpkg.com/htmx.org@1.8.6"></script>
<script src="https://unpkg.com/htmx.org/dist/ext/sse.js"></script>
</head>
<body>
<div hx-ext="sse" sse-connect="/connect" sse-swap="message">
Contents of this box will be updated in real time with every SSE message received from the server.
</div>
</body>
</html>
 
In this example, the Flask app sends a message to the HTMX client every second. The message is wrapped in an HTML paragraph tag and sent as an SSE event. The HTMX client listens to the SSE endpoint and updates the contents of the HTML element with the sse-swap attribute with the message received from the server.
Streaming data from Flask to HTMX using Server-Side Events (SSE) | mathspp 
// Client-side Javascript in the HTML
var targetContainer = document.getElementById("this-div");
var eventSource = new EventSource("/stream");
eventSource.onmessage = function(e) {
    targetContainer.innerHTML = e.data;
};

Generator + SSE: So why are Python generators good with SSE? Simply because they can keep looping, yielding data, and handing it to the client very seamlessly. Here is a simple Python implementation of SSE in Flask:

@route("/stream")
def stream():
    def eventStream():
        while True:
            # Poll data from the database
            # and see if there's a new message
            if len(messages) > len(previous_messages):
                yield "data: {}\n\n".format(messages[-1])

    return Response(eventStream(), mimetype="text/event-stream")

This is a simple hypothetical event source that checks whether there is a new inbox message and yields it. For the browser to acknowledge a server-sent message, you have to comply with this format:

"data: <any_data>\n\n"

You have the option to also send with the data the event and id.

"id: <any_id>\nevent: <any_message>\ndata: <any_data>\n\n"

Note that the fields do not have to be in any particular order, as long as each field ends with a newline (\n) and the whole message ends with two (\n\n). With the additional event field, you have more control over how you push data to the browser.

// Client-side Javascript in the HTML
var targetContainer = document.getElementById("this-div");
var eventSource = new EventSource("/stream");
eventSource.addEventListener("<any_message>", function(e) {
    targetContainer.innerHTML = e.data;

    if (e.data > 20) {
        targetContainer.style.color = "red";
    }
});

This will basically render the DOM with the latest data on the specified event message and change the color to “red” when it exceeds 20.

https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#receiving_events_from_the_server

Warning: When not used over HTTP/2, SSE suffers from a limitation to the maximum number of open connections, which can be especially painful when opening multiple tabs, as the limit is per browser and is set to a very low number (6). The issue has been marked as "Won't fix" in Chrome and Firefox. This limit is per browser + domain, which means that you can open 6 SSE connections across all of the tabs to www.example1.com and another 6 SSE connections to www.example2.com (per Stackoverflow). When using HTTP/2, the maximum number of simultaneous HTTP streams is negotiated between the server and the client (defaults to 100).

Fields
Each message received has some combination of the following fields, one per line:

event
    A string identifying the type of event described. If this is specified, an event will be dispatched on the browser to the listener for the specified event name; the website source code should use addEventListener() to listen for named events. The onmessage handler is called if no event name is specified for a message.

data
    The data field for the message. When the EventSource receives multiple consecutive lines that begin with data:, it concatenates them, inserting a newline character between each one. Trailing newlines are removed.

id
    The event ID to set the EventSource object's last event ID value.

retry
    The reconnection time. If the connection to the server is lost, the browser will wait for the specified time before attempting to reconnect. This must be an integer, specifying the reconnection time in milliseconds. If a non-integer value is specified, the field is ignored.

All other field names are ignored.

Saturday, October 14, 2023

Add Global Jinja Variables for Quart/Flask

app = Quart(__name__, template_folder='templates', static_folder='static')
app.jinja_env.globals['emojiflags'] = emojiflags 

 or

app = Flask(__name__, template_folder='templates', static_folder='static')
app.add_template_global(emojiflags, name='emojiflags')

Monday, September 4, 2023

Define and Call Background Task in Quart

def write2sqliteonecol(instance, stringonecol): # synchronous function
    try:
        instance.createandwritelocaldb((stringonecol, ))
    except Exception as error:
        print('Error writing DWDS to db!', error)
 
async def asyncwrite2sqliteonecol(instance, stringonecol): # asynchronous function
...
    instance.createandwritelocaldb((stringonecol, ))
...

Async case 1 - asyncio.get_event_loop().create_task:
asyncio.get_event_loop().create_task(asyncwrite2sqliteonecol(instance, stringonecol))
 
Async Case 2 - app.add_background_task with another function passing arguments:
async def task():  
    await asyncwrite2sqliteonecol(instance, stringonecol) 
app.add_background_task(task)
 
Sync case 3 - arguments after comma, not in brackets:
app.add_background_task(write2sqliteonecol, instance, stringonecol)
 
Sync case 4 - arguments in brackets + lambda function:
app.add_background_task(lambda: write2sqliteonecol(instance, stringonecol))
 
Sync case 5 - needs import:
from functools import partial
app.add_background_task(partial(write2sqliteonecol, instance, stringonecol))
 
*****
Both asyncio.get_event_loop().create_task() and app.add_background_task() are methods to run tasks concurrently in Quart, but they serve slightly different purposes.

asyncio.get_event_loop().create_task() is a lower-level asyncio function that schedules a coroutine to be run on the event loop. It's a general-purpose way to run tasks concurrently in any asyncio-based application.

app.add_background_task() is a Quart-specific method that adds a task to be run in the background when the Quart app is running. One key feature of app.add_background_task() is that it ensures the task finishes during the shutdown of the Quart app, unless the server times out and cancels it. This is particularly useful in a production environment where you want to ensure that all tasks are completed before the app shuts down.

In terms of performance, there should not be a significant difference between the two methods. Both methods will run the tasks concurrently, which can lead to a significant performance improvement over running the tasks sequentially, especially for IO-bound tasks.

However, if you're writing to multiple SQLite databases, you should be aware that SQLite is not designed for high-level concurrency. It uses a file-based locking system for transactions which can become a bottleneck when many tasks are writing to the database concurrently. If high-level concurrency is required, you might want to consider using a different database system that is designed for concurrent access, such as PostgreSQL or MySQL.
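A simple way to respect SQLite's single-writer model from many concurrent tasks is to serialize the writes in the application, for example with an asyncio.Lock. This is an illustrative standalone sketch, not code from the post:

```python
import asyncio
import os
import sqlite3
import tempfile

write_lock = asyncio.Lock()

async def write_row(db_path: str, value: str) -> None:
    # Only one task at a time opens and writes, so SQLite's file lock
    # is never contended from within this process
    async with write_lock:
        conn = sqlite3.connect(db_path)
        try:
            conn.execute("CREATE TABLE IF NOT EXISTS items (name TEXT)")
            conn.execute("INSERT INTO items (name) VALUES (?)", (value,))
            conn.commit()
        finally:
            conn.close()

async def main(db_path: str) -> int:
    # Ten concurrent writers, executed one at a time under the lock
    await asyncio.gather(*(write_row(db_path, f"item{i}") for i in range(10)))
    conn = sqlite3.connect(db_path)
    try:
        (count,) = conn.execute("SELECT COUNT(*) FROM items").fetchone()
    finally:
        conn.close()
    return count

path = os.path.join(tempfile.mkdtemp(), "demo.db")
print(asyncio.run(main(path)))  # 10
```

For heavier write loads, switching to PostgreSQL or MySQL, as suggested above, remains the more robust option.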

The reason is that app.add_background_task is designed to ensure that the tasks finish during the shutdown of the application (unless the server times out and cancels). This is particularly useful in a production environment where you'd want all tasks to complete before the application shuts down.

On the other hand, asyncio.get_event_loop().create_task does not provide this guarantee. If the application shuts down before the task completes, the task may be cancelled and not finish its work. So, for writing to a database, which is a critical operation that should not be interrupted, app.add_background_task would be a better choice.

*******

functools.partial
is a function in Python's standard library that allows you to fix a certain number of arguments of a function and generate a new function. This can be useful when you want to "freeze" some parameters of a function.

Here is a simple example:

from functools import partial  
  
def multiply(x, y):  
    return x * y  
  
# Create a new function that multiplies by 2  
double = partial(multiply, 2)  
  
print(double(4))  # Outputs: 8  

In the context of the Quart code you've been working with, functools.partial could be used in scenarios where you want to predefine some arguments of a function that you want to run as a background task. For instance, if you have a function that writes to a SQLite database and takes the database connection and some data as arguments, you could use functools.partial to create a new function that has the database connection already set:

from functools import partial  
  
def write_to_db(connection, data):  
    ...  # Code to write data to the database  
  
# Create a new function that writes to a specific database  
write_to_specific_db = partial(write_to_db, specific_connection)  
  
# Now you can use write_to_specific_db in your code and only pass the data  
app.add_background_task(write_to_specific_db, data)  

In this example, write_to_specific_db is a new function that takes only one argument (data) and uses specific_connection as the database connection. Source: AI from https://www.getonboard.dev

****

Quart has startup and shutdown methods that allow something to be started before the server starts serving and stopped when the server finishes serving. If your background task is mostly IO bound I'd recommend just using a coroutine function rather than a thread,

async def background_task():
    while True:
        ...

@app.before_serving
async def startup():
    app.background_task = asyncio.ensure_future(background_task())

@app.after_serving
async def shutdown():
    app.background_task.cancel()  # Or use a variable in the while loop

Or you can do the same with your Service,

@app.before_serving
async def startup():
    service.start()

@app.after_serving
async def shutdown():
    service.stop()

Note to check:
app.add_background_task(background_task) in "before" and app.background_tasks.pop().cancel() in "after".
 https://stackoverflow.com


Friday, September 1, 2023

Python Quart Parse Request + Cheatsheet

import site, os  # get quart version with standard libs
import quart
sitepackagedirs = [x[0] for x in os.walk(site.getsitepackages()[1])]
print(quart.__name__, [pack for pack in sitepackagedirs if 'quart-' in pack][0][-16:-10])

from quart import Quart, render_template, request

@app.route("/hello")
async def hello():

    request.method
    request.url
    request.headers["X-Bob"]
    request.args.get("a")  # Query string e.g. example.com/hello?a=2
    request.args.getlist("a")
    request.args.to_dict()
    await request.get_data()  # Full raw body
    (await request.form)["name"]
    (await request.get_json())["key"]
    request.cookies.get("name")

https://tedboy.github.io/flask/generated/generated/flask.Request.html

The request object is a Request subclass and provides all of the attributes Werkzeug defines plus a few Flask specific ones.

form
    A MultiDict with the parsed form data from POST or PUT requests. Please keep in mind that file uploads will not end up here, but instead in the files attribute.
args
    A MultiDict with the parsed contents of the query string. (The part in the URL after the question mark).
values
    A CombinedMultiDict with the contents of both form and args.
cookies
    A dict with the contents of all cookies transmitted with the request.
stream
    If the incoming form data was not encoded with a known mimetype the data is stored unmodified in this stream for consumption. Most of the time it is a better idea to use data which will give you that data as a string. The stream only returns the data once.
headers
    The incoming request headers as a dictionary like object.
data
    Contains the incoming request data as string in case it came with a mimetype Flask does not handle.
files
    A MultiDict with files uploaded as part of a POST or PUT request. Each file is stored as FileStorage object. It basically behaves like a standard file object you know from Python, with the difference that it also has a save() function that can store the file on the filesystem.
environ
    The underlying WSGI environment.
method
    The current request method (POST, GET etc.)

https://tedboy.github.io/flask/generated/generated/werkzeug.MultiDict.html

to_dict([flat]) Return the contents as regular dict.
getlist(key[, type]) Return the list of items for a given key.
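These behaviors are easy to verify directly with werkzeug's MultiDict (werkzeug is installed alongside Flask/Quart):

```python
from werkzeug.datastructures import MultiDict

# Mimics a query string like ?dic=IATE&dic=SAP&q=Kern
args = MultiDict([("dic", "IATE"), ("dic", "SAP"), ("q", "Kern")])

print(args.get("dic"))           # 'IATE' -- get returns only the first value
print(args.getlist("dic"))       # ['IATE', 'SAP']
print(args.to_dict())            # {'dic': 'IATE', 'q': 'Kern'} (flat=True keeps first values)
print(args.to_dict(flat=False))  # {'dic': ['IATE', 'SAP'], 'q': ['Kern']}
```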


# Requested url: http://192.168.2.100:5000/resultdic?q=Kern&sl=de&tl=ro&dic=IATE&dic=SAP
# http://192.168.2.100:5000/resultdic?q=Kern&sl=de&tl=ro&dic=all

allbidiclist: list[str] = ['Hallo.ro', 'Dict.cc', 'Linguee.com', 'IATE', 'SAP', 'Yandex']

@app.route("/resultdic", methods=["GET"])
async def resultdic():

    if request.method == "GET":
        searchformterm = request.args.get("q", None)
        sourcelang = request.args.get("sl", None)
        targetlang = request.args.get("tl", None)  # get(key, default=None, type=None)
        # Return the default value if the requested data doesn't exist. If type is provided
        # and is a callable it should convert the value, return it or raise a ValueError
        # if that is not possible.
        selecteddiclist = request.args.getlist("dic")  # list from dic=Dict.cc&dic=SAP
        if selecteddiclist:
            selecteddiclist = allbidiclist if selecteddiclist[0] == "all" else selecteddiclist
        print("request.args dict:", request.args.to_dict())  # Python dictionary

Cheatsheet

Basic App

from quart import Quart

app = Quart(__name__)

@app.route("/hello")
async def hello():
    return "Hello, World!"

if __name__ == "__main__":
    app.run(debug=True)

Routing

@app.route("/hello/<string:name>")  # example.com/hello/quart
async def hello(name):
    return f"Hello, {name}!"

Request Methods

@app.route("/get")  # GET Only by default
@app.route("/get", methods=["GET", "POST"])  # GET and POST
@app.route("/get", methods=["DELETE"])  # Just DELETE

JSON Responses

@app.route("/hello")
async def hello():
    return {"Hello": "World!"}

Template Rendering

from quart import render_template

@app.route("/hello")
async def hello():
    return await render_template("index.html")  # Required to be in templates/

Configuration

import json
import toml

app.config["VALUE"] = "something"

app.config.from_file("filename.toml", toml.load)
app.config.from_file("filename.json", json.load)

Request

from quart import request

@app.route("/hello")
async def hello():
    request.method
    request.url
    request.headers["X-Bob"]
    request.args.get("a")  # Query string e.g. example.com/hello?a=2
    await request.get_data()  # Full raw body
    (await request.form)["name"]
    (await request.get_json())["key"]
    request.cookies.get("name")

WebSocket

import asyncio

from quart import websocket

@app.websocket("/ws")
async def ws():
    websocket.headers
    while True:
        try:
            data = await websocket.receive()
            await websocket.send(f"Echo {data}")
        except asyncio.CancelledError:
            # Handle disconnect
            raise

Cookies

from quart import make_response

@app.route("/hello")
async def hello():
    response = await make_response("Hello")
    response.set_cookie("name", "value")
    return response

Abort

from quart import abort

@app.route("/hello")
async def hello():
    abort(409)

HTTP/2 & HTTP/3 Server Push

from quart import make_push_promise, url_for

@app.route("/hello")
async def hello():
    await make_push_promise(url_for('static', filename='css/minimal.css'))


Source: https://pgjones.gitlab.io

The docs describe the attributes available on the request object (from flask import request) during a request. In most common cases request.data will be empty because it's used as a fallback:

request.data Contains the incoming request data as string in case it came with a mimetype Flask does not handle.

  • request.args: the key/value pairs in the URL query string
  • request.form: the key/value pairs in the body, from a HTML post form, or JavaScript request that isn't JSON encoded
  • request.files: the files in the body, which Flask keeps separate from form. HTML forms must use enctype=multipart/form-data or files will not be uploaded.
  • request.values: combined args and form, preferring args if keys overlap
  • request.json: parsed JSON data. The request must have the application/json content type, or use request.get_json(force=True) to ignore the content type.

All of these are MultiDict instances (except for json). You can access values using:

  • request.form['name']: use indexing if you know the key exists
  • request.form.get('name'): use get if the key might not exist
  • request.form.getlist('name'): use getlist if the key is sent multiple times and you want a list of values. get only returns the first value.

Run in debug mode:

import asyncio

import hypercorn.asyncio

quart_cfg = hypercorn.Config()
quart_cfg.bind = ["0.0.0.0:8000"]
app = Quart(__name__)
...
async def main():
    await hypercorn.asyncio.serve(app, quart_cfg)

if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    loop.set_debug(True)
    loop.run_until_complete(main())

Run function at startup:

a background task that is started on startup,

async def schedule():
    while True:
        await asyncio.sleep(1)
        await do_work()

@app.before_serving
async def startup():
    app.add_background_task(schedule)

which will run schedule for the lifetime of the app, being cancelled at shutdown.

The ASGI lifespan specification includes the ability for awaiting coroutines before the first byte is received and after the final byte is sent, through the startup and shutdown lifespan events. This is particularly useful for creating and destroying connection pools. Quart supports this via the decorators before_serving() and after_serving(), which function like before_first_request(), and while_serving() which expects a function that returns a generator.

@app.before_serving
async def create_db_pool():
    app.db_pool = await ...
    g.something = something

@app.before_serving
async def use_g():
    g.something.do_something()

@app.while_serving
async def lifespan():
    ...  # startup
    yield
    ...  # shutdown

@app.route("/")
async def index():
    app.db_pool.execute(...)
    # g.something is not available here

@app.after_serving
async def close_db_pool():
    await app.db_pool.close()

AI response: Quart does not have a direct equivalent to Flask's @response.call_on_close. However, Quart provides a way to handle tasks in the background which can be used to achieve similar functionality.

from quart import Quart, request, jsonify  
from quart.ctx import copy_current_request_context  
 
app = Quart(__name__)  
 
@app.route('/')  
async def index():  
    @copy_current_request_context  
    async def after_request():  
        # Code to be executed after the response is sent  
        pass  
 
    app.add_background_task(after_request)  
    return jsonify({'message': 'Hello, World!'})  


In this example, the after_request function will be executed after the response is sent. The copy_current_request_context decorator preserves the request context for the function when it runs in the background. Please note that errors raised in a background task are logged but otherwise ignored, allowing the app to continue - much like with request/websocket handling errors.

Source: https://stackoverflow.com