Scu*_*ris · 10 · tags: python, asynchronous, fastapi
I am trying to set up a FastAPI server that will take some biological data as input and run some processing on it. Since the processing takes up all of the server's resources, queries should be processed sequentially. However, the server should stay responsive and add further requests to a buffer. I have been trying to use the BackgroundTasks module for this, but after sending a second query, the response gets delayed while the first task is still running. Any help is appreciated, and thanks in advance.
import os
import sys
import time
from dataclasses import dataclass
from fastapi import FastAPI, Request, BackgroundTasks

EXPERIMENTS_BASE_DIR = "/experiments/"
QUERY_BUFFER = {}

app = FastAPI()

@dataclass
class Query():
    query_name: str
    query_sequence: str
    experiment_id: str = None
    status: str = "pending"

    def __post_init__(self):
        self.experiment_id = str(time.time())
        self.experiment_dir = os.path.join(EXPERIMENTS_BASE_DIR, self.experiment_id)
        os.makedirs(self.experiment_dir, exist_ok=False)

    def run(self):
        self.status = "running"
        # perform some long task using the query sequence and get a return code #
        self.status = "finished"
        return 0  # or another code depending on the final output

@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
    query_data = await request.body()
    query_data = query_data.decode("utf-8")
    query_data = dict(str(x).split("=") for x in query_data.split("&"))
    query = Query(**query_data)
    QUERY_BUFFER[query.experiment_id] = query
    background_tasks.add_task(process, query)
    return {"Query created": query, "Query ID": query.experiment_id, "Backlog Length": len(QUERY_BUFFER)}

async def process(query):
    """Process query and generate data"""
    ret_code = await query.run()
    del QUERY_BUFFER[query.experiment_id]
    print(f'Query {query.experiment_id} processing finished with return code {ret_code}.')

@app.get("/backlog/")
def return_backlog():
    return {f"Currently {len(QUERY_BUFFER)} jobs in the backlog."}
The original answer was influenced by testing with httpx.AsyncClient (as the caveats in the original answer flagged might be the case). The test client causes the background tasks to block, while they do not block without it. So there is a simpler solution if you don't want to test with httpx.AsyncClient: the new solution below uses uvicorn, and I then tested it manually with Postman.
This solution uses a function as the background task (process) so that it runs outside the main thread. It then schedules a job to run aprocess, which will run in the main thread when the event loop gets a chance. The aprocess coroutine is then able to await the query's run coroutine just as before.
Additionally, I added a time.sleep(10) to the process function to illustrate that even long-running non-IO tasks will not stop your original HTTP session from sending its response back to the client (though this only works as long as the GIL is released; if the work is CPU-bound you probably want a fully separate process, using multiprocessing or a separate service; a sketch of that option follows the code below). Finally, I replaced the prints with logging so that they work together with the uvicorn logging.
import asyncio
import os
import sys
import time
from dataclasses import dataclass
from fastapi import FastAPI, Request, BackgroundTasks
import logging

logging.basicConfig(level=logging.INFO, format="%(levelname)-9s %(asctime)s - %(name)s - %(message)s")
LOGGER = logging.getLogger(__name__)

EXPERIMENTS_BASE_DIR = "/experiments/"
QUERY_BUFFER = {}

app = FastAPI()
loop = asyncio.get_event_loop()

@dataclass
class Query():
    query_name: str
    query_sequence: str
    experiment_id: str = None
    status: str = "pending"

    def __post_init__(self):
        self.experiment_id = str(time.time())
        self.experiment_dir = os.path.join(EXPERIMENTS_BASE_DIR, self.experiment_id)
        # os.makedirs(self.experiment_dir, exist_ok=False)  # Commented out for testing

    async def run(self):
        self.status = "running"
        await asyncio.sleep(5)  # simulate long running query
        # perform some long task using the query sequence and get a return code #
        self.status = "finished"
        return 0  # or another code depending on the final output

@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
    query_data = await request.body()
    query_data = query_data.decode("utf-8")
    query_data = dict(str(x).split("=") for x in query_data.split("&"))
    query = Query(**query_data)
    QUERY_BUFFER[query.experiment_id] = query
    background_tasks.add_task(process, query)
    LOGGER.info(f'root - added task')
    return {"Query created": query, "Query ID": query.experiment_id, "Backlog Length": len(QUERY_BUFFER)}

def process(query):
    """Schedule processing of query, and then run some long running non-IO job without blocking the app"""
    asyncio.run_coroutine_threadsafe(aprocess(query), loop)
    LOGGER.info(f"process - {query.experiment_id} - Submitted query job. Now run non-IO work for 10 seconds...")
    time.sleep(10)  # simulate long running non-IO work, does not block app as this is in another thread - provided it is not cpu bound.
    LOGGER.info(f'process - {query.experiment_id} - wake up!')

async def aprocess(query):
    """Process query and generate data"""
    ret_code = await query.run()
    del QUERY_BUFFER[query.experiment_id]
    LOGGER.info(f'aprocess - Query {query.experiment_id} processing finished with return code {ret_code}.')

@app.get("/backlog/")
def return_backlog():
    return {f"return_backlog - Currently {len(QUERY_BUFFER)} jobs in the backlog."}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("scratch_26:app", host="127.0.0.1", port=8000)
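As noted above, if the long-running job is CPU-bound the GIL is never released, so even a side thread will stall the app. Here is a minimal sketch of the separate-process option, assuming a hypothetical module-level cpu_bound_work function standing in for the real analysis (this is not part of the original answer):

# Sketch only: offload CPU-bound work to a separate process via a pool.
# `cpu_bound_work` is a hypothetical stand-in for the real analysis and must
# be a module-level function so it can be pickled for the worker process.
import asyncio
from concurrent.futures import ProcessPoolExecutor

process_pool = ProcessPoolExecutor(max_workers=1)  # one worker keeps jobs sequential

def cpu_bound_work(query_sequence: str) -> int:
    # ... heavy, GIL-holding computation on the sequence ...
    return 0

async def aprocess(query):
    """Variant of aprocess from the example above, for CPU-bound jobs."""
    loop = asyncio.get_running_loop()
    # run_in_executor hands the job to the worker process and yields control
    # back to the event loop, so the app stays responsive meanwhile.
    ret_code = await loop.run_in_executor(process_pool, cpu_bound_work, query.query_sequence)
    del QUERY_BUFFER[query.experiment_id]  # QUERY_BUFFER as defined in the example above
    return ret_code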
As far as I can tell (and I am very open to correction on this), BackgroundTasks actually need to complete before the HTTP response is sent. This is not what the Starlette docs or the FastAPI docs say, but it appears to be the case, at least while using the httpx AsyncClient.
Whether you add a coroutine (executed in the main thread) or a function (executed in its own side thread), the HTTP response is blocked from being sent until the background task is complete.
If you want to await a long-running (asyncio-friendly) task, you can get around this problem by using a wrapper function. The wrapper function adds the real task (a coroutine, since it will use await) to the event loop and then returns. Since this is very fast, the fact that it "blocks" no longer matters (assuming a few milliseconds don't matter).
The real task is then executed in turn (but after the initial HTTP response has been sent), and although it runs on the main thread, the asyncio parts of the function will not block.
You could try this:
@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
    ...
    background_tasks.add_task(process_wrapper, query)
    ...

async def process_wrapper(query):
    loop = asyncio.get_event_loop()
    loop.create_task(process(query))

async def process(query):
    """Process query and generate data"""
    ret_code = await query.run()
    del QUERY_BUFFER[query.experiment_id]
    print(f'Query {query.experiment_id} processing finished with return code {ret_code}.')
Note that you will also need to make your run() function a coroutine by adding the async keyword, since you are going to await it from your process() function.
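In other words, the only change to the Query class from the question is the method definition; the body stays the same:

    async def run(self):  # was: def run(self)
        self.status = "running"
        # perform some long task using the query sequence and get a return code #
        self.status = "finished"
        return 0  # or another code depending on the final output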
Here is a full working example that uses httpx.AsyncClient to test it. I have added the fmt_duration helper function to show the elapsed time, for illustrative purposes. I have also commented out the code that creates the directories, and simulated a 2-second query duration in the run() function.
import asyncio
import os
import sys
import time
from dataclasses import dataclass
from fastapi import FastAPI, Request, BackgroundTasks
from httpx import AsyncClient

EXPERIMENTS_BASE_DIR = "/experiments/"
QUERY_BUFFER = {}

app = FastAPI()
start_ts = time.time()

@dataclass
class Query():
    query_name: str
    query_sequence: str
    experiment_id: str = None
    status: str = "pending"

    def __post_init__(self):
        self.experiment_id = str(time.time())
        self.experiment_dir = os.path.join(EXPERIMENTS_BASE_DIR, self.experiment_id)
        # os.makedirs(self.experiment_dir, exist_ok=False)  # Commented out for testing

    async def run(self):
        self.status = "running"
        await asyncio.sleep(2)  # simulate long running query
        # perform some long task using the query sequence and get a return code #
        self.status = "finished"
        return 0  # or another code depending on the final output

@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
    query_data = await request.body()
    query_data = query_data.decode("utf-8")
    query_data = dict(str(x).split("=") for x in query_data.split("&"))
    query = Query(**query_data)
    QUERY_BUFFER[query.experiment_id] = query
    background_tasks.add_task(process_wrapper, query)
    print(f'{fmt_duration()} - root - added task')
    return {"Query created": query, "Query ID": query.experiment_id, "Backlog Length": len(QUERY_BUFFER)}

async def process_wrapper(query):
    loop = asyncio.get_event_loop()
    loop.create_task(process(query))

async def process(query):
    """Process query and generate data"""
    ret_code = await query.run()
    del QUERY_BUFFER[query.experiment_id]
    print(f'{fmt_duration()} - process - Query {query.experiment_id} processing finished with return code {ret_code}.')

@app.get("/backlog/")
def return_backlog():
    return {f"{fmt_duration()} - return_backlog - Currently {len(QUERY_BUFFER)} jobs in the backlog."}

async def test_me():
    async with AsyncClient(app=app, base_url="http://example") as ac:
        res = await ac.post("/", content="query_name=foo&query_sequence=42")
        print(f"{fmt_duration()} - [{res.status_code}] - {res.content.decode('utf8')}")
        res = await ac.post("/", content="query_name=bar&query_sequence=43")
        print(f"{fmt_duration()} - [{res.status_code}] - {res.content.decode('utf8')}")
        content = ""
        while not content.endswith('0 jobs in the backlog."]'):
            await asyncio.sleep(1)
            backlog_results = await ac.get("/backlog")
            content = backlog_results.content.decode("utf8")
            print(f"{fmt_duration()} - test_me - content: {content}")

def fmt_duration():
    return f"Progress time: {time.time() - start_ts:.3f}s"

loop = asyncio.get_event_loop()
print(f'starting loop...')
loop.run_until_complete(test_me())
duration = time.time() - start_ts
print(f'Finished. Duration: {duration:.3f} seconds.')
If I run this in my local environment, I get the following output:
starting loop...
Progress time: 0.005s - root - added task
Progress time: 0.006s - [200] - {"Query created":{"query_name":"foo","query_sequence":"42","experiment_id":"1627489235.9300923","status":"pending","experiment_dir":"/experiments/1627489235.9300923"},"Query ID":"1627489235.9300923","Backlog Length":1}
Progress time: 0.007s - root - added task
Progress time: 0.009s - [200] - {"Query created":{"query_name":"bar","query_sequence":"43","experiment_id":"1627489235.932097","status":"pending","experiment_dir":"/experiments/1627489235.932097"},"Query ID":"1627489235.932097","Backlog Length":2}
Progress time: 1.016s - test_me - content: ["Progress time: 1.015s - return_backlog - Currently 2 jobs in the backlog."]
Progress time: 2.008s - process - Query 1627489235.9300923 processing finished with return code 0.
Progress time: 2.008s - process - Query 1627489235.932097 processing finished with return code 0.
Progress time: 2.041s - test_me - content: ["Progress time: 2.041s - return_backlog - Currently 0 jobs in the backlog."]
Finished. Duration: 2.041 seconds.
I also tried making process_wrapper a plain function, so that Starlette executes it in a new thread. This works the same way, just using run_coroutine_threadsafe instead of create_task, i.e.
def process_wrapper(query):
    loop = asyncio.get_event_loop()
    asyncio.run_coroutine_threadsafe(process(query), loop)
If there are other ways to get the background task to run without blocking the HTTP response, I would love to find out how, but absent that, this wrapper solution should work.
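As a closing aside (an editorial suggestion, not part of the answer above): since the question asks for strictly sequential processing, another common asyncio pattern worth considering is a single worker draining an asyncio.Queue. The endpoint only enqueues and returns immediately, and one long-lived task processes queries one at a time:

# Sketch of a sequential worker queue (a common asyncio pattern, offered as
# an alternative, not part of the original answer). Reuses the Query
# dataclass and QUERY_BUFFER as defined in the question's code.
import asyncio
from fastapi import FastAPI, Request

app = FastAPI()
JOB_QUEUE = None  # created at startup so it binds to the running event loop

async def worker():
    while True:
        query = await JOB_QUEUE.get()   # wait for the next job
        try:
            await query.run()           # jobs run strictly one at a time
        finally:
            JOB_QUEUE.task_done()

@app.on_event("startup")
async def start_worker():
    global JOB_QUEUE
    JOB_QUEUE = asyncio.Queue()
    asyncio.create_task(worker())       # a single worker -> sequential processing

@app.post("/")
async def root(request: Request):
    body = (await request.body()).decode("utf-8")
    query = Query(**dict(x.split("=") for x in body.split("&")))  # Query as in the question
    QUERY_BUFFER[query.experiment_id] = query
    await JOB_QUEUE.put(query)          # returns immediately for an unbounded queue
    return {"Query ID": query.experiment_id, "Backlog Length": JOB_QUEUE.qsize()}

With this shape, the HTTP response is never tied to a job's completion, and backpressure could be added later by giving the queue a maxsize.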