设置celery周期性任务

fra*_*lin 4 python celery flask

如何使用 Celerybeat 和 Flask 设置每小时查询一次数据库的定期任务?

环境是这样的:

/
|-app
  |-__init__.py
  |-jobs
    |-task.py
|-celery-beat.sh
|-celery-worker.sh
|-manage.py
Run Code Online (Sandbox Code Playgroud)

我目前有一个名为的查询函数run_query()位于task.py

我希望调度程序在应用程序启动后立即启动,因此我的/app/__init__.py文件夹中有以下几行:

/
|-app
  |-__init__.py
  |-jobs
    |-task.py
|-celery-beat.sh
|-celery-worker.sh
|-manage.py
Run Code Online (Sandbox Code Playgroud)

(为了简单起见,我将其设置为如果它运行,它将每分钟运行一次。还没有这样的运气。)

当我启动它时,celery-worker.sh它会识别标题下的我的功能[tasks]。但预定的功能永远不会运行。我可以通过在命令提示符下发出以下命令来手动强制该函数运行:

>> from app.jobs import task
>> task.run_query.delay()
Run Code Online (Sandbox Code Playgroud)

编辑:添加 celerybeat.sh

作为后续:如果通过烧瓶上下文访问数据库,在我的异步函数调用期间创建一个新的烧瓶上下文来访问数据库是否明智?使用现有的 Flask 上下文?或者完全忘记上下文并只是启动与数据库的连接?我担心的是,如果我只是启动一个新连接,它可能会干扰现有上下文的连接?

San*_*ake 5

要运行定期任务,您需要某种调度程序(例如 celerybeat)。

\n\n
\n

芹菜节拍是一个调度程序;它定期启动任务,然后由集群中的可用工作节点执行。

\n\n

您必须确保一次只运行一个调度程序,否则您将得到重复的任务。使用集中式方法意味着调度不必同步,并且服务可以在不使用锁的情况下运行。

\n
\n\n

参考:周期性任务

\n\n

您可以使用命令调用调度程序,

\n\n
$ celery -A proj beat #different process from your worker\n
Run Code Online (Sandbox Code Playgroud)\n\n
\n

您还可以通过启用workers -B\n 选项将beat 嵌入到worker 中,如果您\xe2\x80\x99 永远不会运行多个worker\n 节点,这会很方便,但\xe2\x80\x99 并不常用因此,\xe2\x80\x99t 建议用于生产用途\n 启动调度程序:

\n
\n\n
$ celery -A proj worker -B\n
Run Code Online (Sandbox Code Playgroud)\n\n

参考:celery启动调度程序

\n