Tuesday, November 14, 2023

Server Send Events Stream SSE + Javascript

from quart import abort, make_response, request, Quart

app = Quart(__name__)
@app.get("/sse")
async def sse():
    if "text/event-stream" not in request.accept_mimetypes:
        abort(400)
    response = await make_response(
        send_events(),
        {
            'Content-Type': 'text/event-stream',
            'Cache-Control': 'no-cache',
            'Transfer-Encoding': 'chunked',
        },
    )
    response.timeout = None  # Disable the timeout for this streaming response
    return response
 
<script>
        document.addEventListener('DOMContentLoaded', (event) => {
            const eventSource = new EventSource('/sse');
            eventSource.onmessage = function(event) {
                console.log('New message:', event.data);
                // You can update the DOM here
            };
            eventSource.onopen = function(event) {
                console.log('Connection to server opened.');
            };
            eventSource.onerror = function(event) {
                console.error('EventSource failed.');
            };
            // To close the connection when the window is closed
            window.onbeforeunload = () => {
                eventSource.close();
            };
        });
    </script>
 
With parameters:
from quart import Quart, request, abort, make_response
from dataclasses import dataclass

app = Quart(__name__)

@dataclass
class ServerSentEvent:
data: str
event: str | None = None
id: int | None = None
retry: int | None = None

def encode(self) -> bytes:
message = f"data: {self.data}"
if self.event is not None:
message = f"{message}\nevent: {self.event}"
if self.id is not None:
message = f"{message}\nid: {self.id}"
if self.retry is not None:
message = f"{message}\nretry: {self.retry}"
message = f"{message}\n\n"
return message.encode('utf-8')

@app.get("/chat-updates")
async def chat_updates():
if "text/event-stream" not in request.accept_mimetypes:
abort(400)

friends = request.args.get('friends', default='false') == 'true'
format = request.args.get('format', default='simple')

async def send_events():
while True:
# Your logic to get updates, possibly filtering based on the query parameters
data = ... # Replace with your data retrieval logic
event = ServerSentEvent(data)
yield event.encode()

response = await make_response(
send_events(),
{
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Transfer-Encoding': 'chunked',
},
)
response.timeout = None
return response

# Run the app
if __name__ == "__main__":
app.run()
Javascript:
<script>
const params = new URLSearchParams({
friends: 'true',
format: 'detailed'
});

const url = `https://my-server/chat-updates?${params}`;
const eventSource = new EventSource(url);

eventSource.onmessage = function(event) {
// Handle incoming messages
console.log(event.data);
};

eventSource.onerror = function(error) {
// Handle any errors that occur
console.error("EventSource failed:", error);
}; 
</script>
 
// server.js
const http = require('http');
const es = require('event-stream');
// Create a HTTP server
const server = http.createServer((req, res) => {
// Check if the request path is /stream
if (req.url === '/stream') {
// Set the response headers
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
// Create a counter variable
let counter = 0;
// Create an interval function that sends an event every second
const interval = setInterval(() => {
// Increment the counter
counter++;
// Create an event object with name, data, and id properties
const event = {
name: 'message',
data: `Hello, this is message number ${counter}`,
id: counter
};
// Convert the event object to a string
const eventString = `event: ${event.name}\ndata: ${event.data}\nid: ${event.id}\n\n`;
// Write the event string to the response stream
res.write(eventString);
// End the response stream after 10 events
if (counter === 10) {
clearInterval(interval);
res.end();
}
}, 1000);
} else {
// Handle other requests
res.writeHead(404);
res.end('Not found');
}
});
// Listen on port 3000
server.listen(3000, () => {
console.log('Server listening on port 3000');
});
 
// client.js
// Fetch the event stream from the server
fetch('/stream')
.then(response => {
// Get the readable stream from the response body
const stream = response.body;
// Get the reader from the stream
const reader = stream.getReader();
// Define a function to read each chunk
const readChunk = () => {
// Read a chunk from the reader
reader.read()
.then(({
value,
done
}
) =>
{
// Check if the stream is done
if (done) {
// Log a message
console.log('Stream finished');
// Return from the function
return;
}
// Convert the chunk value to a string
const chunkString = new TextDecoder().decode(value);
// Log the chunk string
console.log(chunkString);
// Read the next chunk
readChunk();
})
.catch(error => {
// Log the error
console.error(error);
});
};
// Start reading the first chunk
readChunk();
})
.catch(error => {
// Log the error
console.error(error);
});
  
 
from flask import Flask, Response, render_template
import itertools
import time

app = Flask(__name__)

@app.route('/')
def index():
return render_template('index.html')

@app.route('/connect')
def publish_hello():
def stream():
for idx in itertools.count():
msg = f"data: <p>This is {idx}.</p>\n\n"
yield msg
time.sleep(1)
return Response(stream(), mimetype="text/event-stream")

Htmx

<!DOCTYPE html>
<html>
<head>
<script src="https://unpkg.com/htmx.org@1.8.6"></script>
<script src="https://unpkg.com/htmx.org/dist/ext/sse.js"></script>
</head>
<body>
<div hx-ext="sse" sse-connect="/connect" sse-swap="message">
Contents of this box will be updated in real time with every SSE message received from the server.
</div>
</body>
</html>
 
In this example, the Flask app sends a message to the HTMX client every second. The message is wrapped in an HTML paragraph tag and sent as an SSE event. The HTMX client listens to the SSE endpoint and updates the contents of the HTML element with the sse-swap attribute with the message received from the server.
Streaming data from Flask to HTMX using Server-Side Events (SSE) | mathspp 
 // Client-side Javascript in the HTML 
var targetContainer = document.getElementById("this-div");
var eventSource = new EventSource("/stream");
eventSource.onmessage = function(e) {
targetContainer.innerHTML = e.data;
};

Generator + SSE: So why are Python generators good with SSE? It’s simply because they can keeping looping and yielding data and handing it to the client very seamlessly. Here is a simple Python implementation of SSE in Flask:

@route("/stream")
def stream():
def eventStream():
while True:
# Poll data from the database
# and see if there's a new message
if len(messages) > len(previous_messages):
yield "data:
{}\n\n".format(messages[len(messages)-1)])"

return Response(eventStream(), mimetype="text/event-stream")

This is a simple hypothetical event source that checks if there’s a new inbox message and yield the new message. For the browser to acknowledge a server-sent message, you’ll have to comply to this format:

"data: <any_data>\n\n"

You have the option to also send with the data the event and id.

"id: <any_id>\nevent: <any_message>\ndata: <any_data>\n\n"

Note that the fields do not have to be in any order as long as there is a newline (\n) for each field and two (\n\n) at the end of them. With additional event field, you can have more control how you push data to the browser.

// Client-side Javascript in the HTMLvar targetContainer = document.getElementById("this-div");
var eventSource = new EventSource("/stream");
eventSource.addEventListener = (<any_message>, function(e) {
targetContainer.innerHTML = e.data;

if (e.data > 20) {
targetContainer.style.color = "red";
}
};

This will basically render the DOM with the latest data on the specified event message and change the color to “red” when it exceeds 20.

https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#receiving_events_from_the_server

Warning: When not used over HTTP/2, SSE suffers from a limitation to the maximum number of open connections, which can be especially painful when opening multiple tabs, as the limit is per browser and is set to a very low number (6).
The issue has been marked as "Won't fix" in Chrome and Firefox.
This limit is per browser + domain, which means that you can open 6 SSE connections
across all of the tabs to www.example1.com and another 6 SSE connections to www.example2.com (per Stackoverflow). When using HTTP/2, the maximum number of simultaneous HTTP streams is negotiated between the server and the client (defaults to 100). 

Fields
Each message received has some combination of the following fields, one per line:

event
    A string identifying the type of event described. If this is specified, an event will be dispatched on the browser to the listener for the specified event name; the website source code should use addEventListener() to listen for named events. The onmessage handler is called if no event name is specified for a message.

data
    The data field for the message. When the EventSource receives multiple consecutive lines that begin with data:, it concatenates them, inserting a newline character between each one. Trailing newlines are removed.

id
    The event ID to set the EventSource object's last event ID value.

retry
    The reconnection time. If the connection to the server is lost, the browser will wait for the specified time before attempting to reconnect. This must be an integer, specifying the reconnection time in milliseconds. If a non-integer value is specified, the field is ignored.

All other field names are ignored.