Celery Beat:一次限制为单个任务实例

ery*_*ydo 6 python concurrency rabbitmq celery celerybeat

我有芹菜和芹菜(四名工人)批量做一些加工步骤.其中一项任务大致是这样的,"对于每个没有创建Y的X,创建Y."

该任务以半快速(10秒)定期运行.任务很快完成.还有其他任务正在进行中.

我多次遇到这个问题,其中节拍任务显然已经积压,因此同时执行相同的任务(来自不同的节拍时间),导致错误的重复工作.似乎任务也是无序执行的.

  1. 是否有可能限制芹菜节拍以确保一次只有一个突出的任务实例?设置类似于rate_limit=5任务的东西是"正确"的方式吗?

  2. 是否有可能确保按顺序执行节拍任务,例如,不是分派任务,节拍将其添加到任务链中?

  3. 处理这个问题的最佳方法是什么,除了使这些任务本身以原子方式执行并且可以安全地同时执行?这不是我对预期任务的限制......

任务本身是天真地定义的:

@periodic_task(run_every=timedelta(seconds=10))
def add_y_to_xs():
    # Do things in a database
    return
Run Code Online (Sandbox Code Playgroud)

这是一个实际的(清理过的)日志:

  • [00:00.000]foocorp.tasks.add_y_to_xs发送.ID - >#1
  • [00:00.001] 收到的任务:foocorp.tasks.add_y_to_xs [#1]
  • [00:10.009]foocorp.tasks.add_y_to_xs发送.ID - >#2
  • [00:20.024]foocorp.tasks.add_y_to_xs发送.ID - >#3
  • [00:26.747] 收到的任务:foocorp.tasks.add_y_to_xs [#2]
  • [00:26.748] TaskPool:应用#2
  • [00:26.752] 收到的任务:foocorp.tasks.add_y_to_xs [#3]
  • [00:26.769] 接受的任务:foocorp.tasks.add_y_to_xs [#2] pid:26528
  • [00:26.775] 任务foocorp.tasks.add_y_to_xs [#2]成功完成0.0197986490093s:无
  • [00:26.806] TaskPool:应用#1
  • [00:26.836] TaskPool:应用#3
  • [01:30.020] 接受的任务:foocorp.tasks.add_y_to_xs [#1] pid:26526
  • [01:30.053] 接受的任务:foocorp.tasks.add_y_to_xs [#3] pid:26529
  • [01:30.055] foocorp.tasks.add_y_to_xs [#1]:为X id添加Y#9725
  • [01:30.070] foocorp.tasks.add_y_to_xs [#3]:为X id添加Y#9725
  • [01:30.074] 任务foocorp.tasks.add_y_to_xs [#1]成功完成0.0594762689434s:无
  • [01:30.087] 任务foocorp.tasks.add_y_to_xs [#3]成功完成0.0352867960464s:无

我们目前正在使用带有RabbitMQ的Celery 3.1.4作为传输.

编辑丹,这是我想出的:

丹,这是我最终使用的:

from sqlalchemy import func
from sqlalchemy.exc import DBAPIError
from contextlib import contextmanager


def _psql_advisory_lock_blocking(conn, lock_id, shared, timeout):
    lock_fn = (func.pg_advisory_xact_lock_shared
               if shared else
               func.pg_advisory_xact_lock)
    if timeout:
        conn.execute(text('SET statement_timeout TO :timeout'),
                     timeout=timeout)
    try:
        conn.execute(select([lock_fn(lock_id)]))
    except DBAPIError:
        return False
    return True


def _psql_advisory_lock_nonblocking(conn, lock_id, shared):
    lock_fn = (func.pg_try_advisory_xact_lock_shared
               if shared else
               func.pg_try_advisory_xact_lock)
    return conn.execute(select([lock_fn(lock_id)])).scalar()


class DatabaseLockFailed(Exception):
    pass


@contextmanager
def db_lock(engine, name, shared=False, block=True, timeout=None):
    """
    Context manager which acquires a PSQL advisory transaction lock with a
    specified name.
    """
    lock_id = hash(name)

    with engine.begin() as conn, conn.begin():
        if block:
            locked = _psql_advisory_lock_blocking(conn, lock_id, shared,
                                                  timeout)
        else:
            locked = _psql_advisory_lock_nonblocking(conn, lock_id, shared)
        if not locked:
            raise DatabaseLockFailed()
        yield
Run Code Online (Sandbox Code Playgroud)

和芹菜任务装饰器(仅用于定期任务):

from functools import wraps
from preo.extensions import db


def locked(name=None, block=True, timeout='1s'):
    """
    Using a PostgreSQL advisory transaction lock, only runs this task if the
    lock is available. Otherwise logs a message and returns `None`.
    """
    def with_task(fn):
        lock_id = name or 'celery:{}.{}'.format(fn.__module__, fn.__name__)

        @wraps(fn)
        def f(*args, **kwargs):
            try:
                with db_lock(db.engine, name=lock_id, block=block,
                             timeout=timeout):
                    return fn(*args, **kwargs)
            except DatabaseLockFailed:
                logger.error('Failed to get lock.')
                return None
        return f
    return with_task
Run Code Online (Sandbox Code Playgroud)

小智 9

def skip_if_running(f):
    u"""
    ?? ????????? ?????? ? ?????? ?? ??????????? ???? ??? ??? ? ?????????
    """
    task_name = u'%s.%s' % (f.__module__, f.__name__)
    mylog.info(u'skip decorator for %s' % task_name)
    @wraps(f)
    def fun(self, *args, **kwargs):
        try:
            uargs = unicode(args)
            ukwargs = unicode(kwargs)

            i = clr_app.control.inspect()
            workers = i.active()
            for worker, tasks in workers.items():
                for task in tasks:
                    if task_name == task['name'] and uargs == task['args'] and ukwargs == task['kwargs'] and self.request.id != task['id']:
                        mylog.warning(u'task %s (%s, %s) is started on %s, skip current' % (task_name, uargs, ukwargs, worker))
                        return None
        except Exception as e:
            mylog.error(e)
        return f(*args, **kwargs)
    return fun

@clr_app.task(bind=True)
@skip_if_running
def test_single_task(arg):
    pass
Run Code Online (Sandbox Code Playgroud)


sbe*_*rry 6

唯一的方法是自己实施锁定策略:

请阅读此处的部分以供参考.

与cron一样,如果第一个任务在下一个任务之前没有完成,任务可能会重叠.如果这是一个问题,您应该使用锁定策略来确保一次只能运行一个实例(例如,参见确保任务一次只执行一个).

  • @erydo,您愿意分享那个装饰者吗?我正在尝试做几乎相同的事情,并遇到一些奇怪的同步问题。 (2认同)