标签: rabbitmq

rabbitmq 使用带有 pika 的线程

我正在尝试使用rabbitmq 获得一个基本的队列系统,但是当我尝试使用线程时,它似乎只运行了 1 个线程。

我的代码:

import pika
import threading

rabbit_url = "amqp://user:pass!@127.0.0.1:5672/%2f"

def start(max_threads):
    for i in xrange(max_threads):
        t = threading.Thread(target=run)
        t.start()
        t.join()

def run():
    connection = pika.BlockingConnection(pika.URLParameters(rabbit_url))
    channel = connection.channel()
    channel.basic_consume(callback,
                          queue='docketq',
                          no_ack=True)

    channel.start_consuming()

def callback(ch, method, properties, body):
    do_work(body)

def do_work(body):
    print body
Run Code Online (Sandbox Code Playgroud)

python multithreading rabbitmq pika

2
推荐指数
2
解决办法
4640
查看次数

Spring-amqp RabbitMQ - 在 vhost '/' 错误中没有交换 'myexchange'

我知道之前有人问过这个问题,但我找不到任何合适的解决方案来解决我的问题,因此再次发布。

我正在尝试使用 spring-amqp 编写一个 rabbitmq 生产者,以在直接队列上发布我的消息。由于它是生产者应用程序,因此必须创建我的交换和队列。此外,我没有定义任何侦听器 bean,因为我在生产者端不需要它。下面是我的rabbitmq-config.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.5.xsd
http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd">

<!-- Spring AMQP Admin -->
<rabbit:admin id="amqpAdmin"
    connection-factory="rabbitmqConnectionFactory"
    auto-startup="true" />

<!-- Spring AMQP Template -->
<rabbit:template id="amqpTemplate"
    connection-factory="rabbitmqConnectionFactory"
    routing-key="my.key" exchange="my.exchange" 
    channel-transacted="true"/>

<!-- Connection Factory -->
<rabbit:connection-factory id="rabbitmqConnectionFactory"
    host="${rabbitmq.host:localhost}" 
    port="${rabbitmq.port:5672}" 
    username="${rabbitmq.username}"
    password="${rabbitmq.password}" 
    virtual-host="${rabbitmq.vhost:/}"
    cache-mode="CONNECTION" 
    channel-cache-size="${rabbitmq.channel-cache-size:25}" />

<!-- Queue and Exchange -->
<rabbit:queue id="my.queue" durable="false"
    auto-declare="true" auto-delete="true" />

<rabbit:direct-exchange name="my.exchange" durable="false"
    auto-declare="true" auto-delete="true" declared-by="amqpAdmin">
    <rabbit:bindings>
        <rabbit:binding queue="my.queue" key="my.key"/>
    </rabbit:bindings>
</rabbit:direct-exchange>
</beans>
Run Code Online (Sandbox Code Playgroud)

我只有一个简单的生产者类,它使用上面的 …

java rabbitmq spring-rabbit spring-amqp

2
推荐指数
1
解决办法
2万
查看次数

RabbitMQ - 话题交换和竞争消费者

我成功设置了一个 Topic Exchange,并且能够一次向多个消费者发送消息。

我还想向竞争的消费者传递消息并继续使用主题交换。我读到使用相同的队列名称可以让消费者竞争消息。但是,我可能会弄错,因为我无法让它发挥作用。

为同一主题的多个侦听器设置:

  • 声明主题交换
  • 对于每个侦听器,声明一个具有自动生成名称的新队列
  • 使用给定的主题路由键将此队列绑定到上述交换

如何将竞争消费者设置为同一主题?

主题交换甚至可能吗?

谢谢。

.net c# rabbitmq

2
推荐指数
1
解决办法
3105
查看次数

RabbitMQ 和 php-amqplib:消费者重新连接

我在 PHP 脚本中有一个消费者工作者。

但是有时候RabbitMQ服务器会停止运行,

我收到此错误:

PHP Fatal error:  Uncaught exception 'ErrorException' with message 'fwrite(): send of 19 bytes failed with errno=32 Broken pipe' in /home/user/pusher/rabbitmq-worker/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php:281
Run Code Online (Sandbox Code Playgroud)

我希望能够使用 try/catch 块来处理这个错误,以向我的控制台抛出一个很好的错误,或者在一段时间后尝试重新连接。

这是我到目前为止的代码:

    while (true) {
try {
    $connection = new AMQPStreamConnection(RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USER, RABBITMQ_PASS);
    $channel = $connection->channel();
    $channel->queue_declare(RABBITMQ_DT_QUEUE, false, true, false, false);
    $channel->basic_qos(null, 11, null);
    echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

    $callback = function($req) {
     sleep(3);//Some task that takes 3 sec
     };
    $channel->basic_consume(RABBITMQ_QUEUE, '', false, false, false, false, …
Run Code Online (Sandbox Code Playgroud)

php rabbitmq

2
推荐指数
1
解决办法
2703
查看次数

在 Spring AMQP 中使用消息侦听器适配器获取消息对象

我使用 spring AMQP 创建了一个消息侦听器,我用它来接收订单 POJO。消息类型是 application/json,所以我设置了一个 jackson 消息转换器。到目前为止,一切正常,我能够在我的侦听器中自动重新创建订单 POJO 对象。但是我想扩展这个例子并想检查我的监听器中的一些消息属性。因此,我想使用“org.springframework.amqp.core.Message”作为参数,而不是在我的 handleMessage() 中使用我的 Order POJO。然后我可以稍后转换正文,但通过这种方式,我将在我的侦听器中拥有所有与消息相关的属性,我可以在我的应用程序中使用这些属性。

我尝试将 handleMessage() 与 Message 参数一起使用,但似乎它也尝试使用 jackson 转换器来转换消息正文。我不确定在哪里传递 Order POJO 类 jackson 可以用来转换我的消息正文,但仍然应该能够正确转换消息。

请在下面找到我的代码中的重要片段。请帮助我,因为我认为我在这方面遇到了障碍。

POJO

public class Order {

private int orderid;
private String itemDescription;
Run Code Online (Sandbox Code Playgroud)

模板和转换器设置

@Bean
public RabbitTemplate rubeExchangeTemplate() {
    logger.info("Lets test autowiring " + rabbitConnectionFactory.getHost());
    RabbitTemplate r = new RabbitTemplate(this.rabbitConnectionFactory);
    r.setExchange("rmq-exchange");
    r.setMessageConverter(jsonMessageConverter());
    return r;
}

@Bean
public MessageConverter jsonMessageConverter()
{
    final Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    converter.setClassMapper(classMapper());
    return converter;
}

@Bean
public DefaultClassMapper classMapper()
{ …
Run Code Online (Sandbox Code Playgroud)

java spring amqp rabbitmq spring-amqp

2
推荐指数
1
解决办法
8413
查看次数

更改 RabbitMQ 队列中的参数

我有一个 RabbitMQ 队列,它最初是这样声明的:

var result = _channel.QueueDeclare("NewQueue", true, false, false, null);
Run Code Online (Sandbox Code Playgroud)

我正在尝试添加死信交换,所以我将代码更改为:

_channel.ExchangeDeclare("dl.exchange", "direct");
Dictionary<string, object> args = new Dictionary<string, object>()
{
    { "x-dead-letter-exchange", "dl.exchange" }
};            

var result = _channel.QueueDeclare("NewQueue", true, false, false, args);
Run Code Online (Sandbox Code Playgroud)

当我运行这个时,我收到错误:

抛出异常:RabbitMQ.Client.dll 中的“RabbitMQ.Client.Exceptions.OperationInterruptedException”

附加信息:AMQP 操作被中断:AMQP close-reason,由 Peer 发起,code=406,text="PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'NewQueue' in vhost'/': received 'longstr' 类型的值 'dl.exchange' 但当前为 none", classId=50, methodId=10, cause=

该错误似乎不言自明,如果我删除队列,当我重新创建它时,我不会收到错误消息,但我的问题是:有没有办法在不删除队列的情况下进行此更改?

c# message-queue rabbitmq

2
推荐指数
1
解决办法
6435
查看次数

java - 如何使用Rabbit MQ将相关ID从发送方发送到消息中,并从接收方检索到消息头中

我使用 Rabbit MQ 发送和接收 JSON 消息。
我已经使用 RabbitTemplate 实例和 convertAndSend 方法实现了一个应用程序来向交换发送消息,如下所示: 在此处输入图片说明

rabbitTemplate.convertAndSend(exchangeNameOut, message.getString(PERSISTENCE_MESSAGE_ROUTING_KEY),
                    message.getString(PERSISTENCE_MESSAGE_BODY), new CorrelationData(""+analyticRuntime.getId()));
Run Code Online (Sandbox Code Playgroud)

在检索消息时,侧接收器,我实现了以下方法:

        @RabbitListener(queues = "${rabbit.queue.mail.name}",containerFactory = "rabbitListenerContainerFactory")
     public void processMailMessage(Message message) {
        log.info("ENTER [processMailMessage]  ");

        Mail mail;

            JSONObject messageBody = new JSONObject(new String(message.getBody()));

            String asset = "" + messageBody.get(Constants.PERSISTENCE_MESSAGE_ASSET_ID_KEY);

            String body = "" + messageBody.get(Constants.PERSISTENCE_MESSAGE_EVENT_KEY);

            String alarms = "";
            log.info("[processMailMessage] message.getMessageProperties().getCorrelationId() : " + message.getMessageProperties().getCorrelationId()";
}
Run Code Online (Sandbox Code Playgroud)

问题是:

  • 为什么 message.getMessageProperties().getCorrelationId() 是 NULL ?在发送方法中,我插入了相关性。
  • Correlation Id 与我插入到 convertAndSend 方法中的 Correlation Data 相同吗?
  • 如何将相关 ID 检索到接收器方法中?

谢谢

java rabbitmq spring-rabbit

2
推荐指数
1
解决办法
3024
查看次数

Windows 10 64 位上的 RabbitMQ 3.6.6 - 未检测到 Erlang

我正在运行 64 位 Windows 10,我安装了 Erlang 64 位 R16B03,设置了 ERLANG_HOME 系统环境变量(不是用户环境变量)并验证它可以从命令外壳正常工作。当我运行 RabbitMQ3.6.6.exe 进行设置时,出现错误“无法检测到 Erlang”。RabbitMQ 3.6.6 需要什么版本的 Erlang ??Erlang 必须安装在 C:\Program Files 中吗??我将它安装在 C:\Erlang 文件夹中。

我还尝试将 %ERLANG_HOME%\bin 添加到 Path ,但没有任何区别。我已经安装了 VC++ 可再发行 exe(2005、2008、2013、2015)。我尝试了 Erlang 的 OTP 19.2 (erl8.2) 版本,结果相同。有任何想法吗?

我试图避免从 RabbitMQ zip 文件手动安装,因为我是 RabbitMQ 的新手,只是想启动并运行它以便我可以学习它。

谢谢

windows erlang rabbitmq

2
推荐指数
1
解决办法
4036
查看次数

如何使用rabbitmq docker从spring-boot在rabbitmq中创建队列

我花了一整天的时间来尝试启动 spring-amqp 示例项目并针对 docker 版本的 rabbitmq 运行。我只是在运行标准的rabbitmq docker。虽然连接没有问题,但我总是遇到与创建队列相关的异常,此时我已经尝试了所有可能的变体。

我试图像示例项目一样在我的配置中声明队列。我试过显式配置 RabbitAdmin。我试过显式配置整个自动配置的混乱。我在 rabbitmq 中创建了一个新用户并明确分配了权限。我试过只使用队列名称而不实际声明队列 bean。我尝试将队列和 RabbitAdmin 注入其他 bean,只是为了强制创建它们。

无论我尝试什么,我都会收到以下错误:

2017-04-24 17:42:19.709  WARN 37360 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue:"incoming"
2017-04-24 17:42:19.715  WARN 37360 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Queue declaration failed; retries left=3

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):["incoming"]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:621) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:520) ~[spring-rabbit-1.7.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1382) [spring-rabbit-1.7.1.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
Caused by: java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:123) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:992) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:50) ~[amqp-client-4.0.2.jar:4.0.2]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native …
Run Code Online (Sandbox Code Playgroud)

rabbitmq spring-rabbit spring-amqp docker spring-boot

2
推荐指数
1
解决办法
6958
查看次数

RabbitMQ - 访问通过 AMQP 上的 STOMP 创建的 /queue 或 /topic

我有一个场景,客户端通过 STOMP 将消息发布到 RabbitMQ/queue/topic目标。

另一方面,我有一个通过 AMQP 连接到 RabbitMQ 的服务器进程。是否可以从 AMQP 访问“STOMP 通道”,或者它们是否被隔离到自己的命名空间中?

看起来 AMQP 通道可以通过 STOMP 访问,/amq/queue/但我没有找到另一个方向的案例。

stomp amqp rabbitmq

2
推荐指数
1
解决办法
1324
查看次数