tuo*_*tik 75
您可以使用BackgroundScheduler()
从APScheduler包(V3.5.3):
import time
import atexit
from apscheduler.schedulers.background import BackgroundScheduler
def print_date_time():
print(time.strftime("%A, %d. %B %Y %I:%M:%S %p"))
scheduler = BackgroundScheduler()
scheduler.add_job(func=print_date_time, trigger="interval", seconds=3)
scheduler.start()
# Shut down the scheduler when exiting the app
atexit.register(lambda: scheduler.shutdown())
Run Code Online (Sandbox Code Playgroud)
请注意,当Flask处于调试模式时,将启动其中两个调度程序.有关更多信息,请查看此问题.
Sea*_*ira 45
您可以APScheduler
在Flask应用程序中使用并通过其界面运行您的作业:
import atexit
# v2.x version - see https://stackoverflow.com/a/38501429/135978
# for the 3.x version
from apscheduler.scheduler import Scheduler
from flask import Flask
app = Flask(__name__)
cron = Scheduler(daemon=True)
# Explicitly kick off the background thread
cron.start()
@cron.interval_schedule(hours=1)
def job_function():
# Do your work here
# Shutdown your cron thread if the web process is stopped
atexit.register(lambda: cron.shutdown(wait=False))
if __name__ == '__main__':
app.run()
Run Code Online (Sandbox Code Playgroud)
iva*_*ncz 28
我对应用程序调度程序的概念有点新意,但是我在APScheduler v3.3.1中找到了它,它有点不同.我相信对于最新版本,包结构,类名等已经改变,所以我在这里提出了一个我最近制作的新解决方案,与基本的Flask应用程序集成:
#!/usr/bin/python3
""" Demonstrating Flask, using APScheduler. """
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask
def sensor():
""" Function for test purposes. """
print("Scheduler is alive!")
sched = BackgroundScheduler(daemon=True)
sched.add_job(sensor,'interval',minutes=60)
sched.start()
app = Flask(__name__)
@app.route("/home")
def home():
""" Function for test purposes. """
return "Welcome Home :) !"
if __name__ == "__main__":
app.run()
Run Code Online (Sandbox Code Playgroud)
如果有人对此示例的更新感兴趣,我也会在这里留下这个要点.
以下是一些参考资料,供将来阅读:
Bem*_*mmu 14
对于一个简单的解决方案,您可以添加一条路线,例如
@app.route("/cron/do_the_thing", methods=['POST'])
def do_the_thing():
logging.info("Did the thing")
return "OK", 200
Run Code Online (Sandbox Code Playgroud)
然后添加一个定期发布到此端点的 unix cron 作业。例如,每分钟运行一次,在终端输入crontab -e
并添加以下行:
* * * * * /opt/local/bin/curl -X POST https://YOUR_APP/cron/do_the_thing
Run Code Online (Sandbox Code Playgroud)
(请注意,curl 的路径必须完整,因为当作业运行时,它不会有您的 PATH。您可以通过 找到系统上 curl 的完整路径which curl
)
我喜欢这一点,因为它很容易手动测试作业,它没有额外的依赖项,并且因为没有任何特别的事情发生,所以很容易理解。
如果您想用密码保护您的 cron 作业,您可以pip install Flask-BasicAuth
,然后将凭据添加到您的应用程序配置中:
app = Flask(__name__)
app.config['BASIC_AUTH_REALM'] = 'realm'
app.config['BASIC_AUTH_USERNAME'] = 'falken'
app.config['BASIC_AUTH_PASSWORD'] = 'joshua'
Run Code Online (Sandbox Code Playgroud)
要密码保护作业端点:
from flask_basicauth import BasicAuth
basic_auth = BasicAuth(app)
@app.route("/cron/do_the_thing", methods=['POST'])
@basic_auth.required
def do_the_thing():
logging.info("Did the thing a bit more securely")
return "OK", 200
Run Code Online (Sandbox Code Playgroud)
然后从您的 cron 工作中调用它:
* * * * * /opt/local/bin/curl -X POST https://falken:joshua@YOUR_APP/cron/do_the_thing
Run Code Online (Sandbox Code Playgroud)
小智 12
我试过使用烧瓶而不是一个简单的 apscheduler 你需要安装的是
pip3 安装flask_apscheduler
以下是我的代码示例:
from flask import Flask
from flask_apscheduler import APScheduler
app = Flask(__name__)
scheduler = APScheduler()
def scheduleTask():
print("This test runs every 3 seconds")
if __name__ == '__main__':
scheduler.add_job(id = 'Scheduled Task', func=scheduleTask, trigger="interval", seconds=3)
scheduler.start()
app.run(host="0.0.0.0")
Run Code Online (Sandbox Code Playgroud)
KD *_*ang 10
您可以尝试使用APScheduler的BackgroundScheduler将间隔作业集成到Flask应用程序中.以下是使用blueprint和app factory(init .py)的示例:
from datetime import datetime
# import BackgroundScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask
from webapp.models.main import db
from webapp.controllers.main import main_blueprint
# define the job
def hello_job():
print('Hello Job! The time is: %s' % datetime.now())
def create_app(object_name):
app = Flask(__name__)
app.config.from_object(object_name)
db.init_app(app)
app.register_blueprint(main_blueprint)
# init BackgroundScheduler job
scheduler = BackgroundScheduler()
# in your case you could change seconds to hours
scheduler.add_job(hello_job, trigger='interval', seconds=3)
scheduler.start()
try:
# To keep the main thread alive
return app
except:
# shutdown if app occurs except
scheduler.shutdown()
Run Code Online (Sandbox Code Playgroud)
希望能帮助到你 :)
参考:
小智 6
另一个替代方案可能是使用Flask-APScheduler,它可以很好地与Flask一起使用,例如:
更多信息:
https://pypi.python.org/pypi/Flask-APScheduler
您可以使用flask-crontab模块,这非常简单。
步骤1:pip安装flask-crontab
第2步:
from flask import Flask
from flask_crontab import Crontab
app = Flask(__name__)
crontab = Crontab(app)
Run Code Online (Sandbox Code Playgroud)
步骤3:
@crontab.job(minute="0", hour="6", day="*", month="*", day_of_week="*")
def my_scheduled_job():
do_something()
Run Code Online (Sandbox Code Playgroud)
第 4 步:在 cmd 上,点击
flask crontab add
Run Code Online (Sandbox Code Playgroud)
完毕。现在只需运行您的 Flask 应用程序,您就可以检查您的函数将在每天 6:00 调用。
您可以参考这里(官方文档)。
归档时间: |
|
查看次数: |
74722 次 |
最近记录: |