如何在 celery 任务中使用 asyncio 和 aioredis 锁?

SH2*_*H21 4 celery celery-task python-3.x python-asyncio aioredis

目标:

  1. 可以运行 asyncio 协程。
  2. 纠正异常和任务重试时的 celery 行为。
  3. 可以使用 aioredis 锁。

那么,如何正确运行异步任务来达到目标​​呢?

这是什么RuntimeError: await wasn't used with future(如下),我该如何修复它?


我已经尝试过:

1. 阿斯吉里夫

async_to_sync(来自 asgiref https://pypi.org/project/asgiref/)。

此选项可以运行 asyncio 协程,但重试功能不起作用。

2. 芹菜池异步

https://pypi.org/project/celery-pool-asyncio/

与 asgiref 中的问题相同。(此选项可以运行 asyncio 协程,但重试功能不起作用。)

3.编写自己的异步同步装饰器

我已经尝试创建自己的装饰器,例如运行协程线程安全(asyncio.run_coroutine_threadsafe)的 async_to_sync ,但我的行为如上所述。

4. 异步模块

我还尝试asyncio.run()asyncio.get_event_loop().run_until_complete()(和self.retry(...))内部芹菜任务。这运行良好,任务运行,重试有效,但是协程执行不正确 - 在async函数内部我无法使用 aioredis。

实施注意事项

  • 启动芹菜命令celery -A celery_test.celery_app worker -l info -n worker1 -P gevent --concurrency=10 --without-gossip --without-mingle
  • 芹菜应用程序
transport = f"redis://localhost/9"
celery_app = Celery("worker", broker=transport, backend=transport,
                    include=['tasks'])

celery_app.conf.broker_transport_options = {
    'visibility_timeout': 60 * 60 * 24,
    'fanout_prefix': True,
    'fanout_patterns': True
}
Run Code Online (Sandbox Code Playgroud)
  • 实用程序
@contextmanager
def temp_asyncio_loop():
    # asyncio.get_event_loop() automatically creates event loop only for main thread
    try:
        prev_loop = asyncio.get_event_loop()
    except RuntimeError:
        prev_loop = None
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        yield loop
    finally:
        loop.stop()
        loop.close()
        del loop
        asyncio.set_event_loop(prev_loop)


def with_temp_asyncio_loop(f):
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        with temp_asyncio_loop() as t_loop:
            return f(*args, loop=t_loop, **kwargs)

    return wrapper


def await_(coro):
    return asyncio.get_event_loop().run_until_complete(coro)
Run Code Online (Sandbox Code Playgroud)
  • 任务

@celery_app.task(bind=True, max_retries=30, default_retry_delay=0)
@with_temp_asyncio_loop
def debug(self, **kwargs):
    try:
        await_(debug_async())
    except Exception as exc:
        self.retry(exc=exc)


async def debug_async():
    async with RedisLock(f'redis_lock_{datetime.now()}'):
        pass
Run Code Online (Sandbox Code Playgroud)
  • redis锁

class RedisLockException(Exception):
    pass


class RedisLock(AsyncContextManager):
    """
    Redis Lock class

    :param lock_id: string (unique key)
    :param value: dummy value
    :param expire: int (time in seconds that key will storing)
    :param expire_on_delete: int (time in seconds, set pause before deleting)

        Usage:
            try:
                with RedisLock('123_lock', 5 * 60):
                    # do something
            except RedisLockException:
    """

    def __init__(self, lock_id: str, value='1', expire: int = 4, expire_on_delete: int = None):
        self.lock_id = lock_id
        self.expire = expire
        self.value = value
        self.expire_on_delete = expire_on_delete

    async def acquire_lock(self):
        return await redis.setnx(self.lock_id, self.value)

    async def release_lock(self):
        if self.expire_on_delete is None:
            return await redis.delete(self.lock_id)
        else:
            await redis.expire(self.lock_id, self.expire_on_delete)

    async def __aenter__(self, *args, **kwargs):
        if not await self.acquire_lock():
            raise RedisLockException({
                'redis_lock': 'The process: {} still run, try again later'.format(await redis.get(self.lock_id))
            })
        await redis.expire(self.lock_id, self.expire)

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.release_lock()
Run Code Online (Sandbox Code Playgroud)

在我的 Windows 机器上,await redis.setnx(...)阻止了 celery 工作程序,它停止生成日志并且Ctrl+C无法工作。

在 docker 容器内,我收到一个错误。有部分回溯:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/aioredis/connection.py", line 854, in read_response
    response = await self._parser.read_response()
  File "/usr/local/lib/python3.9/site-packages/aioredis/connection.py", line 366, in read_response
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
aioredis.exceptions.ConnectionError: Connection closed by server.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/celery/app/trace.py", line 451, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/celery/app/trace.py", line 734, in __protected_call__
    return self.run(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/celery/app/autoretry.py", line 54, in run
    ret = task.retry(exc=exc, **retry_kwargs)
  File "/usr/local/lib/python3.9/site-packages/celery/app/task.py", line 717, in retry
    raise_with_context(exc)
  File "/usr/local/lib/python3.9/site-packages/celery/app/autoretry.py", line 34, in run
    return task._orig_run(*args, **kwargs)
  File "/app/celery_tasks/tasks.py", line 69, in wrapper
    return f(*args, **kwargs) # <--- inside with_temp_asyncio_loop from utils
  ...
  File "/usr/local/lib/python3.9/contextlib.py", line 575, in enter_async_context
    result = await _cm_type.__aenter__(cm)
  File "/app/db/redis.py", line 50, in __aenter__
    if not await self.acquire_lock():
  File "/app/db/redis.py", line 41, in acquire_lock
    return await redis.setnx(self.lock_id, self.value)
  File "/usr/local/lib/python3.9/site-packages/aioredis/client.py", line 1064, in execute_command
    return await self.parse_response(conn, command_name, **options)
  File "/usr/local/lib/python3.9/site-packages/aioredis/client.py", line 1080, in parse_response
    response = await connection.read_response()
  File "/usr/local/lib/python3.9/site-packages/aioredis/connection.py", line 859, in read_response
    await self.disconnect()
  File "/usr/local/lib/python3.9/site-packages/aioredis/connection.py", line 762, in disconnect
    await self._writer.wait_closed()
  File "/usr/local/lib/python3.9/asyncio/streams.py", line 359, in wait_closed
    await self._protocol._get_close_waiter(self)
RuntimeError: await wasn't used with future
Run Code Online (Sandbox Code Playgroud)
  • 库版本
celery==5.2.1
aioredis==2.0.0
Run Code Online (Sandbox Code Playgroud)

Ser*_*eat 10

使用solo池,然后创建一个运行任务函数 asyncio.get_event_loop().run_until_complete(f(*args, **kwargs))并使任务异步的装饰器

def sync(f):
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return asyncio.get_event_loop().run_until_complete(f(*args, **kwargs))
    return wrapper

@celery_app.task()
@sync
async def task():
   ...
Run Code Online (Sandbox Code Playgroud)