从 celery 禁用rabbitmq中的全局QoS

San*_*sam 5 rabbitmq celery quorum

我使用三台 aws ec2 服务器创建了一个 RabbitMQ 3 节点集群。我正在尝试访问法定人数使用 celery 创建的当我连接时出现错误

raise error_for_code(reply_code, reply_text,
amqp.exceptions.AMQPNotImplementedError: Basic.consume: (540) NOT_IMPLEMENTED - queue 'Replica_que' in vhost '/' does not support global qos
Run Code Online (Sandbox Code Playgroud)

我想如果我禁用全局服务质量它会起作用,但我找不到在哪里可以做到这一点。如何禁用 celery 中的全局 qos?

我的芹菜代码

from celery import Celery
from time import sleep
import kombu


broker_uri=['amqp://xxxx:5672/', 'amqp://xxxx:5672/','amqp://xxx:5672/']
backend_uri="mongodb+srv://xxxxx"

app = Celery('TestApp', broker=broker_uri,backend=backend_uri)

app.config_from_object('celeryconfig')
app.conf.task_default_exchange='Replica_que'
app.conf.task_default_routing_key='Replica'

@app.task
def reverse(text):
    sleep(10)
    return text[:-1]

Run Code Online (Sandbox Code Playgroud)

和配置代码

from kombu import Queue

task_queues = [Queue(name="Replica_que", queue_arguments={"x-queue-type": "quorum"})]

task_routes = {
    'tasks.add': 'Replica_que',
}
Run Code Online (Sandbox Code Playgroud)

San*_*sam 2

这可以通过添加 celeryconfig.py 文件来实现,

from kombu import Queue

task_queues = [Queue(name="Replica_que", queue_arguments={"x-queue-type": "quorum"})]

task_routes = {
    'tasks.add': 'Replica_que',
}
Run Code Online (Sandbox Code Playgroud)

并创建自定义 QoS 类:https ://github.com/celery/celery/issues/6067

所以我添加了 QoS 等级

class NoChannelGlobalQoS(bootsteps.StartStopStep):
    requires = {'celery.worker.consumer.tasks:Tasks'}

    def start(self, c):
        qos_global = False

        c.connection.default_channel.basic_qos(0, c.initial_prefetch_count, qos_global)
        def set_prefetch_count(prefetch_count):
            return c.task_consumer.qos(
                prefetch_count=prefetch_count,
                apply_global=qos_global,
            )
        c.qos = QoS(set_prefetch_count, c.initial_prefetch_count)

app.steps['consumer'].add(NoChannelGlobalQoS)
Run Code Online (Sandbox Code Playgroud)

目前,这是 celery 中与仲裁队列相关的问题,但这是有效的。