标签: rabbitmq

spring amqp rabbitmq MessageListener无效

我正在尝试使用spring amqp使用rabbitmq,下面是我的配置.

<rabbit:connection-factory id="rabbitConnectionFactory"
    port="${rabbitmq.port}" host="${rabbitmq.host}" />

<rabbit:admin connection-factory="rabbitConnectionFactory" />

<rabbit:queue name="${rabbitmq.import.queue}" />

<rabbit:template id="importAmqpTemplate"
    connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}" />

<beans:bean id="importExchangeMessageListener"
    class="com.stockopedia.batch.foundation.ImportMessageListener" />

<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="5">
    <rabbit:listener queues="${rabbitmq.import.queue}" ref="importMessageListener" />
</rabbit:listener-container>
Run Code Online (Sandbox Code Playgroud)

这是一个简单的Message Listener类,

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class ImportMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        System.out.println("consumer output: " + message);
    }

}
Run Code Online (Sandbox Code Playgroud)

这是生产者(春季批次的itemWriter),

public class ImportItemWriter<T> implements ItemWriter<T> {

    private AmqpTemplate template;

    public AmqpTemplate getTemplate() {
        return template;
    }

    public void setTemplate(AmqpTemplate template) {
        this.template = template; …
Run Code Online (Sandbox Code Playgroud)

java spring rabbitmq spring-batch spring-amqp

1
推荐指数
1
解决办法
8155
查看次数

"RABBITMQ_NODE_PORT"未被识别为内部或外部命令

我试图在一台机器(笔记本电脑)上为RabbitMQ配置一个集群环境.我正在关注RabbitMQ网站上的这个指南.我的机器使用的是Windows 7RabbitMQ服务器3.3.0.当我尝试运行以下命令时:

RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit rabbitmq-server -detached
Run Code Online (Sandbox Code Playgroud)

命令提示符发生以下错误:

"RABBITMQ_NODE_PORT" is not recognized as an internal or external command, operable program or batch file
Run Code Online (Sandbox Code Playgroud)

这个命令的作用是基本上为服务器设置变量RABBITMQ_NODE_PORTRABBITMQ_NODENAME作为名为rabbit且在端口5672上的单个节点运行.同样,我会尝试启动名为hare和端口5673的另一个节点.我知道这是一个系统路径问题,但我仍然无法解决它.有人可以帮帮我!!

cluster-computing command-prompt rabbitmq nodes windows-7

1
推荐指数
1
解决办法
1582
查看次数

Rabbitmq确认监听器不起作用

我正在尝试使用ConfirmListener但执行永远不会到达其方法.我是这样做的:

    channel.addConfirmListener(new ConfirmListener() {

        public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            System.out.println("Not ack received");
        }

        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            System.out.println("Ack received");
        }
    });

    channel.exchangeDeclare(directExchangeName, directExchangeType, DURABLE, AUTO_DELETE, arguments);

    channel.queueBind(directQueueName, directExchangeName, routingKey);

    // AUTO_ACK = false
    channel.basicConsume(directQueueName, AUTO_ACK, routingKey, directConsumer);
Run Code Online (Sandbox Code Playgroud)

然后我用这个发表:

        channel.basicPublish(directExchangeName, routingKey, MANDATORY, properties, message.getBytes());
Run Code Online (Sandbox Code Playgroud)

后来我用这个消费了:

            @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

            // Send ack
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
Run Code Online (Sandbox Code Playgroud)

我做错了什么?

谢谢,欢呼.

java rabbitmq

1
推荐指数
1
解决办法
1562
查看次数

Python RabbitMQ - 消费者只看到每一条消息

我正在使用Pika 0.98测试RabbitMQ的生产者消费者示例.我的生产者在我的本地PC上运行,消费者在亚马逊的EC2实例上运行.

我的生产者坐在循环中,每秒发送一些系统属性.问题是我只看到消费者阅读每一条第二条消息,就好像每条第二条消息都没有被阅读.例如,我的生产者打印出这个(时间戳,使用的cpu pct,使用的RAM):


    2014-08-16 14:36:17.576000 -0700,16.0,8050806784
    2014-08-16 14:36:18.578000 -0700,15.5,8064458752
    2014-08-16 14:36:19.579000 -0700,15.0,8075313152
    2014-08-16 14:36:20.580000 -0700,12.1,8074121216
    2014-08-16 14:36:21.581000 -0700,16.0,8077778944
    2014-08-16 14:36:22.582000 -0700,14.2,8075038720

但我的消费者正在打印出这个:


    Received '2014-08-16 14:36:17.576000 -0700,16.0,8050806784'
    Received '2014-08-16 14:36:19.579000 -0700,15.0,8075313152'
    Received '2014-08-16 14:36:21.581000 -0700,16.0,8077778944'

生产者的代码是:



    import pika
    import psutil
    import time
    import datetime
    from dateutil.tz import tzlocal
    import logging
    logging.getLogger('pika').setLevel(logging.DEBUG)

    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='54.191.161.213'))
    channel = connection.channel()

    channel.queue_declare(queue='ems.data')

    while True:
        now = datetime.datetime.now(tzlocal())
        timestamp = now.strftime('%Y-%m-%d %H:%M:%S.%f %z')
        msg="%s,%.1f,%d" % (timestamp, psutil.cpu_percent(),psutil.virtual_memory().used)
        channel.basic_publish(exchange='', 
                          routing_key='ems.data',
                          body=msg)
        print msg
        time.sleep(1)
    connection.close() …
Run Code Online (Sandbox Code Playgroud)

python amqp rabbitmq pika

1
推荐指数
1
解决办法
886
查看次数

消耗消息时内存泄漏 - RabbitMQ C库

我有一个简单的C程序消耗来自RabbitMQ的消息并销毁它.它消耗了500万条消息.

#include <stdio.h>
#include <stdlib.h>
#include <amqp.h>

int main() {

    /* connect to broker */
    amqp_connection_state_t rabbit_conn = amqp_new_connection();
    if (rabbit_conn == NULL) {
        printf("cannot create AMQP connection\n");
        return EXIT_FAILURE;
    }
    amqp_socket_t *rabbit_socket = amqp_tcp_socket_new(rabbit_conn);
    if (rabbit_socket == NULL) {
        printf("cannot create AMQP socket\n");
        return EXIT_FAILURE;
    }

    int rc = amqp_socket_open(rabbit_socket, "localhost", 5672);
    if (rc != AMQP_STATUS_OK) {
        printf("cannot open socket to rabbitmq. reason: %s\n", amqp_error_string2(rc));
        return EXIT_FAILURE;
    }

    amqp_rpc_reply_t r = amqp_login(rabbit_conn, "/", AMQP_DEFAULT_MAX_CHANNELS, AMQP_DEFAULT_FRAME_SIZE, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"); …
Run Code Online (Sandbox Code Playgroud)

c valgrind memory-leaks rabbitmq

1
推荐指数
1
解决办法
894
查看次数

在C#中,如何处理当前队列中的所有RabbitMQ消息?

基本的RabbitMQ教程给出了如何从队列中连续检索消息的示例:

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare("hello", false, false, false, null);

        var consumer = new QueueingBasicConsumer(channel);
        channel.BasicConsume("hello", true, consumer);

        Console.WriteLine(" [*] Waiting for messages." +
                                 "To exit press CTRL+C");
        while (true)
        {
            var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body);
            Console.WriteLine(" [x] Received {0}", message);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

我想要做的是检索已放入队列然后停止的所有消息.

以下两个例子可以解决我的问题

  1. 如果我在下午1点启动我的代码,我想处理下午1点之前放在队列中的所有消息.

要么

  1. 如果我在13:00:00启动我的代码,并且我的代码运行需要10秒,我不介意它是否包括在13:00:00和13:00:10之间放置在队列中的消息,如一旦队列为空,它就会停止.

我意识到我可以在我的消息中加上时间戳并检查它,或者我可以设置超时值,但我想知道是否有任何内置的方法来正确地执行此操作.

提前致谢.

.net c# rabbitmq

1
推荐指数
1
解决办法
6369
查看次数

在运行Rabbitmq的Docker容器上远程创建虚拟主机

我有一个Vagrantfile,它做2件重要的事情;首先拉并运行dockerfile / rabbitmq,然后从运行应用程序的自定义Dockerfile构建,该应用程序在Rabbitmq服务器上假设一个虚拟主机,比方说“ / foo”。

问题是虚拟主机不存在。

包含rabbitmq的容器正在成功运行,并且在运行生成的映像时,使用--link将应用程序链接到该应用。使用环境变量泊坞窗集,我可以访问服务器。但是在这些操作的中间,我需要创建虚拟主机,因为我的连接被拒绝,我认为是因为“ / foo”不存在。

如何将虚拟主机安装到Rabbit服务器上?

谢谢

注意-不能使用webadmin,这必须以编程方式完成。

rabbitmq docker vagrantfile

1
推荐指数
1
解决办法
2653
查看次数

Django + RabbitMQ + Celery都在不同的机器上(服务器)

我设法DjangoRabbitMQCelery单机上工作.我已按照此处的说明操作.现在我想让它们一起工作,但是当它们在不同的服务器上时.我不想Django知道任何事情Celery,也不Celery了解Django.

所以,基本上我只是想Django将一些消息发送到RabbitMQ队列(可能是id,任务类型,也许是其他一些信息),然后我想RabbitMQ将该消息(当可能时)发布到Celery另一台服务器上.Celery/Django不应该彼此了解,基本上我想要的架构很容易替换其中任何一个.

现在,我有Django几次打电话

create_project.apply_async(args, countdown=10)

我想用类似的调用直接替换为RabbitMQ(正如我所说的Django不应该依赖Celery).然后,RabbitMQ应该通知Celery(当可能时)Celery并将完成其工作(可能Django通过REST接口进行交互).

此外,我需要Celery在两个或更多服务器上安装工作人员,并且我只想RabbitMQ根据消息中的某个字段通知其中一个服务器.如果这很复杂,我可以检查每个任务(在不同的机器上),例如:这是你应该做的事情(比如检查消息中的ip地址字段),如果它不仅仅是停止执行任务.

我怎样才能做到这一点?如果可能的话,我更喜欢代码+配置示例而不仅仅是理论解释.

编辑:

我认为对于我的用例芹菜是总开销.使用自定义客户端的简单RabbitMQ路由将完成这项工作.我已经尝试过简单的用例(一个服务器),它运行得很好.应该很容易使通信多服务器准备就绪.我不喜欢芹菜.这是"神奇的",隐藏了太多的细节,并不容易配置.但我会留下这个问题,因为我对别人的意见很感兴趣.

architecture django rabbitmq celery multiserver

1
推荐指数
1
解决办法
1528
查看次数

使用Java停止队列监听RabbitMQ

嗨,我在我的Java应用程序中使用RabbitMQ。当我停止应用程序时,我需要确保在停止应用程序之前停止队列侦听器(停止从队列接收消息)。我不确定是否需要调用channel.close()channel.basicCancel(“ tag”)。编写了以下代码来停止队列侦听。

if(myContext.myChannel.isOpen()){ 

  //myChannel is the one I am using to listen to queue 
  myContext.myChannel.basicCancel("OP"); 

  //myContext.myChannel.close(); 
}else{ 
   return ok("Channel is not open"); 
}
Run Code Online (Sandbox Code Playgroud)

java message-queue rabbitmq

1
推荐指数
1
解决办法
3668
查看次数

在rabbitmq中同时连接到频道是否有任何限制?

我正在使用rabbitmq服务器,我有很多用户和很多通道连接发生并行.,在rabbitmq中同时连接频道是否有任何限制?

amqp rabbitmq

1
推荐指数
1
解决办法
206
查看次数