und*_*ned 3 python fastapi httpx
I am trying to proxy an external website (a Flower monitoring URL running in a different container) using the Python FastAPI framework:
client = AsyncClient(base_url=f'http://containername:7800/monitor')

@app.get("/monitor/{path:path}")
async def tile_request(path: str):
    req = client.build_request("GET", path)
    r = await client.send(req, stream=True)
    return StreamingResponse(
        r.aiter_raw(),
        background=BackgroundTask(r.aclose),
        headers=r.headers
    )

It is able to proxy the container URL for every path. For example:
http://python_server:8001/monitor/dashboard --> http://containername:7800/monitor/dashboard
http://python_server:8001/monitor/tasks --> http://containername:7800/monitor/tasks
This works well. But it fails when the URL carries query parameters after the path.
For example:
http://python_server:8001/monitor/dashboard?json=1&_=1641485992460 --> redirects to http://containername:7800/monitor/dashboard
(Note that no query parameters are appended to the URL.)
Can anyone help with how to proxy any path of this external website, including any query parameters?
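One likely cause, sketched with only the standard library: a `{path:path}` route parameter holds the path component alone, while the query string is a separate part of the URL. The helper name `upstream_url` and the `keep_query` flag are hypothetical, and `UPSTREAM` is assumed from the example URLs above; this illustrates the mapping only, not the proxy itself.

```python
from urllib.parse import urlsplit, urlunsplit

# Assumed upstream base, taken from the example URLs in the question.
UPSTREAM = "http://containername:7800"


def upstream_url(incoming: str, keep_query: bool = True) -> str:
    """Map an incoming proxy URL onto the upstream host (hypothetical helper)."""
    parts = urlsplit(incoming)
    base = urlsplit(UPSTREAM)
    # The path and the query string are separate components, so forwarding
    # only parts.path silently drops everything after "?".
    query = parts.query if keep_query else ""
    return urlunsplit((base.scheme, base.netloc, parts.path, query, ""))


incoming = "http://python_server:8001/monitor/dashboard?json=1&_=1641485992460"
print(upstream_url(incoming, keep_query=False))
# http://containername:7800/monitor/dashboard
print(upstream_url(incoming))
# http://containername:7800/monitor/dashboard?json=1&_=1641485992460
```

The first call reproduces the behaviour described in the question; the second is the mapping being asked for.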
This code works for me and is used in production:
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from httpx import AsyncClient
from starlette.background import BackgroundTask

app = FastAPI()
# Upstream server that every request is forwarded to.
HTTP_SERVER = AsyncClient(base_url="http://localhost:8000/")


async def _reverse_proxy(request: Request):
    # Rebuild the target URL from the incoming path and the raw query string,
    # so that query parameters are forwarded as well.
    url = httpx.URL(path=request.url.path, query=request.url.query.encode("utf-8"))
    rp_req = HTTP_SERVER.build_request(
        request.method, url, headers=request.headers.raw, content=await request.body()
    )
    # Stream the upstream response back instead of buffering it in memory.
    rp_resp = await HTTP_SERVER.send(rp_req, stream=True)
    return StreamingResponse(
        rp_resp.aiter_raw(),
        status_code=rp_resp.status_code,
        headers=rp_resp.headers,
        # Close the upstream response once the client has received the body.
        background=BackgroundTask(rp_resp.aclose),
    )


app.add_route("/{path:path}", _reverse_proxy, ["GET", "POST"])