FastAPI @repeat_every 如何防止并行 def schedule_task() 实例

use*_*186 5 fastapi

我们正在使用fastapi-utils在后台执行计划任务。如果新数据在数据库中可用,我们检查所有 5 秒,如果是,我们处理它(最多需要 5 分钟)

在此期间,协程应处于阻塞状态,以便仅触发一次。

我们注意到我们的数据有时会被处理 3 倍,我们假设调度程序继续运行,即使该函数已被触发。

因此我们试图用IsRunningQuery变量来规避它。

我们尝试了一个带有 while True 循环的解决方案@repeat_every,但不让它在启动时运行一次,但 Azure Webapps 不允许运行它。

@app.on_event("startup") 
@repeat_every(wait_first=True,seconds=int(10))
def scheduled_task() -> None:
    global IsRunningQuery
    global LastCheck
    if IsRunningQuery == False:
        IsRunningQuery = True
        gunicorn_logger.info("status='checkforleads'")
        OurProccessingClass.processDataBaseData() # can take up 5 minutes
        LastCheck=Utils.datetime()
        IsRunningQuery = False

Run Code Online (Sandbox Code Playgroud)

此变体适用于我们的 DEV 环境,但不适用于 Azure

@app.on_event("startup") 
async def scheduled_task() -> None:
    while True:
        gunicorn_logger.info("status='checkforleads'")
        OurProccessingClass.processDataBaseData() # can take up 5 minutes
        time.sleep(int(os.environ["CRM_SLEEP"]))
Run Code Online (Sandbox Code Playgroud)

Gwy*_*idD 1

要完成此任务,您需要一些适合您的环境的锁定系统。

例如,当仅使用单个异步循环运行单个工作程序时,Lock来自 asyncio 同步原语的简单方法将是理想的选择......

但是如果你想引入更多的worker,那么锁的状态将不会在实例之间同步。如果您的工作程序在同一系统上生成,您可以使用文件系统锁(例如模块中的锁fnctl),但同样,如果您引入更多服务器实例,它将不再起作用。

下一步可能是在数据库级别引入锁,或者任何其他能够管理锁或仅向一个接收者交付某些任务的外部系统,但这很快就会变得非常复杂。

这就是为什么,像 celery 这样的系统允许您安排任务,并且如果可能的话,系统会注意防止多次执行该任务(请注意,这并不总是可能的,因为执行者可能会完成该任务但永远不要因为某些致命错误或任何其他中断(例如断电)而更新任务的状态。这就是为什么此类系统可以确保该任务至少运行一次或最多运行一次,但是永远不会保证两者,只会尽最大努力最大化另一个的机会)。