San*_*sam 5 rabbitmq celery quorum
我使用三台 aws ec2 服务器创建了一个 RabbitMQ 3 节点集群。我正在尝试访问法定人数使用 celery 创建的当我连接时出现错误
raise error_for_code(reply_code, reply_text,
amqp.exceptions.AMQPNotImplementedError: Basic.consume: (540) NOT_IMPLEMENTED - queue 'Replica_que' in vhost '/' does not support global qos
Run Code Online (Sandbox Code Playgroud)
我想如果我禁用全局服务质量它会起作用,但我找不到在哪里可以做到这一点。如何禁用 celery 中的全局 qos?
我的芹菜代码
from celery import Celery
from time import sleep
import kombu
broker_uri=['amqp://xxxx:5672/', 'amqp://xxxx:5672/','amqp://xxx:5672/']
backend_uri="mongodb+srv://xxxxx"
app = Celery('TestApp', broker=broker_uri,backend=backend_uri)
app.config_from_object('celeryconfig')
app.conf.task_default_exchange='Replica_que'
app.conf.task_default_routing_key='Replica'
@app.task
def reverse(text):
sleep(10)
return text[:-1]
Run Code Online (Sandbox Code Playgroud)
和配置代码
from kombu import Queue
task_queues = [Queue(name="Replica_que", queue_arguments={"x-queue-type": "quorum"})]
task_routes = {
'tasks.add': 'Replica_que',
}
Run Code Online (Sandbox Code Playgroud)
这可以通过添加 celeryconfig.py 文件来实现,
from kombu import Queue
task_queues = [Queue(name="Replica_que", queue_arguments={"x-queue-type": "quorum"})]
task_routes = {
'tasks.add': 'Replica_que',
}
Run Code Online (Sandbox Code Playgroud)
并创建自定义 QoS 类:https ://github.com/celery/celery/issues/6067
所以我添加了 QoS 等级
class NoChannelGlobalQoS(bootsteps.StartStopStep):
requires = {'celery.worker.consumer.tasks:Tasks'}
def start(self, c):
qos_global = False
c.connection.default_channel.basic_qos(0, c.initial_prefetch_count, qos_global)
def set_prefetch_count(prefetch_count):
return c.task_consumer.qos(
prefetch_count=prefetch_count,
apply_global=qos_global,
)
c.qos = QoS(set_prefetch_count, c.initial_prefetch_count)
app.steps['consumer'].add(NoChannelGlobalQoS)
Run Code Online (Sandbox Code Playgroud)
目前,这是 celery 中与仲裁队列相关的问题,但这是有效的。
| 归档时间: |
|
| 查看次数: |
1135 次 |
| 最近记录: |