Mik*_*Lee 17 python django celery
我有一个任务,有点像这样:
@task()
def async_work(info):
...
Run Code Online (Sandbox Code Playgroud)
在任何时候,我都可以使用一些信息调用async_work.出于某种原因,我需要确保一次只运行一个async_work,其他调用请求必须等待.
所以我想出了以下代码:
is_locked = False
@task()
def async_work(info):
while is_locked:
pass
is_locked = True
...
is_locked = False
Run Code Online (Sandbox Code Playgroud)
但是它说访问局部变量是无效的......如何解决?
Iho*_*nko 28
访问本地变量是无效的,因为您可以让几个芹菜工人运行任务.那些工人甚至可能在不同的主机上.因此,基本上,有许多is_locked
变量实例,因为许多Celery工作者正在运行您的async_work
任务.因此,即使您的代码不会引发任何错误,您也无法获得所需的效果.
要实现您的目标,您需要将Celery配置为仅运行一个工作程序.由于任何工人都可以在任何给定时间处理单个任务,因此您可以获得所需的任务.
编辑:
根据工人指南>并发:
默认情况下,多处理用于执行任务的并发执行,但您也可以使用Eventlet.可以使用
--concurrency
参数更改工作进程/线程的数量,并默认为计算机上可用的CPU数量.
因此,您需要像这样运行worker:
$ celery worker --concurrency=1
Run Code Online (Sandbox Code Playgroud)
编辑2:
令人惊讶的是,还有另一种解决方案,而且即使在官方文档中,也可以看到确保任务一次只执行一篇文章.
您可能不想concurrency=1
为您的芹菜工作者使用-您希望同时处理任务。相反,您可以使用某种锁定机制。只要确保缓存超时大于完成任务的时间即可。
import redis
from contextlib import contextmanager
redis_client = redis.Redis(host='localhost', port=6378)
@contextmanager
def redis_lock(lock_name):
"""Yield 1 if specified lock_name is not already set in redis. Otherwise returns 0.
Enables sort of lock functionality.
"""
status = redis_client.set(lock_name, 'lock', nx=True)
try:
yield status
finally:
redis_client.delete(lock_name)
@task()
def async_work(info):
with redis_lock('my_lock_name') as acquired:
do_some_work()
Run Code Online (Sandbox Code Playgroud)
受芹菜文档启发的示例
from contextlib import contextmanager
from django.core.cache import cache
@contextmanager
def memcache_lock(lock_name):
status = cache.add(lock_name, 'lock')
try:
yield status
finally:
cache.delete(lock_name)
@task()
def async_work(info):
with memcache_lock('my_lock_name') as acquired:
do_some_work()
Run Code Online (Sandbox Code Playgroud)
我已经实现了一个装饰器来处理这个问题。它基于Celery 官方文档中的确保一次只执行一项任务。
它使用函数的名称及其 args 和 kwargs 创建一个 lock_id,它在 Django 的缓存层中设置/获取(我只使用 Memcached 测试过它,但它也应该适用于 Redis)。如果 lock_id 已在缓存中设置,它将把任务放回到队列中并退出。
CACHE_LOCK_EXPIRE = 30
def no_simultaneous_execution(f):
"""
Decorator that prevents a task form being executed with the
same *args and **kwargs more than one at a time.
"""
@functools.wraps(f)
def wrapper(self, *args, **kwargs):
# Create lock_id used as cache key
lock_id = '{}-{}-{}'.format(self.name, args, kwargs)
# Timeout with a small diff, so we'll leave the lock delete
# to the cache if it's close to being auto-removed/expired
timeout_at = monotonic() + CACHE_LOCK_EXPIRE - 3
# Try to acquire a lock, or put task back on queue
lock_acquired = cache.add(lock_id, True, CACHE_LOCK_EXPIRE)
if not lock_acquired:
self.apply_async(args=args, kwargs=kwargs, countdown=3)
return
try:
f(self, *args, **kwargs)
finally:
# Release the lock
if monotonic() < timeout_at:
cache.delete(lock_id)
return wrapper
Run Code Online (Sandbox Code Playgroud)
然后,您可以将其作为第一个装饰器应用于任何任务:
@shared_task(bind=True, base=MyTask)
@no_simultaneous_execution
def sometask(self, some_arg):
...
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
9029 次 |
最近记录: |