Celery:如何以可靠且可测试的方式获取队列大小

dav*_*ode 8 python redis celery

我正在失去理智,试图找到一种可靠且可测试的方法来获取给定 Celery 队列中包含的任务数量。

我已经阅读了这两个相关的讨论:

但我无法使用这些线程中描述的方法解决我的问题。

我使用 Redis 作为后端,但我希望有一个独立于后端且灵活的解决方案,特别是对于测试。

这是我目前的情况:我定义了一个EnhancedCelery类,它继承Celery并添加了几个方法,特别get_queue_size()是我正在尝试正确实现/测试的方法。

以下是我的测试用例中的代码:

celery_test_app = EnhancedCelery(__name__)

# this is needed to avoid exception for ping command
# which is automatically triggered by the worker once started
celery_test_app.loader.import_module('celery.contrib.testing.tasks')

# in memory backend
celery_test_app.conf.broker_url = 'memory://'
celery_test_app.conf.result_backend = 'cache+memory://'

# We have to setup queues manually, 
# since it seems that auto queue creation doesn't work in tests :(
celery_test_app.conf.task_create_missing_queues = False
celery_test_app.conf.task_default_queue = 'default'
celery_test_app.conf.task_queues = (
    Queue('default', routing_key='task.#'),
    Queue('queue_1', routing_key='q1'),
    Queue('queue_2', routing_key='q2'),
    Queue('queue_3', routing_key='q3'),
)
celery_test_app.conf.task_default_exchange = 'tasks'
celery_test_app.conf.task_default_exchange_type = 'topic'
celery_test_app.conf.task_default_routing_key = 'task.default'
celery_test_app.conf.task_routes = {
    'sample_task': {
        'queue': 'default',
        'routing_key': 'task.default',
    },
    'sample_task_in_queue_1': {
        'queue': 'queue_1',
        'routing_key': 'q1',
    },
    'sample_task_in_queue_2': {
        'queue': 'queue_2',
        'routing_key': 'q2',
    },
    'sample_task_in_queue_3': {
        'queue': 'queue_3',
        'routing_key': 'q3',
    },
}


@celery_test_app.task()
def sample_task():
    return 'sample_task_result'


@celery_test_app.task(queue='queue_1')
def sample_task_in_queue_1():
    return 'sample_task_in_queue_1_result'


@celery_test_app.task(queue='queue_2')
def sample_task_in_queue_2():
    return 'sample_task_in_queue_2_result'


@celery_test_app.task(queue='queue_3')
def sample_task_in_queue_3():
    return 'sample_task_in_queue_3_result'


class EnhancedCeleryTest(TestCase):
    def test_get_queue_size_returns_expected_value(self):
        def add_task(task):
            task.apply_async()

        with start_worker(celery_test_app):
            for _ in range(7):
                add_task(sample_task_in_queue_1)

            for _ in range(4):
                add_task(sample_task_in_queue_2)

            for _ in range(2):
                add_task(sample_task_in_queue_3)

            self.assertEqual(celery_test_app.get_queue_size('queue_1'), 7)
            self.assertEqual(celery_test_app.get_queue_size('queue_2'), 4)
            self.assertEqual(celery_test_app.get_queue_size('queue_3'), 2)

Run Code Online (Sandbox Code Playgroud)

以下是我的尝试get_queue_size()

  1. 这始终返回零 ( jobs == 0):

    def get_queue_size(self, queue_name: str) -> Optional[int]:
        with self.connection_or_acquire() as connection:
            channel = connection.default_channel
    
            try:
                name, jobs, consumers = channel.queue_declare(queue=queue_name, passive=True)
                return jobs
            except (ChannelError, NotFound):
                pass
    
    Run Code Online (Sandbox Code Playgroud)
  2. 这也总是返回零:

    def get_queue_size(self, queue_name: str) -> Optional[int]:
        inspection = self.control.inspect()
    
        return inspection.active() # zero!
    
        # or:
        return inspection.scheduled() # zero!
    
        # or:
        return inspection.reserved() # zero!
    
    Run Code Online (Sandbox Code Playgroud)
  3. 这是通过返回每个队列的预期数量来实现的,但仅限于测试环境,因为channel.queues使用 redis 后端时该属性不存在:

    def get_queue_size(self, queue_name: str) -> Optional[int]:
        with self.connection_or_acquire() as connection:
            channel = connection.default_channel
    
            if hasattr(channel, 'queues'):
                queue = channel.queues.get(queue_name)
    
                if queue is not None:
                    return queue.unfinished_tasks
    
    Run Code Online (Sandbox Code Playgroud)

Dej*_*kic 5

以我的愚见,您提到的解决方案都不完全正确。正如您已经提到的,这是特定于后端的,因此您必须包装 Celery 支持的所有后端的处理程序,以提供与后端无关的队列检查。在 Redis 情况下,您必须直接连接到 Redis 和 LLEN 要检查的队列。对于 RabbitMQ,您可以通过完全不同的方式找到此信息。与 SQS 的情况相同...

这已经在 Celery 线程中的队列中检索任务列表中讨论过......

最后,Celery 不提供开箱即用的功能是有原因的——我认为这些信息是无用的。当您获取队列中的内容时,它可能已经空了!

如果您想监控队列的情况,我建议您使用另一种方法。-编写你自己的实时监视器。该示例仅捕获任务失败的事件,但您应该能够轻松修改它以捕获您关心的所有事件,并收集有关这些任务的数据(队列、时间、执行的主机等)。显然这是一个在更严肃的项目中如何完成的示例。