from celery import Celery
app = Celery('tasks', backend='amqp://guest@localhost//', broker='amqp://guest@localhost//')
a_num = 0
@app.task
def addone():
global a_num
a_num = a_num + 1
return a_num
Run Code Online (Sandbox Code Playgroud)
这是我用来测试芹菜的代码.我希望每次使用addone()时返回值都应该增加.但它总是1为什么???
结果
python
>> from tasks import addone
>> r = addone.delay()
>> r.get()
1
>> r = addone.delay()
>> r.get()
1
>> r = addone.delay()
>> r.get()
1
Run Code Online (Sandbox Code Playgroud)
Lou*_*uis 17
默认情况下,当工作程序启动时,Celery以并发4启动它,这意味着它有4个进程开始处理任务请求.(另外还有一个控制其他进程的进程.)我不知道使用什么算法将任务请求分配给为工作者启动的进程,但最终,如果执行addone.delay().get()得足够多,您将看到数字大于1.会发生什么是每个进程(不是每个任务)都有自己的副本a_num.当我在这里尝试时,我第五次执行addone.delay().get()返回2.
您可以通过使用单个进程启动工作程序来强制每次增加数字来处理请求.(例如celery -A tasks worker -c1)但是,如果你重新启动你的工作人员,编号将被重置为0.此外,我不会设计仅在处理请求的进程数量被强制为1时才有效的代码.一天后,一位同事决定多个进程应该处理对任务的请求,然后事情就会中断.(代码中的注释中的大胖警告只能做这么多.)
在一天结束时,这样的状态应该在缓存中共享,比如Redis,或者用作缓存的数据库,这将适用于您的问题中的代码.
但是,你在评论中写道:
让我们看看我想用一个任务发送一些东西.我希望共享一个全局连接,而不是每次都在任务中连接.
将连接存储在缓存中将不起作用.我强烈建议让Celery开始使用自己的连接而不是尝试在进程之间共享它的每个进程.每个新任务请求都不需要重新打开连接.每个进程打开一次,然后此进程提供的每个任务请求都会重用该连接.
在许多情况下,尝试在进程之间共享相同的连接(例如通过共享虚拟内存fork)将无法工作.连接通常带有状态(例如,数据库连接是否处于自动提交模式).如果代码的两个部分期望连接处于不同的状态,则代码将以不一致的方式运行.
| 归档时间: |
|
| 查看次数: |
5168 次 |
| 最近记录: |