我们正在使用 Python FastAPI 编写一个 Web 服务,该服务将托管在 Kubernetes 中。出于审计目的,我们需要保存特定路由的request/的原始 JSON 正文。JSON的主体大小约为1MB,最好这不应该影响响应时间。我们怎样才能做到这一点?responserequestresponse
我想app在我的路由器文件中获取该实例,我该怎么办?
我main.py的如下:
# ...
app = FastAPI()
app.machine_learning_model = joblib.load(some_path)
app.include_router(some_router)
# ...
Run Code Online (Sandbox Code Playgroud)
现在我想在some_router的文件中使用app.machine_learning_model,我该怎么办?
HTTP如何在 FastAPI 的请求之间共享变量值?例如,我有一个POST请求,其中我获取一些音频文件,然后将它们的信息转换为 Pandas Dataframe。我想在请求Dataframe中发送它GET,但我无法访问Dataframe请求GET范围。
@app.post(
path="/upload-audios/",
status_code=status.HTTP_200_OK
)
async def upload_audios(audios: list[UploadFile] = File(...)):
filenames = [audio.filename for audio in audios]
audio_data = [audio.file for audio in audios]
new_data = []
final_data = []
header = ["name", "file"]
for i in range(len(audios)):
new_data = [filenames[i], audio_data[i]]
final_data.append(new_data)
new_df = pd.DataFrame(final_data, columns=header)
return f"You have uploaded {len(audios)} audios which names are: {filenames}"
@app.get("/get-dataframe/")
async def get_dataframe():
pass
Run Code Online (Sandbox Code Playgroud) 快速API 0.68.0
Python 3.8
from fastapi import Depends, FastAPI, Header, HTTPException
async def verify_key(x_key: str = Header(...)):
if x_key != "fake-super-secret-key":
raise HTTPException(status_code=400, detail="X-Key header invalid")
return x_key
app = FastAPI(dependencies=[Depends(verify_key)])
@app.get("/items/")
async def read_items():
return [{"item": "Portal Gun"}, {"item": "Plumbus"}]
Run Code Online (Sandbox Code Playgroud)
这是来自FastAPI文档的示例(省略部分代码)
有什么办法可以进去x_key吗read_items()
我正在构建一个 FastAPI 服务器来接收 slacklash 命令发送的请求。使用下面的代码,我可以看到以下内容:
token=BLAHBLAH&team_id=BLAHBLAH&team_domain=myteam&channel_id=BLAHBLAH&channel_name=testme&user_id=BLAH&user_name=myname&command=%2Fwhatever&text=test&api_app_id=BLAHBLAH&is_enterprise_install=false&response_url=https%3A%2F%2Fhooks.slack.com%2Fcommands%BLAHBLAH&trigger_id=BLAHBLAHBLAH
Run Code Online (Sandbox Code Playgroud)
被打印出来,这正是我在官方文档中看到的有效负载。我正在尝试使用有效负载信息来做某事,我很好奇是否有一种解析此有效负载信息的好方法。我绝对可以使用 split 函数或任何其他漂亮的函数来解析这个有效负载,但我很好奇是否有一种“事实上的”方法来处理松弛有效负载。提前致谢!
from fastapi import FastAPI, Request
app = FastAPI()
@app.post("/")
async def root(request: Request):
request_body = await request.body()
print(request_body)
Run Code Online (Sandbox Code Playgroud)