我有一个使用来自 RabbitMQ 队列的消息的服务(发布到队列是通过主题交换完成的)。假设服务理论上会失败并失去其状态,备份所有消息以进行灾难恢复的可能性将派上用场。
想到的第一个想法是为主题交换添加另一个绑定,以便消息也发布到另一个队列,并创建一个自定义服务来备份将在该队列上侦听的消息。但这听起来很像车轮的潜在改造。有没有更简单的方法来使用 RabbitMQ(插件/现有服务/等)来做到这一点?
我是 RabbitMQ 的新手,我刚刚浏览了 Rabbitmq 文档(路由)。我在 Exchange 与路由键之间很困惑。我的要求是,我想动态创建多个队列。请参考下图。

前任。让我们说如果生产者为消费者 c3 创建消息,那么它应该转到 Exchange 并仅路由到队列 3 并仅由 C3 消费。目前我只需要 3 个队列,将来这个计数可能会增加。那么如何处理这种情况呢。
注意:我参考这个博客交流
我已经将 Spring hibernate 与 Rabbitmq 一起使用了。下面的代码显示了 Rabbit MQ 侦听器配置文件。
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<rabbit:connection-factory id="connectionFactory" host="xx.xx.10.181" username="admin" password="admin" />
<rabbit:admin connection-factory="connectionFactory" />
<!-- Create Queues -->
<rabbit:queue id="q1" name="q1.queue" durable="true" />
<rabbit:queue id="q2" name="q2.queue" durable="true" />
<rabbit:queue id="q3" name="q3.queue" durable="true" />
<!--create Exchange here -->
<rabbit:direct-exchange id="myExchange" name="MY Exchange">
<rabbit:bindings>
<rabbit:binding queue="q1" …Run Code Online (Sandbox Code Playgroud) 我已经开始使用 RabbitMQ,我的用例非常简单 - 生产者将消息放入队列中以供消费者处理。每条消息最多由一个消费者处理,并且消息根据队列名称从生产者定向到消费者。
Direct交换似乎非常适合这一点,default交换就是direct交换。
是否有任何原因(性能、管理、许可等)不使用该default交易所并创建自己的交易所?例如,我将使用高可用性队列(https://www.rabbitmq.com/ha.html),并且不确定如果所有 HA 队列都在交换机上是否会对集群产生负面default影响而不是不同的交易所?
我使用 Bunny (Ruby) 发布 RabbitMQ 消息,如下所示:
x.publish("Message !"+n.to_s, :routing_key => 'mychannel')
Run Code Online (Sandbox Code Playgroud)
并像这样订阅:
ch = conn.create_channel
x = ch.topic('fling',durable: true)
q = ch.queue("")
q.bind(x, :routing_key => 'mychannel')
puts "Waiting for messages."
q.subscribe( :block => true) do |delivery_info, properties, body|
puts " [x] Received #{body}, message properties are #{properties.inspect}"
Run Code Online (Sandbox Code Playgroud)
一旦我启动订阅者,它就会立即接收发送的任何消息。但是,如果我在没有启动订阅者的情况下发送消息,则当我启动订阅者时不会收到消息(无论发送者是否仍在推送消息)。
当没有订阅者在监听时,是否可以返回队列并接收过去发送的消息?
我对 RabbitMQ 还很陌生,我必须实现一个 .NET 客户端,该客户端需要将消息发布到第三方托管的 RabbitMQ 服务器上的 Exchange 实体。
与服务器的连接似乎运行良好,但是当我想声明我必须使用的 Exchange 通道时,情况变得更糟。
using (var connection = connectionFactory.CreateConnection())
{
IModel model = connection.CreateModel();
model.ExchangeDeclare(exchangeName, ExchangeType.Direct);
// ... the rest of the code is never reached
}
Run Code Online (Sandbox Code Playgroud)
通过这些行,我从服务器收到一条异常消息
The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=403, text="ACCESS_REFUSED - access to exchange ... in vhost ... refused for user ...", classId=40, methodId=10, cause=
"Connection Closed, error code 403 (ACCESS_REFUSED)"
Run Code Online (Sandbox Code Playgroud)
这是什么意思?这是否意味着为我提供连接到 AMQP 服务器的凭据的第三方尚未授予 Exchange“exchangeName”足够的权限?或者我在客户端做错了什么?我真的需要打电话吗ExchangeDeclare?
谢谢您的帮助
我与来自该交换的 vhost、用户等进行交换,我绑定了不同的队列。交换和队列在同一个虚拟主机中。现在我想为不同的队列创建一个不同的虚拟主机,但我无法从以前的交换中绑定这个新队列,因为它在不同的虚拟主机中。
什么是最好的解决方案?谢谢
我有几个任务作为服务运行。为了启动工作人员,我使用:
def SvcDoRun(self):
logging.info('Starting {name} service ...'.format(name=self._svc_name_))
os.chdir(INSTDIR) # so that proj worker can be found
logging.info('cwd: ' + os.getcwd())
self.ReportServiceStatus(win32service.SERVICE_RUNNING)
command = '{celery_path} -A {proj_dir} worker -f "{log_path}" -l info --hostname=theService@%h'.format(
celery_path='celery',
proj_dir=PROJECTDIR,
log_path=os.path.join(INSTDIR,'celery.log'))
logging.info('command: ' + command)
args = shlex.split(command)
proc = subprocess.Popen(args)
logging.info('pid: {pid}'.format(pid=proc.pid))
self.timeout = 5000
while True:
rc = win32event.WaitForSingleObject(self.hWaitStop, self.timeout)
if rc == win32event.WAIT_OBJECT_0:
print 'Terminate'
# stop signal encountered
PROCESS_TERMINATE = 1
handle = win32api.OpenProcess(PROCESS_TERMINATE, False, proc.pid)
win32api.TerminateProcess(handle, -1)
win32api.CloseHandle(handle)
break …Run Code Online (Sandbox Code Playgroud) 我一直在阅读有关 AMQP 消息传递确认的原理。(https://www.rabbitmq.com/confirms.html)。文章确实很有帮助,写得很好,但有关消费者认知的一件事确实令人困惑,以下是引用:
使用自动确认模式时需要考虑的另一件事是消费者过载。
消费者超载?消息队列由代理处理并保存在 RAM 中(如果我理解正确的话)。这是关于什么过载?消费者是否有某种第二队列?该文章的另一部分更令人困惑:
因此,消费者可能会因交付速度而不知所措,可能会在内存中积累积压并耗尽堆或让操作系统终止其进程。
什么积压?这一切是如何协同工作的?消费者完成哪部分工作(当然除了消费消息和处理消息)?我认为代理正在保持队列活动并转发消息,但现在我正在阅读一些神秘的积压和消费者过载。这真的很令人困惑,有人可以解释一下或者至少指出我的好来源吗?
我在项目中使用 RabbitMQ,想知道是否应该使用具有多个路由键的单个交换器还是使用多个交换器?哪个会更有效率?
例如,如果我使用带有路由密钥 A、B、C 的单个交换机 E,并且消费者连接到该交换机 E 并使用 A、B、C 路由密钥获取数据。另一种选择是我应该将其发送到没有路由密钥的交换机 A、交换机 B、交换机 C,并且消费者可以连接到每个交换机以获取所需的数据。
python 库 pika 的示例(我当前正在使用):
channel.basic_publish(exchange='E',
routing_key='A',
body=data)
channel.basic_publish(exchange='E',
routing_key='B',
body=data)
channel.basic_publish(exchange='E',
routing_key='C',
body=data)
OR
channel.basic_publish(exchange='A',
routing_key='',
body=data)
channel.basic_publish(exchange='B',
routing_key='',
body=data)
channel.basic_publish(exchange='C',
routing_key='',
body=data)
Run Code Online (Sandbox Code Playgroud) 我正在编写一个集成测试,其中我使用 - 创建一个rabbitMQ容器
docker run -d --hostname localhost -p 5672:5672 --name rabbit-tox -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest rabbitmq:3
Run Code Online (Sandbox Code Playgroud)
为了测试我是否可以从测试中连接到rabbitMQ,我创建了这个测试并且它可以发送数据 -
def test_rmq(self):
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
Run Code Online (Sandbox Code Playgroud)
现在我想使用rabbitMQ容器作为celery后端,这是我正在使用的代码 -
from celery import Celery
broker_url = 'amqp://guest:guest@localhost:5672//'
app = Celery('test', broker=broker_url, backend='amqp')
from celery.contrib.testing.worker import start_worker
from swordfish_app import tasks
# Testing the consumer logic
class ServiceAccountCeleryTestCase(TransactionTestCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.celery_worker = start_worker(app)
cls.celery_worker.__enter__()
@classmethod
def tearDownClass(cls):
super().tearDownClass() …Run Code Online (Sandbox Code Playgroud)