ery*_*ydo · score 6 · tags: python, concurrency, rabbitmq, celery, celerybeat
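I have celery and celerybeat running (four workers) to do some processing steps in bulk. One of those tasks is roughly along the lines of "for each X that doesn't have a Y created, create a Y."
The task is run periodically at a semi-rapid rate (every 10 seconds), and it completes very quickly. There are other tasks going on as well.
I've run into the issue multiple times where the beat tasks apparently become backlogged, so the same task (from different beat times) executes simultaneously, causing incorrectly duplicated work. It also appears that the tasks are executed out of order.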
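The duplicated work described here is a check-then-act race: two runs both see that some X has no Y, and then both create one. A minimal, deterministic sketch in plain Python, assuming threads as stand-ins for the worker processes and in-memory lists as hypothetical stand-ins for the X and Y tables (a `threading.Barrier` forces the bad interleaving every time):

```python
import threading

# Hypothetical in-memory stand-ins for the database tables.
xs = [9725]   # X rows (ids)
ys = []       # Y rows, recorded as the id of the X each one belongs to

# Makes both "workers" finish their check before either one writes.
barrier = threading.Barrier(2)


def add_y_to_xs_unsafe():
    missing = [x for x in xs if x not in ys]  # check: which X have no Y yet?
    barrier.wait()                            # the other worker checks too
    for x in missing:
        ys.append(x)                          # act: create a Y for each


def run_twice(target):
    threads = [threading.Thread(target=target) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


run_twice(add_y_to_xs_unsafe)
unsafe_result = list(ys)
print(unsafe_result)  # [9725, 9725]: a duplicate Y

# Holding one lock across both the check and the act removes the race.
ys.clear()
lock = threading.Lock()


def add_y_to_xs_locked():
    with lock:
        missing = [x for x in xs if x not in ys]
        for x in missing:
            ys.append(x)


run_twice(add_y_to_xs_locked)
print(ys)  # [9725]
```

A `threading.Lock` only helps within one process; across separate Celery workers the lock has to live somewhere they all share, such as the database.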
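Is it possible to limit celery beat so that only one outstanding instance of a task exists at a time? Is setting something like rate_limit=5 on the task the "correct" way to do this?
Is it possible to ensure that beat tasks are executed in order, e.g. instead of dispatching a task, beat adds it to a task chain?
What's the best way of handling this, short of making those tasks themselves execute atomically and be safe to execute concurrently? That is not a restriction I would have expected of beat tasks...
The task itself is naively defined: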
from datetime import timedelta
from celery.task import periodic_task

@periodic_task(run_every=timedelta(seconds=10))
def add_y_to_xs():
    # Do things in a database
    return
Here's an actual (cleaned-up) log:
[00:00.000] foocorp.tasks.add_y_to_xs sent. id->#1
[00:00.001] Received task: foocorp.tasks.add_y_to_xs[#1]
[00:10.009] foocorp.tasks.add_y_to_xs sent. id->#2
[00:20.024] foocorp.tasks.add_y_to_xs sent. id->#3
[00:26.747] Received task: foocorp.tasks.add_y_to_xs[#2]
[00:26.748] TaskPool: Apply #2
[00:26.752] Received task: foocorp.tasks.add_y_to_xs[#3]
[00:26.769] Task accepted: foocorp.tasks.add_y_to_xs[#2] pid:26528
[00:26.775] Task foocorp.tasks.add_y_to_xs[#2] succeeded in 0.0197986490093s: None
[00:26.806] TaskPool: Apply #1
[00:26.836] TaskPool: Apply #3
[01:30.020] Task accepted: foocorp.tasks.add_y_to_xs[#1] pid:26526
[01:30.053] Task accepted: foocorp.tasks.add_y_to_xs[#3] pid:26529
[01:30.055] foocorp.tasks.add_y_to_xs[#1]: Adding Y for X id #9725
[01:30.070] foocorp.tasks.add_y_to_xs[#3]: Adding Y for X id #9725
[01:30.074] Task foocorp.tasks.add_y_to_xs[#1] succeeded in 0.0594762689434s: None
[01:30.087] Task foocorp.tasks.add_y_to_xs[#3] succeeded in 0.0352867960464s: None
We're currently using Celery 3.1.4 with RabbitMQ as the transport.
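On the rate_limit question: Celery documents rate_limit as a per-worker-instance limit on how often a task may start, not as a cap on how many instances are outstanding, so it does not give "one at a time". For the backlog itself (in the log, #1 is sent at 00:00 but not accepted until 01:30), one mitigation is to give each beat message an expiry so that a stale run is dropped instead of executed late. A sketch of a Celery 3.1-style schedule, assuming the task is registered as a plain task rather than with @periodic_task; the entry name is hypothetical. Note this does not stop a run that takes longer than 10 seconds from overlapping the next one.

```python
from datetime import timedelta

# Celery 3.1 setting name; Celery 4+ calls this setting beat_schedule.
CELERYBEAT_SCHEDULE = {
    'add-y-to-xs-every-10s': {          # hypothetical entry name
        'task': 'foocorp.tasks.add_y_to_xs',
        'schedule': timedelta(seconds=10),
        # 'options' is passed through to apply_async(); a message that is
        # still waiting after 10 seconds expires and is not executed.
        'options': {'expires': 10},
    },
}
```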
EDIT: Dan, here's what I ended up using:
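import hashlib
from contextlib import contextmanager

from sqlalchemy import func, select
from sqlalchemy.exc import DBAPIError


def _lock_id(name):
    # hash(name) is randomized per process on Python 3, so separate workers
    # would compute different ids for the same name. Use a stable digest,
    # truncated to 60 bits so it fits PostgreSQL's signed bigint lock key.
    return int(hashlib.sha1(name.encode('utf-8')).hexdigest()[:15], 16)


def _psql_advisory_lock_blocking(conn, lock_id, shared, timeout):
    lock_fn = (func.pg_advisory_xact_lock_shared
               if shared else
               func.pg_advisory_xact_lock)
    if timeout:
        # timeout is a PostgreSQL duration string such as '1s'. With
        # is_local=true, set_config acts like SET LOCAL: the timeout applies
        # to this transaction only and does not leak onto the pooled connection.
        conn.execute(select([func.set_config('statement_timeout', timeout, True)]))
    try:
        conn.execute(select([lock_fn(lock_id)]))
    except DBAPIError:
        # Timed out (or otherwise failed) while waiting for the lock.
        return False
    return True


def _psql_advisory_lock_nonblocking(conn, lock_id, shared):
    lock_fn = (func.pg_try_advisory_xact_lock_shared
               if shared else
               func.pg_try_advisory_xact_lock)
    # The pg_try_* variants return true/false immediately instead of waiting.
    return conn.execute(select([lock_fn(lock_id)])).scalar()


class DatabaseLockFailed(Exception):
    pass


@contextmanager
def db_lock(engine, name, shared=False, block=True, timeout=None):
    """
    Context manager which acquires a PSQL advisory transaction lock with a
    specified name. The lock is released when the transaction opened here
    commits or rolls back.
    """
    lock_id = _lock_id(name)
    # engine.begin() already opens a transaction, so no extra conn.begin().
    with engine.begin() as conn:
        if block:
            locked = _psql_advisory_lock_blocking(conn, lock_id, shared,
                                                  timeout)
        else:
            locked = _psql_advisory_lock_nonblocking(conn, lock_id, shared)
        if not locked:
            raise DatabaseLockFailed()
        yield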
And the celery task decorator (only used for periodic tasks):
import logging
from functools import wraps

from preo.extensions import db
# db_lock and DatabaseLockFailed are the helpers defined above.

logger = logging.getLogger(__name__)


def locked(name=None, block=True, timeout='1s'):
    """
    Using a PostgreSQL advisory transaction lock, only runs this task if the
    lock is available. Otherwise logs a message and returns `None`.
    """
    def with_task(fn):
        lock_id = name or 'celery:{}.{}'.format(fn.__module__, fn.__name__)

        @wraps(fn)
        def f(*args, **kwargs):
            try:
                with db_lock(db.engine, name=lock_id, block=block,
                             timeout=timeout):
                    return fn(*args, **kwargs)
            except DatabaseLockFailed:
                logger.error('Failed to get lock %s.', lock_id)
                return None
        return f
    return with_task
Answer from 小智 · score 9
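import logging
from functools import wraps

mylog = logging.getLogger(__name__)
# clr_app is the project's Celery application instance (defined elsewhere).


def skip_if_running(f):
    u"""
    Skip the task if an identical task (same name, args and kwargs) is
    already running on any worker.
    """
    task_name = u'%s.%s' % (f.__module__, f.__name__)
    mylog.info(u'skip decorator for %s', task_name)

    @wraps(f)
    def fun(self, *args, **kwargs):
        try:
            # Python 2 code: unicode() gives the repr of the args tuple and
            # kwargs dict, compared with the strings the workers report.
            uargs = unicode(args)
            ukwargs = unicode(kwargs)
            i = clr_app.control.inspect()
            # active() returns None when no worker replies in time.
            workers = i.active() or {}
            for worker, tasks in workers.items():
                for task in tasks:
                    if (task_name == task['name'] and uargs == task['args']
                            and ukwargs == task['kwargs']
                            and self.request.id != task['id']):
                        mylog.warning(u'task %s (%s, %s) is started on %s, skip current',
                                      task_name, uargs, ukwargs, worker)
                        return None
        except Exception:
            mylog.exception(u'could not check for running copies of %s', task_name)
        return f(*args, **kwargs)
    return fun


@clr_app.task(bind=True)
@skip_if_running
def test_single_task(arg):
    pass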
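The matching logic inside `skip_if_running` is hard to test in place, because `inspect().active()` is a broadcast that needs live workers. One option is to factor it into a pure function that takes the `{worker_name: [task_info, ...]}` mapping `active()` returns. In this sketch the name `find_duplicate` and the sample reply are hypothetical, and how `args`/`kwargs` are represented in that reply may differ between Celery versions.

```python
def find_duplicate(active, task_name, args_repr, kwargs_repr, current_id):
    """Return the name of a worker running an identical task, else None.

    `active` is the {worker_name: [task_info, ...]} mapping returned by
    inspect().active(), or None when no worker replied.
    """
    for worker, tasks in (active or {}).items():
        for task in tasks:
            if (task['name'] == task_name
                    and task['args'] == args_repr
                    and task['kwargs'] == kwargs_repr
                    and task['id'] != current_id):  # ignore the current task
                return worker
    return None


# Hypothetical reply: one worker is running the task with args (1,).
active = {
    'celery@host1': [
        {'id': 'a1', 'name': 'tasks.test_single_task',
         'args': '(1,)', 'kwargs': '{}'},
    ],
}

print(find_duplicate(active, 'tasks.test_single_task', '(1,)', '{}', 'b2'))
# celery@host1
print(find_duplicate(active, 'tasks.test_single_task', '(2,)', '{}', 'b2'))
# None
```

Inside the decorator, the nested loops then reduce to `worker = find_duplicate(i.active(), task_name, uargs, ukwargs, self.request.id)`.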