我正在尝试使用spring amqp使用rabbitmq,下面是我的配置.
<rabbit:connection-factory id="rabbitConnectionFactory"
port="${rabbitmq.port}" host="${rabbitmq.host}" />
<rabbit:admin connection-factory="rabbitConnectionFactory" />
<rabbit:queue name="${rabbitmq.import.queue}" />
<rabbit:template id="importAmqpTemplate"
connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}" />
<beans:bean id="importExchangeMessageListener"
class="com.stockopedia.batch.foundation.ImportMessageListener" />
<rabbit:listener-container
connection-factory="rabbitConnectionFactory" concurrency="5">
<rabbit:listener queues="${rabbitmq.import.queue}" ref="importMessageListener" />
</rabbit:listener-container>
Run Code Online (Sandbox Code Playgroud)
这是一个简单的Message Listener类,
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class ImportMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println("consumer output: " + message);
}
}
Run Code Online (Sandbox Code Playgroud)
这是生产者(春季批次的itemWriter),
public class ImportItemWriter<T> implements ItemWriter<T> {
private AmqpTemplate template;
public AmqpTemplate getTemplate() {
return template;
}
public void setTemplate(AmqpTemplate template) {
this.template = template; …Run Code Online (Sandbox Code Playgroud) 我试图在一台机器(笔记本电脑)上为RabbitMQ配置一个集群环境.我正在关注RabbitMQ网站上的这个指南.我的机器使用的是Windows 7和RabbitMQ服务器3.3.0.当我尝试运行以下命令时:
RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit rabbitmq-server -detached
Run Code Online (Sandbox Code Playgroud)
命令提示符发生以下错误:
"RABBITMQ_NODE_PORT" is not recognized as an internal or external command, operable program or batch file
Run Code Online (Sandbox Code Playgroud)
这个命令的作用是基本上为服务器设置变量RABBITMQ_NODE_PORT和RABBITMQ_NODENAME作为名为rabbit且在端口5672上的单个节点运行.同样,我会尝试启动名为hare和端口5673的另一个节点.我知道这是一个系统路径问题,但我仍然无法解决它.有人可以帮帮我!!
我正在尝试使用ConfirmListener但执行永远不会到达其方法.我是这样做的:
channel.addConfirmListener(new ConfirmListener() {
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Not ack received");
}
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Ack received");
}
});
channel.exchangeDeclare(directExchangeName, directExchangeType, DURABLE, AUTO_DELETE, arguments);
channel.queueBind(directQueueName, directExchangeName, routingKey);
// AUTO_ACK = false
channel.basicConsume(directQueueName, AUTO_ACK, routingKey, directConsumer);
Run Code Online (Sandbox Code Playgroud)
然后我用这个发表:
channel.basicPublish(directExchangeName, routingKey, MANDATORY, properties, message.getBytes());
Run Code Online (Sandbox Code Playgroud)
后来我用这个消费了:
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// Send ack
channel.basicAck(envelope.getDeliveryTag(), false);
}
Run Code Online (Sandbox Code Playgroud)
我做错了什么?
谢谢,欢呼.
我正在使用Pika 0.98测试RabbitMQ的生产者消费者示例.我的生产者在我的本地PC上运行,消费者在亚马逊的EC2实例上运行.
我的生产者坐在循环中,每秒发送一些系统属性.问题是我只看到消费者阅读每一条第二条消息,就好像每条第二条消息都没有被阅读.例如,我的生产者打印出这个(时间戳,使用的cpu pct,使用的RAM):
2014-08-16 14:36:17.576000 -0700,16.0,8050806784
2014-08-16 14:36:18.578000 -0700,15.5,8064458752
2014-08-16 14:36:19.579000 -0700,15.0,8075313152
2014-08-16 14:36:20.580000 -0700,12.1,8074121216
2014-08-16 14:36:21.581000 -0700,16.0,8077778944
2014-08-16 14:36:22.582000 -0700,14.2,8075038720
但我的消费者正在打印出这个:
Received '2014-08-16 14:36:17.576000 -0700,16.0,8050806784'
Received '2014-08-16 14:36:19.579000 -0700,15.0,8075313152'
Received '2014-08-16 14:36:21.581000 -0700,16.0,8077778944'
生产者的代码是:
import pika
import psutil
import time
import datetime
from dateutil.tz import tzlocal
import logging
logging.getLogger('pika').setLevel(logging.DEBUG)
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='54.191.161.213'))
channel = connection.channel()
channel.queue_declare(queue='ems.data')
while True:
now = datetime.datetime.now(tzlocal())
timestamp = now.strftime('%Y-%m-%d %H:%M:%S.%f %z')
msg="%s,%.1f,%d" % (timestamp, psutil.cpu_percent(),psutil.virtual_memory().used)
channel.basic_publish(exchange='',
routing_key='ems.data',
body=msg)
print msg
time.sleep(1)
connection.close() …Run Code Online (Sandbox Code Playgroud) 我有一个简单的C程序消耗来自RabbitMQ的消息并销毁它.它消耗了500万条消息.
#include <stdio.h>
#include <stdlib.h>
#include <amqp.h>
int main() {
/* connect to broker */
amqp_connection_state_t rabbit_conn = amqp_new_connection();
if (rabbit_conn == NULL) {
printf("cannot create AMQP connection\n");
return EXIT_FAILURE;
}
amqp_socket_t *rabbit_socket = amqp_tcp_socket_new(rabbit_conn);
if (rabbit_socket == NULL) {
printf("cannot create AMQP socket\n");
return EXIT_FAILURE;
}
int rc = amqp_socket_open(rabbit_socket, "localhost", 5672);
if (rc != AMQP_STATUS_OK) {
printf("cannot open socket to rabbitmq. reason: %s\n", amqp_error_string2(rc));
return EXIT_FAILURE;
}
amqp_rpc_reply_t r = amqp_login(rabbit_conn, "/", AMQP_DEFAULT_MAX_CHANNELS, AMQP_DEFAULT_FRAME_SIZE, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"); …Run Code Online (Sandbox Code Playgroud) 基本的RabbitMQ教程给出了如何从队列中连续检索消息的示例:
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare("hello", false, false, false, null);
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume("hello", true, consumer);
Console.WriteLine(" [*] Waiting for messages." +
"To exit press CTRL+C");
while (true)
{
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
}
}
}
Run Code Online (Sandbox Code Playgroud)
我想要做的是检索已放入队列然后停止的所有消息.
以下两个例子可以解决我的问题
要么
我意识到我可以在我的消息中加上时间戳并检查它,或者我可以设置超时值,但我想知道是否有任何内置的方法来正确地执行此操作.
提前致谢.
我有一个Vagrantfile,它做2件重要的事情;首先拉并运行dockerfile / rabbitmq,然后从运行应用程序的自定义Dockerfile构建,该应用程序在Rabbitmq服务器上假设一个虚拟主机,比方说“ / foo”。
问题是虚拟主机不存在。
包含rabbitmq的容器正在成功运行,并且在运行生成的映像时,使用--link将应用程序链接到该应用。使用环境变量泊坞窗集,我可以访问服务器。但是在这些操作的中间,我需要创建虚拟主机,因为我的连接被拒绝,我认为是因为“ / foo”不存在。
如何将虚拟主机安装到Rabbit服务器上?
谢谢
注意-不能使用webadmin,这必须以编程方式完成。
我设法Django和RabbitMQ和Celery单机上工作.我已按照此处的说明操作.现在我想让它们一起工作,但是当它们在不同的服务器上时.我不想Django知道任何事情Celery,也不Celery了解Django.
所以,基本上我只是想Django将一些消息发送到RabbitMQ队列(可能是id,任务类型,也许是其他一些信息),然后我想RabbitMQ将该消息(当可能时)发布到Celery另一台服务器上.Celery/Django不应该彼此了解,基本上我想要的架构很容易替换其中任何一个.
现在,我有Django几次打电话
create_project.apply_async(args, countdown=10)
我想用类似的调用直接替换为RabbitMQ(正如我所说的Django不应该依赖Celery).然后,RabbitMQ应该通知Celery(当可能时)Celery并将完成其工作(可能Django通过REST接口进行交互).
此外,我需要Celery在两个或更多服务器上安装工作人员,并且我只想RabbitMQ根据消息中的某个字段通知其中一个服务器.如果这很复杂,我可以检查每个任务(在不同的机器上),例如:这是你应该做的事情(比如检查消息中的ip地址字段),如果它不仅仅是停止执行任务.
我怎样才能做到这一点?如果可能的话,我更喜欢代码+配置示例而不仅仅是理论解释.
编辑:
我认为对于我的用例芹菜是总开销.使用自定义客户端的简单RabbitMQ路由将完成这项工作.我已经尝试过简单的用例(一个服务器),它运行得很好.应该很容易使通信多服务器准备就绪.我不喜欢芹菜.这是"神奇的",隐藏了太多的细节,并不容易配置.但我会留下这个问题,因为我对别人的意见很感兴趣.
嗨,我在我的Java应用程序中使用RabbitMQ。当我停止应用程序时,我需要确保在停止应用程序之前停止队列侦听器(停止从队列接收消息)。我不确定是否需要调用channel.close() 或channel.basicCancel(“ tag”)。编写了以下代码来停止队列侦听。
if(myContext.myChannel.isOpen()){
//myChannel is the one I am using to listen to queue
myContext.myChannel.basicCancel("OP");
//myContext.myChannel.close();
}else{
return ok("Channel is not open");
}
Run Code Online (Sandbox Code Playgroud) 我正在使用rabbitmq服务器,我有很多用户和很多通道连接发生并行.,在rabbitmq中同时连接频道是否有任何限制?
rabbitmq ×10
java ×3
amqp ×2
.net ×1
architecture ×1
c ×1
c# ×1
celery ×1
django ×1
docker ×1
memory-leaks ×1
multiserver ×1
nodes ×1
pika ×1
python ×1
spring ×1
spring-amqp ×1
spring-batch ×1
vagrantfile ×1
valgrind ×1
windows-7 ×1