FastAPI StreamingResponse 不使用生成器函数进行流式传输

Rob*_*itz 14 python streaming python-requests fastapi openai-api

我有一个相对简单的 FastAPI 应用程序,它接受查询并从 ChatGPT 的 API 流回响应。ChatGPT 正在流回结果,我可以看到它在输入时被打印到控制台。

不工作的是StreamingResponse通过 FastAPI 返回。相反,响应会一起发送。我真的不知道为什么这不起作用。

这是 FastAPI 应用程序代码:

import os
import time

import openai

import fastapi
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import StreamingResponse

auth_scheme = HTTPBearer()
app = fastapi.FastAPI()

openai.api_key = os.environ["OPENAI_API_KEY"]


def ask_statesman(query: str):
    #prompt = router(query)
    
    completion_reason = None
    response = ""
    while not completion_reason or completion_reason == "length":
        openai_stream = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": query}],
            temperature=0.0,
            stream=True,
        )
        for line in openai_stream:
            completion_reason = line["choices"][0]["finish_reason"]
            if "content" in line["choices"][0].delta:
                current_response = line["choices"][0].delta.content
                print(current_response)
                yield current_response
                time.sleep(0.25)


@app.post("/")
async def request_handler(auth_key: str, query: str):
    if auth_key != "123":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": auth_scheme.scheme_name},
        )
    else:
        stream_response = ask_statesman(query)
        return StreamingResponse(stream_response, media_type="text/plain")


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000, debug=True, log_level="debug")
Run Code Online (Sandbox Code Playgroud)

这是一个非常简单的test.py文件来测试这一点:

import requests

query = "How tall is the Eiffel tower?"
url = "http://localhost:8000"
params = {"auth_key": "123", "query": query}

response = requests.post(url, params=params, stream=True)

for chunk in response.iter_lines():
    if chunk:
        print(chunk.decode("utf-8"))
Run Code Online (Sandbox Code Playgroud)

Chr*_*ris 40

POST首先,使用请求从服务器请求数据并不是一个好的做法。使用GET请求代替会更适合您的情况。除此之外,您不应该发送凭据,例如作为auth_keyURL 的一部分(即使用查询字符串),而应该使用Headersand/or Cookies(使用HTTPS)。请查看此答案,了解有关标头和 cookie 概念的更多详细信息和示例,以及使用查询参数时涉及的风险。还可以在此处此处以及此处此处此处找到有关此主题的有用帖子。

\n

StreamingResponse其次,如果您在\ 的生成器函数内执行阻塞操作(即阻塞 I/O 密集型或 CPU 密集型任务) ,则应使用 来定义生成器函数,而def不是async def, as,否则为阻塞操作,以及time.sleep()生成器内部使用的函数都会阻止事件循环。如此处所述,如果用于流式传输响应正文的函数是普通def生成器而不是async def生成器,FastAPI 将使用iterate_in_threadpool()在单独的线程中运行迭代器/生成器,然后awaited\xe2\x80\x94see StreamingResponse\'s相关源代码。如果您更喜欢使用async def生成器,请确保在外部ThreadPool(或ProcessPool) 和await它中执行任何阻塞操作,并使用await asyncio.sleep()而不是time.sleep(),以防您需要在操作的执行中添加延迟。查看此详细答案以获取更多详细信息和示例。

\n

第三,您正在使用requests\'函数,该函数一次一行地iter_lines()迭代响应数据。但是,如果响应数据不包含任何换行符(即),您将不会看到客户端控制台上的数据在到达时被打印,直到客户端收到并打印整个响应。作为一个整体。在这种情况下,您应该根据需要使用并指定(这两种情况都在下面的示例中进行了演示)。\\niter_content()chunk_size

\n

最后,如果您希望StreamingResponse在每个浏览器(也包括 Chrome)\xe2\x80\x94 中工作,即能够在数据流入 \xe2\x80\x94 时查看数据,则应将 指定为media_type不同的类型比text/plain(例如,application/jsontext/event-stream,请参阅此处),或禁用MIME 嗅探。如此处所述浏览器将开始缓冲text/plain一定数量的响应(大约 1445 字节,如此处所述,以检查接收到的内容是否实际上是纯文本。为了避免这种情况,您可以将 设为media_typetext/event-stream用于服务器发送的事件),或者继续使用text/plain,但将X-Content-Type-Options响应标头设为nosniff,这将禁用 MIME 嗅探(下面的示例中演示了这两个选项)。

\n

工作示例

\n

应用程序.py

\n
from fastapi import FastAPI\nfrom fastapi.responses import StreamingResponse\nimport asyncio\n\n\napp = FastAPI()\n\n\nasync def fake_data_streamer():\n    for i in range(10):\n        yield b\'some fake data\\n\\n\'\n        await asyncio.sleep(0.5)\n\n\n# If your generator contains blocking operations such as time.sleep(),\n# then define the generator function with normal `def`. Otherwise, use `async def`, \n# but run any blocking operations in an external ThreadPool, etc. (see 2nd paragraph of this answer)\n\'\'\'\nimport time\n\ndef fake_data_streamer():\n    for i in range(10):\n        yield b\'some fake data\\n\\n\'\n        time.sleep(0.5)\n\'\'\'        \n\n    \n@app.get(\'/\')\nasync def main():\n    return StreamingResponse(fake_data_streamer(), media_type=\'text/event-stream\')\n    # or, use:\n    \'\'\'\n    headers = {\'X-Content-Type-Options\': \'nosniff\'}\n    return StreamingResponse(fake_data_streamer(), headers=headers, media_type=\'text/plain\')\n    \'\'\'\n
Run Code Online (Sandbox Code Playgroud)\n

test.py(使用Python requests

\n
import requests\n\nurl = "http://localhost:8000/"\n\nwith requests.get(url, stream=True) as r:\n    for chunk in r.iter_content(1024):  # or, for line in r.iter_lines():\n        print(chunk)\n
Run Code Online (Sandbox Code Playgroud)\n

test.py(使用httpx\xe2\x80\x94see this,以及thisthishttpx以获得使用over的好处requests

\n
import httpx\n\nurl = \'http://127.0.0.1:8000/\'\n\nwith httpx.stream(\'GET\', url) as r:\n    for chunk in r.iter_raw():  # or, for line in r.iter_lines():\n        print(chunk)\n
Run Code Online (Sandbox Code Playgroud)\n

  • 感谢您非常全面的回答。任何稍后查看此内容的人都应该遵循克里斯给出的建议。另外,我应该注意到我的原始代码实际上有效(尽管存在一些问题,正如克里斯所描述的)。iter_lines 和我的测试文件存在一些问题,但我已经解决了这个问题。根本问题源于我用来托管此应用程序的无服务器提供商的问题。 (2认同)

归档时间:

查看次数:

29757 次

最近记录:

1 年,9 月 前