Tuesday, January 7, 2025

OpenAI-compatible API with FastAPI

# source: https://towardsdatascience.com

import asyncio
import json
import time

from typing import Optional, List

from pydantic import BaseModel, Field

from starlette.responses import StreamingResponse
from fastapi import FastAPI, HTTPException, Request

# FastAPI application exposing a mock OpenAI-compatible chat completions endpoint.
app = FastAPI(title="OpenAI-compatible API")


# data models
class Message(BaseModel):
    # One chat turn: the speaker role (e.g. "user", "assistant") and its text.
    role: str
    content: str


class ChatCompletionRequest(BaseModel):
    # OpenAI-style request body; only `messages` is required.
    model: Optional[str] = "mock-gpt-model"
    messages: List[Message]
    max_tokens: Optional[int] = 512  # accepted for API compatibility; not read by the mock
    temperature: Optional[float] = 0.1  # accepted for API compatibility; not read by the mock
    stream: Optional[bool] = False  # True -> server streams SSE-style chunks


async def _resp_async_generator(text_resp: str, model: str = "mock-gpt-model"):
    """Stream *text_resp* word-by-word as OpenAI-style chat.completion.chunk events.

    Args:
        text_resp: Full response text; every whitespace-separated word is
            treated as one "token".
        model: Model name echoed in each chunk. (Bug fix: the original read
            ``request.model``, but ``request`` was undefined in this scope and
            raised NameError on every streamed request.)

    Yields:
        ``"data: {json}\\n\\n"`` strings, terminated by ``"data: [DONE]\\n\\n"``.
    """
    # let's pretend every word is a token and return it over time
    tokens = text_resp.split(" ")

    for i, token in enumerate(tokens):
        chunk = {
            "id": i,
            "object": "chat.completion.chunk",
            "created": time.time(),
            "model": model,  # was `request.model` — undefined name (NameError)
            "choices": [{"delta": {"content": token + " "}}],
        }
        yield f"data: {json.dumps(chunk)}\n\n"
        await asyncio.sleep(1)  # simulate per-token generation latency
    yield "data: [DONE]\n\n"


@app.post("/chat/completions")
async def chat_completions(request: ChatCompletionRequest):
    """OpenAI-compatible /chat/completions endpoint that echoes the last message.

    Returns a streaming SSE response when ``request.stream`` is true, otherwise
    a single chat.completion-shaped dict. (Fix: user-facing message misspelled
    "Assitant" -> "Assistant".)
    """
    if request.messages:
        resp_content = (
            "As a mock AI Assistant, I can only echo your last message: "
            + request.messages[-1].content
        )
    else:
        resp_content = "As a mock AI Assistant, I can only echo your last message, but there wasn't one!"
    if request.stream:
        # NOTE(review): chunks are SSE-formatted ("data: ...\n\n");
        # text/event-stream may suit clients better than x-ndjson — confirm
        # with consumers before changing the media type.
        return StreamingResponse(
            _resp_async_generator(resp_content), media_type="application/x-ndjson"
        )

    return {
        "id": "1337",
        "object": "chat.completion",
        "created": time.time(),
        "model": request.model,
        "choices": [{"message": Message(role="assistant", content=resp_content)}],
    }


# Start the dev server only when run as a script (not on import).
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)


# Direct (non-streaming) generation
from openai import OpenAI

# Point the official OpenAI client at the local FastAPI mock server.
client = OpenAI(
    base_url="http://localhost:8000",  # change the default port if needed
    api_key="fake-api-key",
)

# Call the API; the mock server simply echoes the last message back.
chat_completion = client.chat.completions.create(
    model="gpt-1337-turbo-pro-max",
    messages=[{"role": "user", "content": "Say this is a test"}],
)

# Print the content of the top "choice".
print(chat_completion.choices[0].message.content)

# Streaming: consume the response chunk by chunk
from openai import OpenAI

# Fresh client pointed at the local mock server.
client = OpenAI(
    base_url="http://localhost:8000",  # change the default port if needed
    api_key="fake-api-key",
)

# stream=True makes the server emit chunks as they are produced.
stream = client.chat.completions.create(
    model="mock-gpt-model",
    messages=[{"role": "user", "content": "Say this is a test"}],
    stream=True,
)

# Print each delta as it arrives; chunks without content print as "".
for chunk in stream:
    print(chunk.choices[0].delta.content or "")

Hugging Face Gradio Translation Space

Gradio Machine Translation Space on HuggingFace

Sunday, January 5, 2025

Parse the text column from the mtConcepts of an SDLTB file with Python XML or BeautifulSoup

from bs4 import BeautifulSoup

# Raw SDLTB concept entry (<cG>): concept-level transactions (<trG>) plus two
# language groups (<lG>), each holding a term (<t>) and its own transactions.
# XML data
xml_data = """<cG><c>1</c><trG><tr type="origination">letco</tr><dt>2020-08-30T19:12:50</dt></trG><trG><tr type="modification">letco</tr><dt>2020-08-30T19:12:58</dt></trG><lG><l lang="DE" type="German"/><tG><t>Pelletpresse</t><trG><tr type="origination">letco</tr><dt>2020-08-30T19:12:50</dt></trG><trG><tr type="modification">letco</tr><dt>2020-08-30T19:12:50</dt></trG></tG></lG><lG><l lang="RO" type="Romanian"/><tG><t>presă de peleți</t><trG><tr type="origination">letco</tr><dt>2020-08-30T19:12:58</dt></trG><trG><tr type="modification">letco</tr><dt>2020-08-30T19:12:58</dt></trG></tG></lG></cG>"""

# Parse the XML with the XML parser (fix: the import line carried a stray
# leading space, which is an IndentationError at module level).
soup = BeautifulSoup(xml_data, "xml")


# Function to extract translation details from one <lG> language group
def parse_translation(translation_element):
    """Return language, term text, and transaction history for one <lG> element.

    Args:
        translation_element: bs4 Tag for an <lG> group containing an <l>
            language tag (attributes ``lang`` and ``type``) and a <tG> term.

    Returns:
        dict with keys "lang", "type", "text", "transactions".

    (Fix: the pasted body had lost all indentation — IndentationError.)
    """
    lang = translation_element.find("l").attrs
    text = translation_element.find("t").text
    # Each <trG> records one origination/modification event on the term.
    transactions = [
        {
            "type": tr.find("tr")["type"],
            "actor": tr.find("tr").text,
            "datetime": tr.find("dt").text,
        }
        for tr in translation_element.find_all("trG")
    ]
    return {
        "lang": lang["lang"],
        "type": lang["type"],
        "text": text,
        "transactions": transactions,
    }


# Extract the main data
# Anchor on the <cG> root element: the original used
# soup.find_all("trG", recursive=False), but the soup object's only direct
# child is <cG>, so concept-level transactions always came back empty —
# unlike the equivalent ElementTree version (root.findall("./trG")).
concept = soup.find("cG")
data = {
    "c": concept.find("c").text,
    # Concept-level transactions are the <trG> elements directly under <cG>.
    "transactions": [
        {
            "type": tr.find("tr")["type"],
            "actor": tr.find("tr").text,
            "datetime": tr.find("dt").text,
        }
        for tr in concept.find_all("trG", recursive=False)
    ],
    "translations": [parse_translation(lG) for lG in soup.find_all("lG")],
}

# Output the extracted data
print(data)



from xml.etree import ElementTree as ET


# Parse the XML
# Same extraction as above, redone with the stdlib ElementTree API.
root = ET.fromstring(xml_data)


# Function to extract translation details from one <lG> language group (ET version)
def parse_translation(translation_element):
    """Return language, term text, and transaction history for one <lG> element.

    Args:
        translation_element: an ElementTree Element for an <lG> group with an
            <l> child (attributes ``lang`` and ``type``) and a <tG> term group.

    Returns:
        dict with keys "lang", "type", "text", "transactions".

    (Fix: the pasted body had lost all indentation — IndentationError.)
    """
    lang = translation_element.find("./l").attrib
    text = translation_element.find("./tG/t").text
    # Each <trG> under <tG> records one origination/modification event.
    transactions = [
        {
            "type": tr.find("./tr").attrib["type"],
            "actor": tr.find("./tr").text,
            "datetime": tr.find("./dt").text,
        }
        for tr in translation_element.findall("./tG/trG")
    ]
    return {
        "lang": lang["lang"],
        "type": lang["type"],
        "text": text,
        "transactions": transactions,
    }


# Extract the main data into a plain dict
data = {
    "c": root.find("./c").text,
    # Concept-level transactions: the <trG> children directly under <cG>.
    "transactions": [
        {
            "type": tr.find("./tr").attrib["type"],
            "actor": tr.find("./tr").text,
            "datetime": tr.find("./dt").text,
        }
        for tr in root.findall("./trG")
    ],
    # One entry per <lG> language group (DE and RO in the sample data).
    "translations": [parse_translation(lG) for lG in root.findall("./lG")],
}

# Output the extracted data
print(data)