576*_*76i 21 python starlette fastapi
在同步而非非模式下使用 FastAPI async,我希望能够接收 POST 请求的原始、未更改的正文。
我能找到的所有示例都显示async代码,当我以正常同步方式尝试时,它们request.body()显示为协程对象。
XML当我通过向此端点发布一些内容来测试它时,我得到了一个500 "Internal Server Error".
from fastapi import FastAPI, Response, Request, Body
app = FastAPI()
@app.get("/")
def read_root():
return {"Hello": "World"}
@app.post("/input")
def input_request(request: Request):
# how can I access the RAW request body here?
body = request.body()
# do stuff with the body here
return Response(content=body, media_type="application/xml")
Run Code Online (Sandbox Code Playgroud)
这对于 FastAPI 来说是不可能的吗?
注意:简化的输入请求如下所示:
POST http://127.0.0.1:1083/input
Content-Type: application/xml
<XML>
<BODY>TEST</BODY>
</XML>
Run Code Online (Sandbox Code Playgroud)
我无法控制输入请求的发送方式,因为我需要替换现有的 SOAP API。
Chr*_*ris 23
async def端点如果一个对象是一个协程,则需要等待它。FastAPI其实底层就是Starlette, Starlette返回请求的方法body就是方法(也可以参见这里的async源码);因此,人们需要它们(在端点内)。例如:awaitasync def
from fastapi import Request\n\n@app.post("/input")\nasync def input_request(request: Request):\n return await request.body()\nRun Code Online (Sandbox Code Playgroud)\ndef端点或者,如果您确信传入数据是有效的JSON,则可以改为定义端点def,并使用该Body字段,如下所示(有关如何发布JSON数据的更多选项,请参阅此答案):
from fastapi import Body\n\n@app.post("/input")\ndef input_request(payload: dict = Body(...)):\n return payload\nRun Code Online (Sandbox Code Playgroud)\n但是,如果传入数据采用XML格式(如您提供的示例所示),则一种选择是使用以下方式传递它们Files,如下所示\xe2\x80\x94,只要您可以控制客户端数据发送到服务器的方式(也可以看看这里)。例子:
from fastapi import File\n\n@app.post("/input") \ndef input_request(contents: bytes = File(...)): \n return contents\nRun Code Online (Sandbox Code Playgroud)\ndef端点和async依赖项如本文所述,您可以使用async 依赖body函数从请求中提取。您也可以使用async对non-async(即)端点的依赖关系。def因此,如果此端点中存在某种阻止您使用async/await \xe2\x80\x94 的代码,我猜这可能是您的情况的原因 \xe2\x80\x94 这就是要走的路。
注意:我还应该提到这个答案def\xe2\x80\x94 它解释了和端点之间的区别async def(您可能知道)\xe2\x80\x94 在您需要使用时也提供了解决方案async def(正如您可能需要的那样await)路由内的协程),但也有一些同步的、昂贵的 CPU 密集型操作,可能会阻塞服务器。请看一看。
下面可以找到前面描述的方法的示例。如果您time.sleep()想确认自己的请求不会阻止其他请求通过,则可以取消注释该行,因为当您使用正常而不是声明端点def时async def,它在外部线程池中运行(无论async def依赖函数)。
from fastapi import FastAPI, Depends, Request\nimport time\n\napp = FastAPI()\n\nasync def get_body(request: Request):\n return await request.body()\n\n@app.post("/input")\ndef input_request(body: bytes = Depends(get_body)):\n print("New request arrived.")\n #time.sleep(5)\n return body\nRun Code Online (Sandbox Code Playgroud)\n
| 归档时间: |
|
| 查看次数: |
18824 次 |
| 最近记录: |