Dol*_*hin 3 streaming python-3.x server-sent-events fastapi openai-api
我正在使用 Python 3.10 和 FastAPI0.92.0编写服务器发送事件 (SSE) 流 api。Python 代码如下所示:
from fastapi import APIRouter, FastAPI, Header
from src.chat.completions import chat_stream
from fastapi.responses import StreamingResponse
router = APIRouter()
@router.get("/v1/completions",response_class=StreamingResponse)
def stream_chat(q: str, authorization: str = Header(None)):
auth_mode, auth_token = authorization.split(' ')
if auth_token is None:
return "Authorization token is missing"
answer = chat_stream(q, auth_token)
return StreamingResponse(answer, media_type="text/event-stream")
Run Code Online (Sandbox Code Playgroud)
这是chat_stream函数:
import openai
def chat_stream(question: str, key: str):
openai.api_key = key
# create a completion
completion = openai.Completion.create(model="text-davinci-003",
prompt=question,
stream=True)
return completion
Run Code Online (Sandbox Code Playgroud)
当我使用此命令调用 api 时:
curl -N -H "Authorization: Bearer sk-the openai key" https://chat.poemhub.top/v1/completions?q=hello
Run Code Online (Sandbox Code Playgroud)
服务器端显示以下错误:
INFO: 123.146.17.54:0 - "GET /v1/completions?q=hello HTTP/1.0" 200 OK
ERROR: Exception in ASGI application
Traceback (most recent call last):
File "/usr/local/lib/python3.10/dist-packages/uvicorn/protocols/http/h11_impl.py", line 429, in run_asgi
result = await app( # type: ignore[func-returns-value]
File "/usr/local/lib/python3.10/dist-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
return await self.app(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/fastapi/applications.py", line 276, in __call__
await super().__call__(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/applications.py", line 122, in __call__
await self.middleware_stack(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/errors.py", line 184, in __call__
raise exc
File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/exceptions.py", line 79, in __call__
raise exc
File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/exceptions.py", line 68, in __call__
await self.app(scope, receive, sender)
File "/usr/local/lib/python3.10/dist-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "/usr/local/lib/python3.10/dist-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/routing.py", line 718, in __call__
await route.handle(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/routing.py", line 276, in handle
await self.app(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/routing.py", line 69, in app
await response(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/responses.py", line 270, in __call__
async with anyio.create_task_group() as task_group:
File "/usr/local/lib/python3.10/dist-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
raise exceptions[0]
File "/usr/local/lib/python3.10/dist-packages/starlette/responses.py", line 273, in wrap
await func()
File "/usr/local/lib/python3.10/dist-packages/starlette/responses.py", line 264, in stream_response
chunk = chunk.encode(self.charset)
File "/usr/local/lib/python3.10/dist-packages/openai/openai_object.py", line 61, in __getattr__
raise AttributeError(*err.args)
AttributeError: encode
Run Code Online (Sandbox Code Playgroud)
为什么会发生这个错误?我应该做什么来修复它?
如FastAPI 文档中所述,StreamingResponse采用异步 ( async def) 生成器或普通 ( def) 生成器/迭代器并流式传输响应正文。正如这个答案中所解释的,在任何一种情况下,FastAPI 仍将异步工作\xe2\x80\x94如果传递给的生成器StreamingResponse不是异步的,FastAPI/Starlette 将在单独的线程中运行生成器(请参阅相关实现这里),使用iterate_in_threadpool(),然后将被awaited (参见iterate_in_threadpool()实现)。def有关 FastAPI 中vs的更多详细信息async def,以及在async def端点内运行阻塞操作的解决方案,请查看此详细答案。
StreamingResponse是 的子类Response,将响应正文流式传输到 中bytes。因此,如果通过生成器传递的内容不符合bytes格式,FastAPI/Starlette 将尝试将其编码/转换为bytes(使用默认utf-8编码方案)。下面是相关实现的代码片段:
async for chunk in self.body_iterator:\n if not isinstance(chunk, bytes):\n chunk = chunk.encode(self.charset)\n await send({"type": "http.response.body", "body": chunk, "more_body": True})\nRun Code Online (Sandbox Code Playgroud)\n但是,如果chunk迭代器/生成器中的 a 不是str可以编码的格式,AttributeError则会引发 an (例如AttributeError: ... has no attribute \'encode\'),类似于此答案中描述的内容(请参阅选项 2,注释 3)。另外,如果一个块包含编码范围之外的字符utf-8,您可能会遇到UnicodeEncodeError: ... codec can\'t encode character. 因此,考虑到AttributeError: encode您提供的错误回溯中的 ,您很可能传递的是 以外的对象str。
下面的示例演示了一个异步生成器(即async def gen()),流式传输 JSON 数据,在本例中,需要dict首先将对象转换为str(使用json.dumps(),或任何其他 JSON 编码器,例如orjson,请参阅此处),然后(可选)相应的字节值(使用.encode(\'utf-8\')),如前所述,如果省略,FastAPI/Starlette 将处理该问题。此外,下面的示例使用text/event-stream媒体类型(也称为MIME 类型),通常在从服务器发送事件时使用(另请参阅事件流格式)。如果您确信从服务器发送的数据始终为 JSON 格式,则application/json也可以使用媒体类型。请注意,正如此答案中所解释的,使用text/plain媒体类型可能会导致无法立即在浏览器中显示流数据,因为浏览器使用缓冲text/plain所谓的“MIME嗅探”的响应(请参阅上面的链接答案,了解如何禁用如果您想使用的话text/plain)。
from fastapi import FastAPI\nfrom fastapi.responses import StreamingResponse\nimport asyncio\nimport json\n\napp = FastAPI()\n\n\n@app.get(\'/\')\nasync def main():\n async def gen():\n while True:\n #yield (json.dumps({\'msg\': \'Hello World!\'}) + \'\\n\\n\').encode(\'utf-8\')\n # or, simply use the below, and FastAPI/Starlette will take care of the encoding\n yield json.dumps({\'msg\': \'Hello World!\'}) + \'\\n\\n\'\n await asyncio.sleep(0.5)\n\n return StreamingResponse(gen(), media_type=\'text/event-stream\')\nRun Code Online (Sandbox Code Playgroud)\n
\n如果您确信从服务器发送的数据是JSON格式,您可以使用:
# ...\n\n@app.get(\'/\')\nasync def main():\n async def gen():\n while True:\n yield json.dumps({\'msg\': \'Hello World!\'})\n await asyncio.sleep(0.5)\n\n return StreamingResponse(gen(), media_type=\'application/json\')\nRun Code Online (Sandbox Code Playgroud)\n
| 归档时间: |
|
| 查看次数: |
4184 次 |
| 最近记录: |