在 FastAPI 中返回 StreamingResponse 时出现“AttributeError:encode”

Dol*_*hin 3 streaming python-3.x server-sent-events fastapi openai-api

我正在使用 Python 3.10 和 FastAPI0.92.0编写服务器发送事件 (SSE) 流 api。Python 代码如下所示:

from fastapi import APIRouter, FastAPI, Header

from src.chat.completions import chat_stream
from fastapi.responses import StreamingResponse

router = APIRouter()

@router.get("/v1/completions",response_class=StreamingResponse)
def stream_chat(q: str, authorization: str = Header(None)):
    auth_mode, auth_token = authorization.split(' ')
    if auth_token is None:
        return "Authorization token is missing"
    answer = chat_stream(q, auth_token)
    return StreamingResponse(answer, media_type="text/event-stream")
Run Code Online (Sandbox Code Playgroud)

这是chat_stream函数:

import openai

def chat_stream(question: str, key: str):
    openai.api_key = key
    # create a completion
    completion = openai.Completion.create(model="text-davinci-003",
                                          prompt=question,
                                          stream=True)
    return completion
Run Code Online (Sandbox Code Playgroud)

当我使用此命令调用 api 时:

curl -N -H "Authorization: Bearer sk-the openai key" https://chat.poemhub.top/v1/completions?q=hello
Run Code Online (Sandbox Code Playgroud)

服务器端显示以下错误:

INFO:     123.146.17.54:0 - "GET /v1/completions?q=hello HTTP/1.0" 200 OK
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/uvicorn/protocols/http/h11_impl.py", line 429, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
  File "/usr/local/lib/python3.10/dist-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
    return await self.app(scope, receive, send)
  File "/usr/local/lib/python3.10/dist-packages/fastapi/applications.py", line 276, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.10/dist-packages/starlette/applications.py", line 122, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/exceptions.py", line 79, in __call__
    raise exc
  File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.10/dist-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/usr/local/lib/python3.10/dist-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.10/dist-packages/starlette/routing.py", line 718, in __call__
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.10/dist-packages/starlette/routing.py", line 276, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.10/dist-packages/starlette/routing.py", line 69, in app
    await response(scope, receive, send)
  File "/usr/local/lib/python3.10/dist-packages/starlette/responses.py", line 270, in __call__
    async with anyio.create_task_group() as task_group:
  File "/usr/local/lib/python3.10/dist-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/usr/local/lib/python3.10/dist-packages/starlette/responses.py", line 273, in wrap
    await func()
  File "/usr/local/lib/python3.10/dist-packages/starlette/responses.py", line 264, in stream_response
    chunk = chunk.encode(self.charset)
  File "/usr/local/lib/python3.10/dist-packages/openai/openai_object.py", line 61, in __getattr__
    raise AttributeError(*err.args)
AttributeError: encode
Run Code Online (Sandbox Code Playgroud)

为什么会发生这个错误?我应该做什么来修复它?

Chr*_*ris 7

如FastAPI 文档中所述,StreamingResponse采用异步 ( async def) 生成器或普通 ( def) 生成器/迭代器并流式传输响应正文。正如这个答案中所解释的,在任何一种情况下,FastAPI 仍将异步工作\xe2\x80\x94如果传递给的生成器StreamingResponse不是异步的,FastAPI/Starlette 将在单独的线程中运行生成器(请参阅相关实现这里),使用iterate_in_threadpool(),然后将被awaited (参见iterate_in_threadpool()实现)。def有关 FastAPI 中vs的更多详细信息async def,以及在async def端点内运行阻塞操作的解决方案,请查看此详细答案

\n

StreamingResponse是 的子类Response,将响应正文流式传输到 中bytes。因此,如果通过生成器传递的内容不符合bytes格式,FastAPI/Starlette 将尝试将其编码/转换为bytes(使用默认utf-8编码方案)。下面是相关实现的代码片段:

\n
async for chunk in self.body_iterator:\n    if not isinstance(chunk, bytes):\n        chunk = chunk.encode(self.charset)\n    await send({"type": "http.response.body", "body": chunk, "more_body": True})\n
Run Code Online (Sandbox Code Playgroud)\n

但是,如果chunk迭代器/生成器中的 a 不是str可以编码的格式,AttributeError则会引发 an (例如AttributeError: ... has no attribute \'encode\'),类似于此答案中描述的内容(请参阅选项 2,注释 3)。另外,如果一个块包含编码范围之外的字符utf-8,您可能会遇到UnicodeEncodeError: ... codec can\'t encode character. 因此,考虑到AttributeError: encode您提供的错误回溯中的 ,您很可能传递的是 以外的对象str

\n

下面的示例演示了一个异步生成器(即async def gen()),流式传输 JSON 数据,在本例中,需要dict首先将对象转换为str(使用json.dumps(),或任何其他 JSON 编码器,例如orjson,请参阅此处),然后(可选)相应的字节值(使用.encode(\'utf-8\')),如前所述,如果省略,FastAPI/Starlette 将处理该问题。此外,下面的示例使用text/event-stream媒体类型(也称为MIME 类型),通常在从服务器发送事件时使用(另请参阅事件流格式)。如果您确信从服务器发送的数据始终为 JSON 格式,则application/json也可以使用媒体类型。请注意,正如此答案中所解释的,使用text/plain媒体类型可能会导致无法立即在浏览器中显示流数据,因为浏览器使用缓冲text/plain所谓的“MIME嗅探”的响应(请参阅上面的链接答案,了解如何禁用如果您想使用的话text/plain)。

\n

工作示例

\n
from fastapi import FastAPI\nfrom fastapi.responses import StreamingResponse\nimport asyncio\nimport json\n\napp = FastAPI()\n\n\n@app.get(\'/\')\nasync def main():\n    async def gen():\n        while True:\n            #yield (json.dumps({\'msg\': \'Hello World!\'}) + \'\\n\\n\').encode(\'utf-8\')\n            # or, simply use the below, and FastAPI/Starlette will take care of the encoding\n            yield json.dumps({\'msg\': \'Hello World!\'}) + \'\\n\\n\'\n            await asyncio.sleep(0.5)\n\n    return StreamingResponse(gen(), media_type=\'text/event-stream\')\n
Run Code Online (Sandbox Code Playgroud)\n


\n如果您确信从服务器发送的数据是JSON格式,您可以使用:

\n
# ...\n\n@app.get(\'/\')\nasync def main():\n    async def gen():\n        while True:\n            yield json.dumps({\'msg\': \'Hello World!\'})\n            await asyncio.sleep(0.5)\n\n    return StreamingResponse(gen(), media_type=\'application/json\')\n
Run Code Online (Sandbox Code Playgroud)\n