如何在Python中使用Flask执行定期任务

Bol*_*ter 17 python api web-services wsgi flask

我一直在使用Flask为我的k8055 USB接口板提供一个简单的Web API ; 相当标准的吸气剂和推杆,Flask真的让我的生活变得更轻松.

但我希望能够在乳清发生时将状态变化记录为/接近.

例如,如果我有一个连接到电路板的按钮,我可以轮询该特定端口的api.但如果我想让输出直接反映输出,无论是否有人在与api交谈,我都会有这样的事情.

while True:
    board.read()
    board.digital_outputs = board.digital_inputs
    board.read()
    time.sleep(1)
Run Code Online (Sandbox Code Playgroud)

每一秒,输出都会更新以匹配输入.

在Flask下有没有办法做这种事情?我之前在Twisted中做过类似的事情,但Flask对于这个特定的应用程序来说太方便了,只是放弃它...

谢谢.

Pas*_*hka 13

你可以使用cron来完成简单的任务.

为您的任务创建一个烧瓶视图.

# a separate view for periodic task
@app.route('/task')
def task():
    board.read()
    board.digital_outputs = board.digital_inputs
Run Code Online (Sandbox Code Playgroud)

然后使用cron,定期从该URL下载

# cron task to run each minute
0-59 * * * * run_task.sh
Run Code Online (Sandbox Code Playgroud)

run_task.sh内容的位置

wget http://localhost/task
Run Code Online (Sandbox Code Playgroud)

Cron无法比一分钟更频繁地跑步.如果您需要更高的频率(例如,每5秒=每分钟12次),您必须以下列方式在tun_task.sh中执行此操作

# loop 12 times with a delay
for i in 1 2 3 4 5 6 7 8 9 10 11 12
do
    # download url in background for not to affect delay interval much
    wget -b http://localhost/task
    sleep 5s
done
Run Code Online (Sandbox Code Playgroud)


Ant*_*ony 13

对于我的Flask应用程序,我考虑使用Pashka在他的答案,日程安排库和APScheduler中描述的cron方法.

我发现APScheduler很简单并且服务于定期任务运行目的,所以继续使用APScheduler.

示例代码:

from flask import Flask

from apscheduler.schedulers.background import BackgroundScheduler


app = Flask(__name__)

def test_job():
    print('I am working...')

scheduler = BackgroundScheduler()
job = scheduler.add_job(test_job, 'interval', minutes=1)
scheduler.start()
Run Code Online (Sandbox Code Playgroud)

  • @TobiasFeil 这是 Werkzeug 开发服务器重新加载行为。您可以使用 `flask run --no-reload` 来禁用重新加载器,或者将作业创建包含在 if 条件 `if os.environ.get("WERKZEUG_RUN_MAIN") == "true":` 中。有关更多信息,请参阅 /sf/answers/663369101/。 (3认同)
  • 出于某种原因,当设置了 `FLASK_ENV=development` 时,作业会在每个间隔过去后执行两次。 (2认同)