为什么`celery.current_app`引用Flask视图函数中的默认实例

Pie*_*rre 5 python thread-safety celery flask

我不是试图celery.current_app在视图函数中使用,但我有一个函数挂钩after_task_publish信号,它在任务发布后使用它来更新状态,它在Flask视图函数之外工作并正确更新状态,但是当我从视图函数内部发送任务,任务状态将不会更新,我检查了问题是它current_app.backend是一个DisabledBackend默认的实例,而不是RedisBackend我正在使用的实例.

发生这种情况是因为在Flask视图函数内部,当前Celery实例的代理celery.current_app引用了当前没有Celery实例时创建的默认实例.

我尝试重现正在发生的事情,这是一个测试脚本:

from __future__ import absolute_import, print_function, unicode_literals

from flask import Flask, request

from celery import Celery, current_app
from celery.signals import after_task_publish
# internal module for debugging purposes
from celery._state import default_app, _tls


# the flask application
flask_app = Flask(__name__)

# the celery application
celery_app = Celery('tasks', broker='amqp://', backend='redis://')

# debugging info
debug = """
[{location}]
celery_app       = {celery_app}
current_app      = {current_app}
add.app          = {add_app}
default_app      = {default_app}
_tls.current_app = {tls_current_app}
"""

print(debug.format(
    location = 'OUTSIDE VIEW',
    celery_app = celery_app,
    current_app = current_app,
    add_app = add.app,
    default_app = default_app,
    tls_current_app = _tls.current_app
))


# fired after a task is published
@after_task_publish.connect
def after_publish(sender=None, body=None, **kwargs):
    print(debug.format(
        location = 'INSIDE SIGNAL FUNCTION',
        celery_app = celery_app,
        current_app = current_app,
        add_app = add.app,
        default_app = default_app,
        tls_current_app = _tls.current_app
    ))

# a simple task for testing
@celery_app.task(name='add')
def add(a, b):
    return a + b


@flask_app.route('/add')
def add_view():
    print(debug.format(
        location = 'INSIDE VIEW',
        celery_app = celery_app,
        current_app = current_app,
        add_app = add.app,
        default_app = default_app,
        tls_current_app = _tls.current_app
    ))

    a = request.args.get('a')
    b = request.args.get('b')

    task = add.delay(a, b)

    return task.task_id


if __name__ == '__main__':
    flask_app.run(debug=True)
Run Code Online (Sandbox Code Playgroud)

这是输出:

[OUTSIDE VIEW]
celery_app       = <Celery tasks:0xb69ede4c>
current_app      = <Celery tasks:0xb69ede4c>
add.app          = <Celery tasks:0xb69ede4c>
default_app      = None
_tls.current_app = <Celery tasks:0xb69ede4c>


[INSIDE VIEW]
celery_app       = <Celery tasks:0xb69ede4c>
current_app      = <Celery default:0xb6b0546c>
add.app          = <Celery tasks:0xb69ede4c>
default_app      = None
_tls.current_app = None   # making current_app fallback to the default instance


[INSIDE SIGNAL FUNCTION]
celery_app       = <Celery tasks:0xb69ede4c>
current_app      = <Celery default:0xb6a174ec>
add.app          = <Celery tasks:0xb69ede4c>
default_app      = None
_tls.current_app = None
Run Code Online (Sandbox Code Playgroud)

因为_tls.current_app视图中是None,所以这celery.current_app就是引用默认实例的原因,来自celery._state._get_current_app:

return _tls.current_app or default_app
Run Code Online (Sandbox Code Playgroud)

_tls是一个实例celery._state._TLS:

class _TLS(threading.local):
    #: Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
    #: sets this, so it will always contain the last instantiated app,
    #: and is the default app returned by :func:`app_or_default`.
    current_app = None
Run Code Online (Sandbox Code Playgroud)

问题与线程有关吗?这可能是个错误吗?或者这是预期的行为?

请注意,我可以在我的钩子函数中使用实际的celery实例,一切都会正常工作,但我担心celery.current_app这会在其他地方使用会破坏我的代码.

Pie*_*rre 11

我发现这个问题,当我跑的瓶应用,但不启用调试和它的工作没有任何问题,当debugTrue使用reloader是一个运行在另一个线程应用程序,这种情况发生在werkzeug._reloader.run_with_reloader函数.

并根据Python文档有关的类threading.local这是子类来存储当前应用程序实例:

表示线程本地数据的类.线程局部数据是其值是线程特定的数据.

对于单独的线程,实例的值将不同.

所以这意味着celery._state._tls.current_app线程之间不共享,我们必须手动将celery实例设置为当前应用程序,例如在view函数中:

celery_app.set_current()
Run Code Online (Sandbox Code Playgroud)

  • 不太确定我们是否有相同的场景,但我使用了 celery 的 `set_default`,因为它适用于所有线程...... http://docs.celeryproject.org/en/latest/reference/celery.html#celery.Celery 。默认设置 (2认同)