另一个函数内部的 Flask Celery update_state

van*_*ath 5 celery flask celery-task

我想从另一个函数更新我的 Celery 任务的状态。这是我现在所拥有的:

路线

@app.route('/my-long-function', methods=['POST'])
def my_long_function():

    param1 = request.form['param1']
    param2 = request.form['param2']

    task = outside_function.delay(param1, param2)

    return task.id
Run Code Online (Sandbox Code Playgroud)

Celery Task - 在后台启动 some_python_script.handle

@celery.task(name='outside_function')
def outside_function(param1, param2):
    with app.app_context():
        some_python_script.handle(param1, param2)
Run Code Online (Sandbox Code Playgroud)

some_python_script.handle:

def handle(param1, param2):
    param1 + param2
    # many, many different things
Run Code Online (Sandbox Code Playgroud)

理想情况下,我希望能够 self.update_state celery 任务,以便我可以轻松地从我的应用程序请求其状态,如下所示:

some_python_script.handle(理想情况下):

def handle(param1, param2):
    param1 + param2
    # many, many different things
    self.outside_function.update_state('PROGRESS', meta = {'status':'progressing'})
Run Code Online (Sandbox Code Playgroud)

检查进度(理想情况下):

@app.route('/status/<task_id>')
def taskstatus(task_id):
    task = outside_function.AsyncResult(task_id)
    response = {
    'state': task.state,
    'id': task.id,
    'status' : task.status,
    }

    return jsonify(response)
Run Code Online (Sandbox Code Playgroud)

或者类似的东西。感谢您的帮助,我对芹菜很陌生!

ami*_*min 0

您应该声明调用的任务ID。您可以检查update_state

下面的代码应该可以工作。

# capture id of celery task
ID = self.request.id

def handle(param1, param2):
    param1 + param2
    # many, many different things
    # update the state of celery task with direct reference to it
    self.update_state(task_id=ID, state='PROGRESS', meta = {'status':'progressing'})
Run Code Online (Sandbox Code Playgroud)

  • 它不起作用,因为在“handle”范围内没有“self”。 (2认同)