Arc*_*ger 5 python celery flask
我有一个调用任务的烧瓶应用程序。该任务从数据库中提取数据,绘制折线图并返回呈现在 html 页面上的 html 内容。如果没有 Celery,Flask 应用程序工作正常并在客户端呈现折线图,但现在我想委托 celery 通过RabbitMQ代理运行任务,它运行,因为我可以在 Celery shell 中看到日志输出,但生成的 html 内容永远不会被发送回到烧瓶服务器应用程序。怎么做?
#server-celery.py
app = Flask(__name__)
@app.route('/',methods=['GET'])
def index():
return render_template("index.html")
@app.route('/', methods=['GET', 'POST'])
def plotdata():
form = InputForm(request.form)
if request.method == 'POST' and form.validate():
lineChart = task.main.delay(form.startdate.data, form.enddate.data)
return render_template('view.html', form=form, result= lineChart)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
Run Code Online (Sandbox Code Playgroud)
和作为Celery工作者运行的任务:
#task.py
from celery import Celery
broker = 'amqp://localhost:5672'
app = Celery(__name__, broker=broker)
def queryDatabase(param1, param2):
#query database, return results
return data
def plotLineChart(data):
#plot data as line Chart
return LineChartHtmlcontent
@app.task
def main(param1,param2):
data = queryDatabase(param1,param2)
return plotLineChart(data)
Run Code Online (Sandbox Code Playgroud)
和客户端 html 页面:
<!--view.html-->
<form method=post action="">
<table>
{% for field in form %}
<tr>
<td>{{ field.label }}</td>
<td>{{ field }}</td>
</tr>
{% endfor %}
</table>
<p><input type=submit value=Submit></form></p>
<p>
{% if result != None %}
<img src="{{ result }}" width="500">
{% endif %}
</p>
Run Code Online (Sandbox Code Playgroud)
更好的解决方案是简单地使用 celery 异步运行任务,就像它打算使用的那样,并在页面上使用 javascript 定期轮询 celery 任务以查看状态。
首先,当您创建 celery 任务时,请使用 bind=True 参数。这允许您传入self函数。
@app.task(bind=True)
def main(self, param1, param2):
self.update_state(state='PENDING')
data = queryDatabase(param1,param2)
self.update_state(state='COMPLETE')
return plotLineChart(data)
Run Code Online (Sandbox Code Playgroud)
您可以看到我们现在可以更新任务的状态。您也可以修改代码以允许元数据进入update_state方法,例如:self.update_state(state='PROGRESS', meta={'current': 0, 'total': 12})
现在您需要轮询 celery 任务以查看它的运行情况。
@blueprint.route('/status/<task_id>', methods=['GET'])
def taskstatus(task_id=None):
task = main.AsyncResult(task_id)
response = {
'state': task.state,
}
return jsonify(response)
Run Code Online (Sandbox Code Playgroud)
当您使用任务 ID 轮询 URL 时,这将返回一个 json 对象。您可以lineChart.id在调用后使用获取任务ID.delay()
请参阅 miguel grinberg 的优秀教程:http ://blog.miguelgrinberg.com/post/using-celery-with-flask
| 归档时间: |
|
| 查看次数: |
10845 次 |
| 最近记录: |