相关疑难解决方法(0)

如何从我的 FastAPI 应用程序向另一个站点 (API) 发送 HTTP 请求?

我正在尝试http://httpbin.org/uuid使用以下代码片段一次向服务器发送 100 个请求

from fastapi import FastAPI
from time import sleep
from time import time
import requests
import asyncio

app = FastAPI()

URL= "http://httpbin.org/uuid"


# @app.get("/")
async def main():
    r = requests.get(URL)
    # print(r.text)
    
    return r.text

async def task():
    tasks = [main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main(),main()]
    # print(tasks)
    # input("stop")
    result = await asyncio.gather(*tasks)
    print (result)

@app.get('/')
def f():
    start = time()
    asyncio.run(task())
    print("time: ",time()-start)
Run Code Online (Sandbox Code Playgroud)

我将 FastAPI 与 Asyncio 结合使用,以实现大约 3 秒或更短的最短时间,但使用上述方法我得到的总时间为 66 秒,超过一分钟。我还想保留main用于附加操作的功能r.text。我知道要实现如此短的时间,需要并发性,但我不确定我在这里犯了什么错误。

python httprequest async-await python-asyncio fastapi

19
推荐指数
1
解决办法
7728
查看次数

如何从fastapi中的另一个api调用一个api?

我能够从另一个 API 获得一个 API 的响应,但无法将其存储在某处(在返回响应之前的文件或其他内容中) response=RedirectResponse(url="/apiname/")(我想访问带有标头和正文的发布请求)

我想存储这个响应内容而不返回它。

是的,如果我返回函数,我会得到结果,但是当我打印它时,我找不到结果。另外,如果我给出发布请求,那么我会收到错误实体未找到。

我阅读了 starlette 和 fastapi 文档,但找不到解决方法。回调也没有帮助。

python callback python-3.x starlette fastapi

5
推荐指数
2
解决办法
7282
查看次数

Python:我如何同时发送多个http请求?(像叉子)

假设我有办法将http请求发送到服务器.如何将这些请求中的两个(或更多)同时发送到服务器?例如,可能通过fork进程?我该怎么做?(我也在使用django)

#This example is not tested...
import requests

def tester(request):
    server_url = 'http://localhost:9000/receive'

    payload = {
        'd_test2': '1234',
        'd_test2': 'demo',
        }
    json_payload = simplejson.dumps(payload)
    content_length = len(json_payload)

    headers = {'Content-Type': 'application/json', 'Content-Length': content_length}
    response = requests.post(server_url, data=json_payload, headers=headers, allow_redirects=True)

    if response.status_code == requests.codes.ok:
        print 'Headers: {}\nResponse: {}'.format(response.headers, response.text)
Run Code Online (Sandbox Code Playgroud)

谢谢!

python django fork process httprequest

3
推荐指数
1
解决办法
7054
查看次数