标签: rabbitmq-exchange

RabbitMQ 备份特定队列中的消息

我有一个使用来自 RabbitMQ 队列的消息的服务(发布到队列是通过主题交换完成的)。假设服务理论上会失败并失去其状态,备份所有消息以进行灾难恢复的可能性将派上用场。

想到的第一个想法是为主题交换添加另一个绑定,以便消息也发布到另一个队列,并创建一个自定义服务来备份将在该队列上侦听的消息。但这听起来很像车轮的潜在改造。有没有更简单的方法来使用 RabbitMQ(插件/现有服务/等)来做到这一点?

rabbitmq rabbitmq-exchange

5
推荐指数
1
解决办法
3067
查看次数

RabbitMQ:在直接交换中创建动态队列

我是 RabbitMQ 的新手,我刚刚浏览了 Rabbitmq 文档(路由)。我在 Exchange 与路由键之间很困惑。我的要求是,我想动态创建多个队列。请参考下图。

 生产者将作业发送到 Exchange 并将其转发到相应的队列

前任。让我们说如果生产者为消费者 c3 创建消息,那么它应该转到 Exchange 并仅路由到队列 3 并仅由 C3 消费。目前我只需要 3 个队列,将来这个计数可能会增加。那么如何处理这种情况呢。

注意:我参考这个博客交流

我已经将 Spring hibernate 与 Rabbitmq 一起使用了。下面的代码显示了 Rabbit MQ 侦听器配置文件。

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/rabbit  
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

<rabbit:connection-factory id="connectionFactory" host="xx.xx.10.181" username="admin" password="admin" />
<rabbit:admin connection-factory="connectionFactory" />

<!--  Create Queues -->
<rabbit:queue id="q1" name="q1.queue" durable="true" />
<rabbit:queue id="q2" name="q2.queue" durable="true"  />
<rabbit:queue id="q3" name="q3.queue" durable="true"  />

<!--create Exchange here -->
<rabbit:direct-exchange id="myExchange" name="MY Exchange">
    <rabbit:bindings>
         <rabbit:binding queue="q1" …
Run Code Online (Sandbox Code Playgroud)

java message-queue rabbitmq spring-amqp rabbitmq-exchange

5
推荐指数
0
解决办法
1581
查看次数

不使用 RabbitMQ 上的默认交换的原因?

我已经开始使用 RabbitMQ,我的用例非常简单 - 生产者将消息放入队列中以供消费者处理。每条消息最多由一个消费者处理,并且消息根据队列名称从生产者定向到消费者。

Direct交换似乎非常适合这一点,default交换就是direct交换。

是否有任何原因(性能、管理、许可等)不使用该default交易所并创建自己的交易所?例如,我将使用高可用性队列(https://www.rabbitmq.com/ha.html),并且不确定如果所有 HA 队列都在交换机上是否会对集群产生负面default影响而不是不同的交易所?

rabbitmq rabbitmq-exchange

5
推荐指数
1
解决办法
1242
查看次数

如何从 RabbitMQ 获取旧消息?

我使用 Bunny (Ruby) 发布 RabbitMQ 消息,如下所示:

x.publish("Message !"+n.to_s, :routing_key => 'mychannel')
Run Code Online (Sandbox Code Playgroud)

并像这样订阅:

    ch   = conn.create_channel
x    = ch.topic('fling',durable: true)
q    = ch.queue("")
q.bind(x, :routing_key => 'mychannel')


puts "Waiting for messages."
q.subscribe( :block => true) do |delivery_info, properties, body|
puts " [x] Received #{body}, message properties are #{properties.inspect}"
Run Code Online (Sandbox Code Playgroud)

一旦我启动订阅者,它就会立即接收发送的任何消息。但是,如果我在没有启动订阅者的情况下发送消息,则当我启动订阅者时不会收到消息(无论发送者是否仍在推送消息)。

当没有订阅者在监听时,是否可以返回队列并接收过去发送的消息?

rabbitmq bunny rabbitmq-exchange

5
推荐指数
1
解决办法
7506
查看次数

RabbitMQ - ExchangeDeclare 在 .NET 客户端中被拒绝并显示 ACCESS_REFUSED

我对 RabbitMQ 还很陌生,我必须实现一个 .NET 客户端,该客户端需要将消息发布到第三方托管的 RabbitMQ 服务器上的 Exchange 实体。

与服务器的连接似乎运行良好,但是当我想声明我必须使用的 Exchange 通道时,情况变得更糟。

using (var connection = connectionFactory.CreateConnection())
{
    IModel model = connection.CreateModel();
    model.ExchangeDeclare(exchangeName, ExchangeType.Direct);
    // ... the rest of the code is never reached
}
Run Code Online (Sandbox Code Playgroud)

通过这些行,我从服务器收到一条异常消息

The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=403, text="ACCESS_REFUSED - access to exchange ... in vhost ... refused for user ...", classId=40, methodId=10, cause=
 "Connection Closed, error code 403 (ACCESS_REFUSED)"
Run Code Online (Sandbox Code Playgroud)

这是什么意思?这是否意味着为我提供连接到 AMQP 服务器的凭据的第三方尚未授予 Exchange“exchangeName”足够的权限?或者我在客户端做错了什么?我真的需要打电话吗ExchangeDeclare

谢谢您的帮助

.net c# rabbitmq rabbitmq-exchange

5
推荐指数
1
解决办法
2550
查看次数

我可以绑定来自不同虚拟主机的队列吗?

我与来自该交换的 vhost、用户等进行交换,我绑定了不同的队列。交换和队列在同一个虚拟主机中。现在我想为不同的队列创建一个不同的虚拟主机,但我无法从以前的交换中绑定这个新队列,因为它在不同的虚拟主机中。

什么是最好的解决方案?谢谢

rabbitmq vhosts rabbitmq-exchange

5
推荐指数
1
解决办法
3439
查看次数

客户端意外关闭 TCP 连接

我有几个任务作为服务运行。为了启动工作人员,我使用:

def SvcDoRun(self):
    logging.info('Starting {name} service ...'.format(name=self._svc_name_))
    os.chdir(INSTDIR) # so that proj worker can be found
    logging.info('cwd: ' + os.getcwd())

    self.ReportServiceStatus(win32service.SERVICE_RUNNING)

    command = '{celery_path} -A {proj_dir} worker -f "{log_path}" -l info --hostname=theService@%h'.format(
        celery_path='celery',
        proj_dir=PROJECTDIR,
        log_path=os.path.join(INSTDIR,'celery.log'))

    logging.info('command: ' + command)

    args = shlex.split(command)
    proc = subprocess.Popen(args)

    logging.info('pid: {pid}'.format(pid=proc.pid))

    self.timeout = 5000

    while True:
        rc = win32event.WaitForSingleObject(self.hWaitStop, self.timeout)
        if rc == win32event.WAIT_OBJECT_0:
            print 'Terminate'
            # stop signal encountered
            PROCESS_TERMINATE = 1
            handle = win32api.OpenProcess(PROCESS_TERMINATE, False, proc.pid)
            win32api.TerminateProcess(handle, -1)
            win32api.CloseHandle(handle)
            break …
Run Code Online (Sandbox Code Playgroud)

python rabbitmq celery rabbitmq-exchange

5
推荐指数
0
解决办法
1万
查看次数

RabbitMQ 消费者过载

我一直在阅读有关 AMQP 消息传递确认的原理。(https://www.rabbitmq.com/confirms.html)。文章确实很有帮助,写得很好,但有关消费者认知的一件事确实令人困惑,以下是引用:

使用自动确认模式时需要考虑的另一件事是消费者过载

消费者超载?消息队列由代理处理并保存在 RAM 中(如果我理解正确的话)。这是关于什么过载?消费者是否有某种第二队列?该文章的另一部分更令人困惑:

因此,消费者可能会因交付速度而不知所措,可能会在内存中积累积压并耗尽堆或让操作系统终止其进程。

什么积压?这一切是如何协同工作的?消费者完成哪部分工作(当然除了消费消息和处理消息)?我认为代理正在保持队列活动并转发消息,但现在我正在阅读一些神秘的积压和消费者过载。这真的很令人困惑,有人可以解释一下或者至少指出我的好来源吗?

producer-consumer amqp rabbitmq rabbitmq-exchange

5
推荐指数
1
解决办法
2742
查看次数

RabbitMQ 中具有多个路由键的一个交换或多个高效交换

我在项目中使用 RabbitMQ,想知道是否应该使用具有多个路由键的单个交换器还是使用多个交换器?哪个会更有效率?

例如,如果我使用带有路由密钥 A、B、C 的单个交换机 E,并且消费者连接到该交换机 E 并使用 A、B、C 路由密钥获取数据。另一种选择是我应该将其发送到没有路由密钥的交换机 A、交换机 B、交换机 C,并且消费者可以连接到每个交换机以获取所需的数据。

python 库 pika 的示例(我当前正在使用):

channel.basic_publish(exchange='E',
                      routing_key='A',
                      body=data)  
channel.basic_publish(exchange='E',
                      routing_key='B',
                      body=data)  
channel.basic_publish(exchange='E',
                      routing_key='C',
                      body=data)

OR

channel.basic_publish(exchange='A',
                      routing_key='',
                      body=data)  
channel.basic_publish(exchange='B',
                      routing_key='',
                      body=data)  
channel.basic_publish(exchange='C',
                      routing_key='',
                      body=data)
Run Code Online (Sandbox Code Playgroud)

rabbitmq pika rabbitmq-exchange

5
推荐指数
1
解决办法
2670
查看次数

如何在 Django 单元测试中使用rabbitMQ 作为 celery 的代理

我正在编写一个集成测试,其中我使用 - 创建一个rabbitMQ容器

docker run -d --hostname localhost -p 5672:5672 --name rabbit-tox -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest rabbitmq:3
Run Code Online (Sandbox Code Playgroud)

为了测试我是否可以从测试中连接到rabbitMQ,我创建了这个测试并且它可以发送数据 -

def test_rmq(self):
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672))
        channel = connection.channel()

        channel.queue_declare(queue='hello')
        channel.basic_publish(exchange='',
                              routing_key='hello',
                              body='Hello World!')
        print(" [x] Sent 'Hello World!'")
        connection.close()
Run Code Online (Sandbox Code Playgroud)

现在我想使用rabbitMQ容器作为celery后端,这是我正在使用的代码 -

from celery import Celery
broker_url = 'amqp://guest:guest@localhost:5672//'
app = Celery('test', broker=broker_url, backend='amqp')

from celery.contrib.testing.worker import start_worker
from swordfish_app import tasks


# Testing the consumer logic
class ServiceAccountCeleryTestCase(TransactionTestCase):

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.celery_worker = start_worker(app)
        cls.celery_worker.__enter__()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass() …
Run Code Online (Sandbox Code Playgroud)

python django rabbitmq celery rabbitmq-exchange

5
推荐指数
1
解决办法
1940
查看次数