FastAPI: add long-running tasks to a buffer and process them one by one, while keeping the server responsive

Scu*_*ris 10 python asynchronous fastapi

I am trying to set up a FastAPI server that accepts some biological data as input and runs some processing on it. Since the processing takes up all of the server's resources, queries should be processed sequentially. However, the server should stay responsive and add further requests to a buffer. I have been trying to use the BackgroundTasks module for this, but after sending a second query, the response is delayed while the first task is running. Any help is appreciated, and thanks in advance.

import os
import sys
import time
from dataclasses import dataclass
from fastapi import FastAPI, Request, BackgroundTasks

EXPERIMENTS_BASE_DIR = "/experiments/"
QUERY_BUFFER = {}

app = FastAPI()

@dataclass
class Query():
    query_name: str
    query_sequence: str
    experiment_id: str = None
    status: str = "pending"

    def __post_init__(self):
        self.experiment_id = str(time.time())
        self.experiment_dir = os.path.join(EXPERIMENTS_BASE_DIR, self.experiment_id)
        os.makedirs(self.experiment_dir, exist_ok=False)

    def run(self):
        self.status = "running"
        # perform some long task using the query sequence and get a return code #
        self.status = "finished"
        return 0 # or another code depending on the final output

@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
    query_data = await request.body()
    query_data = query_data.decode("utf-8")
    query_data = dict(str(x).split("=") for x in query_data.split("&"))
    query = Query(**query_data)
    QUERY_BUFFER[query.experiment_id] = query
    background_tasks.add_task(process, query)
    return {"Query created": query, "Query ID": query.experiment_id, "Backlog Length": len(QUERY_BUFFER)}

async def process(query):
    """ Process query and generate data"""
    ret_code = await query.run()
    del QUERY_BUFFER[query.experiment_id]
    print(f'Query {query.experiment_id} processing finished with return code {ret_code}.')

@app.get("/backlog/")
def return_backlog():
    return {f"Currently {len(QUERY_BUFFER)} jobs in the backlog."}

Ben*_*Ben 6

EDIT:

The original answer was affected by testing with httpx.AsyncClient (as flagged in the original caveat). The test client causes the background tasks to block, whereas without the test client they do not. So there is a simpler solution if you don't want to test with httpx.AsyncClient: the new solution uses uvicorn, and I then tested it manually with Postman.

This solution uses a plain function as the background task (process) so that it runs outside the main thread. It then schedules a job to run aprocess, which will run in the main thread when the event loop gets a chance. The aprocess coroutine is then able to await the query's run coroutine as before.

Additionally, I added a time.sleep(10) to the process function to illustrate that even a long-running non-IO task will not prevent your original HTTP session from sending the response back to the client (though this only works if the GIL is released; if the work is CPU-bound you may want a fully separate process, either via multiprocessing or a separate service). Finally, I replaced the prints with logging so that they work together with the uvicorn logging.
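For the CPU-bound case just mentioned, one option (my sketch, not part of the answer's code) is to hand the work to a worker process via loop.run_in_executor with a ProcessPoolExecutor, so neither the event loop nor the server process's GIL is tied up. Here cpu_bound_run is a hypothetical stand-in for the real processing:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_bound_run(query_sequence: str) -> int:
    """Hypothetical stand-in for heavy, GIL-holding work on the sequence."""
    total = sum(ord(c) for c in query_sequence * 1000)
    return 0 if total >= 0 else 1

async def aprocess_cpu_bound(query_sequence: str) -> int:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=1) as pool:
        # the event loop stays free while the worker process runs
        return await loop.run_in_executor(pool, cpu_bound_run, query_sequence)

if __name__ == "__main__":
    print(asyncio.run(aprocess_cpu_bound("ACGT")))
```

The same aprocess coroutine shape as below applies; only the awaited call changes.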

import asyncio
import os
import sys
import time
from dataclasses import dataclass
from fastapi import FastAPI, Request, BackgroundTasks
import logging


logging.basicConfig(level=logging.INFO, format="%(levelname)-9s %(asctime)s - %(name)s - %(message)s")
LOGGER = logging.getLogger(__name__)

EXPERIMENTS_BASE_DIR = "/experiments/"
QUERY_BUFFER = {}

app = FastAPI()
loop = asyncio.get_event_loop()

@dataclass
class Query():
    query_name: str
    query_sequence: str
    experiment_id: str = None
    status: str = "pending"

    def __post_init__(self):
        self.experiment_id = str(time.time())
        self.experiment_dir = os.path.join(EXPERIMENTS_BASE_DIR, self.experiment_id)
        # os.makedirs(self.experiment_dir, exist_ok=False) # Commented out for testing

    async def run(self):
        self.status = "running"
        await asyncio.sleep(5)  # simulate long running query
        # perform some long task using the query sequence and get a return code #
        self.status = "finished"
        return 0 # or another code depending on the final output

@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
    query_data = await request.body()
    query_data = query_data.decode("utf-8")
    query_data = dict(str(x).split("=") for x in query_data.split("&"))
    query = Query(**query_data)
    QUERY_BUFFER[query.experiment_id] = query
    background_tasks.add_task(process, query)
    LOGGER.info(f'root - added task')
    return {"Query created": query, "Query ID": query.experiment_id, "Backlog Length": len(QUERY_BUFFER)}


def process(query):
    """ Schedule processing of query, and then run some long running non-IO job without blocking the app"""
    asyncio.run_coroutine_threadsafe(aprocess(query), loop)
    LOGGER.info(f"process - {query.experiment_id} - Submitted query job. Now run non-IO work for 10 seconds...")
    time.sleep(10) # simulate long running non-IO work, does not block app as this is in another thread - provided it is not cpu bound.
    LOGGER.info(f'process - {query.experiment_id} - wake up!')


async def aprocess(query):
    """ Process query and generate data """
    ret_code = await query.run()
    del QUERY_BUFFER[query.experiment_id]
    LOGGER.info(f'aprocess - Query {query.experiment_id} processing finished with return code {ret_code}.')


@app.get("/backlog/")
def return_backlog():
    return {f"return_backlog - Currently {len(QUERY_BUFFER)} jobs in the backlog."}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run("scratch_26:app", host="127.0.0.1", port=8000)


Original answer:

*Caveat on this answer - I tried testing this with `httpx.AsyncClient`, which may cause different behaviour compared to deploying behind gunicorn.*

From what I can tell (and I am very open to correction on this), BackgroundTasks actually need to complete before the HTTP response is sent. This is not what the Starlette docs or the FastAPI docs say, but it appears to be the case, at least while using the httpx AsyncClient.

Whether you add a coroutine (executed in the main thread) or a function (executed in its own side thread), the HTTP response is blocked from being sent until the background task is complete.

If you want to await a long-running (asyncio-friendly) task, you can get around this by using a wrapper function. The wrapper function adds the real task (a coroutine, since it will use await) to the event loop and then returns. Since this is very fast, the fact that it "blocks" no longer matters (assuming a few milliseconds don't matter).

The real task then executes in turn (but after the initial HTTP response has been sent), and although it runs on the main thread, the asyncio parts of the function won't block.

You could try this:

@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
    ...
    background_tasks.add_task(process_wrapper, query)
    ...

async def process_wrapper(query):
    loop = asyncio.get_event_loop()
    loop.create_task(process(query))

async def process(query):
    """ Process query and generate data"""
    ret_code = await query.run()
    del QUERY_BUFFER[query.experiment_id]
    print(f'Query {query.experiment_id} processing finished with return code {ret_code}.')


Note also that you will need to make your run() function a coroutine by adding the async keyword, since you want to await it from your process() function.

Here is a full working example that uses httpx.AsyncClient to test it. I added the fmt_duration helper function to show the elapsed time for illustrative purposes. I also commented out the code that creates the directories, and simulated a 2-second query duration in the run() function.

import asyncio
import os
import sys
import time
from dataclasses import dataclass
from fastapi import FastAPI, Request, BackgroundTasks
from httpx import AsyncClient

EXPERIMENTS_BASE_DIR = "/experiments/"
QUERY_BUFFER = {}

app = FastAPI()
start_ts = time.time()


@dataclass
class Query():
    query_name: str
    query_sequence: str
    experiment_id: str = None
    status: str = "pending"

    def __post_init__(self):
        self.experiment_id = str(time.time())
        self.experiment_dir = os.path.join(EXPERIMENTS_BASE_DIR, self.experiment_id)
        # os.makedirs(self.experiment_dir, exist_ok=False) # Commented out for testing

    async def run(self):
        self.status = "running"
        await asyncio.sleep(2)  # simulate long running query
        # perform some long task using the query sequence and get a return code #
        self.status = "finished"
        return 0 # or another code depending on the final output

@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
    query_data = await request.body()
    query_data = query_data.decode("utf-8")
    query_data = dict(str(x).split("=") for x in query_data.split("&"))
    query = Query(**query_data)
    QUERY_BUFFER[query.experiment_id] = query
    background_tasks.add_task(process_wrapper, query)
    print(f'{fmt_duration()} - root - added task')
    return {"Query created": query, "Query ID": query.experiment_id, "Backlog Length": len(QUERY_BUFFER)}


async def process_wrapper(query):
    loop = asyncio.get_event_loop()
    loop.create_task(process(query))

async def process(query):
    """ Process query and generate data"""
    ret_code = await query.run()
    del QUERY_BUFFER[query.experiment_id]
    print(f'{fmt_duration()} - process - Query {query.experiment_id} processing finished with return code {ret_code}.')

@app.get("/backlog/")
def return_backlog():
    return {f"{fmt_duration()} - return_backlog - Currently {len(QUERY_BUFFER)} jobs in the backlog."}


async def test_me():
    async with AsyncClient(app=app, base_url="http://example") as ac:
        res = await ac.post("/", content="query_name=foo&query_sequence=42")
        print(f"{fmt_duration()} - [{res.status_code}] - {res.content.decode('utf8')}")
        res = await ac.post("/", content="query_name=bar&query_sequence=43")
        print(f"{fmt_duration()} - [{res.status_code}] - {res.content.decode('utf8')}")
        content = ""
        while not content.endswith('0 jobs in the backlog."]'):
            await asyncio.sleep(1)
            backlog_results = await ac.get("/backlog")
            content = backlog_results.content.decode("utf8")
            print(f"{fmt_duration()} - test_me - content: {content}")


def fmt_duration():
    return f"Progress time: {time.time() - start_ts:.3f}s"

loop = asyncio.get_event_loop()
print(f'starting loop...')
loop.run_until_complete(test_me())
duration = time.time() - start_ts
print(f'Finished. Duration: {duration:.3f} seconds.')

In my local environment, if I run the above, I get this output:

starting loop...
Progress time: 0.005s - root - added task
Progress time: 0.006s - [200] - {"Query created":{"query_name":"foo","query_sequence":"42","experiment_id":"1627489235.9300923","status":"pending","experiment_dir":"/experiments/1627489235.9300923"},"Query ID":"1627489235.9300923","Backlog Length":1}
Progress time: 0.007s - root - added task
Progress time: 0.009s - [200] - {"Query created":{"query_name":"bar","query_sequence":"43","experiment_id":"1627489235.932097","status":"pending","experiment_dir":"/experiments/1627489235.932097"},"Query ID":"1627489235.932097","Backlog Length":2}
Progress time: 1.016s - test_me - content: ["Progress time: 1.015s - return_backlog - Currently 2 jobs in the backlog."]
Progress time: 2.008s - process - Query 1627489235.9300923 processing finished with return code 0.
Progress time: 2.008s - process - Query 1627489235.932097 processing finished with return code 0.
Progress time: 2.041s - test_me - content: ["Progress time: 2.041s - return_backlog - Currently 0 jobs in the backlog."]
Finished. Duration: 2.041 seconds.
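As an aside (my note, not part of the answer): the manual body parsing in root, `dict(str(x).split("=") for x in query_data.split("&"))`, breaks as soon as a value contains a percent-encoded "=" or "&". The standard library's urllib.parse.parse_qs handles percent-decoding and repeated keys; a sketch with a hypothetical request body:

```python
from urllib.parse import parse_qs

raw = "query_name=foo&query_sequence=42%3D43"  # hypothetical body with an encoded '='
# parse_qs returns a list of values per key; take the first value of each
query_data = {k: v[0] for k, v in parse_qs(raw).items()}
print(query_data)  # {'query_name': 'foo', 'query_sequence': '42=43'}
```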

I also tried making process_wrapper a plain function, so that Starlette executes it in a new thread. This works the same way, just using run_coroutine_threadsafe instead of create_task, i.e.:

def process_wrapper(query):
    loop = asyncio.get_event_loop()
    asyncio.run_coroutine_threadsafe(process(query), loop)

If there are other ways to get background tasks to run without blocking the HTTP response, I would love to find out how, but absent that, this wrapper solution should work.
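One such way, which also gives the strictly sequential processing the question asks for (my own sketch, not from the answer above): have the endpoint put each query on an asyncio.Queue and let a single long-lived worker task, started once at app startup with asyncio.create_task(worker(...)), consume it. A self-contained demo of the pattern, using a hypothetical FakeQuery in place of the real Query class:

```python
import asyncio

class FakeQuery:
    """Hypothetical stand-in for the real Query class."""
    def __init__(self, name):
        self.name = name

    async def run(self):
        await asyncio.sleep(0.01)  # simulate the long-running query
        return 0

async def worker(queue, results):
    # a single consumer => jobs run one at a time, in arrival order
    while True:
        query = await queue.get()
        results.append((query.name, await query.run()))
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    results = []
    # in FastAPI this would be started in a startup event handler
    asyncio.create_task(worker(queue, results))
    for name in ("foo", "bar"):          # in FastAPI: await queue.put(query) in root()
        await queue.put(FakeQuery(name))
    await queue.join()                   # wait until every queued job is processed
    return results

if __name__ == "__main__":
    print(asyncio.run(main()))  # [('foo', 0), ('bar', 0)]
```

The endpoint stays fast because queue.put returns immediately, and the backlog endpoint could report queue.qsize().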