我正在 Flask 中开发一个简单的 API,它不是 REST 或任何东西。
有一个模块可以返回列表中的实时数据。
有问题的模块
# module.py
def get_data():
do_something()
return [info_from_somewhere, info_from_other_place]
Run Code Online (Sandbox Code Playgroud)
还有应用程序
# app.py
import module
@app.route('/')
def get_data():
return jsonify(data=module.get_data()[0])
Run Code Online (Sandbox Code Playgroud)
问题是,每次有人请求该路由时,这都会运行该函数。由于数据只有一个,所以我想为每个请求提供它,但只运行该函数一次。
编辑:我尝试过这个:
got_data = module.get_data()
@app.route('/')
def get_data():
return jsonify(data=got_data[0])
Run Code Online (Sandbox Code Playgroud)
有效,但不要刷新列表。所以我的问题是“我怎样才能每秒刷新它?” 我尝试睡眠,但它冻结了我的应用程序
你可以用芹菜来实现这一点。来自项目页面。
Celery是一个基于分布式消息传递的异步任务队列/作业队列。它专注于实时操作,但也支持调度。
其他解决方案可以通过生成一个线程来完成,该线程每秒都会更新数据,但这可能会很快变得棘手。
from threading import Timer
from flask import Flask
app = Flask(__name__)
DATA = "data"
def update_data(interval):
Timer(interval, update_data, [interval]).start()
global DATA
DATA = DATA + " updating..."
# update data every second
update_data(1)
@app.route("/")
def index():
return DATA
if __name__ == "__main__":
app.run(debug=True)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3626 次 |
| 最近记录: |