我无法弄清楚使用RabbitMQ中提供的不同交换类型的实际场景.
我正在使用NServiceBus进行一些例子,我偶然发现了一个我希望搭载MassTransit的功能(因为它是免费服务).
该功能基于"中毒"消息.
如果由于系统中的错误而导致这些消息无法处理,并永久停留在错误队列中.
NServiceBus有一个很酷的功能,一旦你纠正了代码中的错误,就可以将错误队列中的那些消息" 重定向 "到原始工作队列,然后重新传递.
这是通过使用NServiceBus特定工具完成的: - ReturnToSourceQueue.exe.
MassTransit是否有针对此类问题的类似工具?
或者是否有另一种可用的解决方法,最好是与RabbitMQ一起使用.
我的服务器正在运行NodeJS并使用amqplib api从另一个应用程序请求数据.NodeJS服务器正在成功接收信息但是有明显的延迟,我正在尝试确定我是否以最有效的方式执行此操作.具体来说,我关心的是打开和关闭连接的方式.
项目布局
我有两个处理接收和请求数据的控制器文件,request.img.server.controller.js和receive.img.server.controller.js.最后,当按下前端的按钮oct.server.routes.js时,路由处理控制器方法.
request.img.server.controller.js
'use strict';
var amqp = require('amqplib/callback_api');
var connReady = false;
var conn, ch;
amqp.connect('amqp://localhost:5672', function(err, connection) {
conn = connection;
connReady = true;
conn.createChannel(function(err, channel) {
ch = channel;
});
});
exports.sendRequest = function(message) {
console.log('sending request');
if(connReady) {
var ex = '';
var key = 'utils';
ch.publish(ex, key, new Buffer(message));
console.log(" [x] Sent %s: '%s'", key, message);
}
};
Run Code Online (Sandbox Code Playgroud)
receive.img.server.controller.js
var amqp = require('amqplib/callback_api');
var fs = require('fs');
var wstream = fs.createWriteStream('C:\\Users\\yako\\desktop\\binarytest.txt');
var image, …Run Code Online (Sandbox Code Playgroud) 我正在尝试将消息传递给交易所,然后传递给特定的队列。
$conn = new AMQPConnection(RABBITMQ_NODE_IP_ADDRESS, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD, RABBITMQ_VHOST);
$queue = RABBITMQ_QUEUE_CSV;
$exchange = RABBITMQ_EXCHANGE;
$ch = $conn->channel();
$ch->exchange_declare($exchange, 'direct', false, true, false);
$ch->queue_declare($queue, false, true, false, false);
$ch->queue_bind($queue, $exchange, $queue);
$msg = new AMQPMessage(json_encode($params), array('content_type' => 'text/plain', 'delivery_mode' => 2));
$ch->basic_publish($msg, $exchange);
$ch->close();
$conn->close();
$status = Array("status" => "Job queued");
Run Code Online (Sandbox Code Playgroud)
交换类型为“直接”,然后添加了一个名为“ foo_bar”的队列,并使用了相同的“ foo_bar”字符串作为“路由键”。
只是阅读文档http://www.rabbitmq.com/tutorials/tutorial-four-php.html我不明白我在哪里做错了:(
当我尝试通过Spring Boot amqp连接到这个服务器时,我明白了 com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.
我的配置就是这个
# Message
spring.activemq.broker-url=tcp://127.0.0.1:5672
spring.activemq.user=test
spring.activemq.password=test
Run Code Online (Sandbox Code Playgroud)
是的,用户测试可以在/上访问虚拟主机,是的,我可以在RabbitMQ GUI上使用test/test登录
编辑
看着rabbitmq日志,我看到了这个
{handshake_error,starting,0,
{amqp_error,access_refused,
"PLAIN login refused: user 'guest' - invalid credentials",
'connection.start_ok'}}
Run Code Online (Sandbox Code Playgroud)
看起来Spring似乎无视我的配置并试图与之联系 guest
我一直在从事一个分布式Web项目,我想在其中使用RabbitMq来利用Spring amqp。我在项目中使用springFramework版本4.1.6。为此,我将以下依赖项添加到了文件pom.xml中。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.5.7</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>1.5.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.5.3.RELEASE</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
另外,我在下面的web.xml中放置了RabbitMQ配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd">
<context:component-scan base-package="org.myTest" />
<rabbit:connection-factory id="connectionFactory"
host="localhost" port="5672" username="guest" password="guest" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="myQueue" />
<rabbit:topic-exchange name="myExchange">
<rabbit:bindings>
<rabbit:binding queue="myQueue" pattern="myQueue"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory" />
<property name="concurrentConsumers" value="3" />
<property name="maxConcurrentConsumers" …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用鼠标连接到我的远程rabbitmq,但我收到Connectionclosed()错误.我已经在guest.user用户的rabbit.config中进行了必要的更改,以允许所有连接以及同样的连接从我的Java代码开始工作.我甚至尝试使用所有权限创建一个新用户并连接它,但它仍然无效.虽然相同的代码在我的localhost上工作正常.任何人都可以让我知道我在这里做错了什么?
def queue_message(message, queue):
credentials = pika.PlainCredentials('xxxx', 'xxxx')
parameters = pika.ConnectionParameters('remote-server',
5672,
'/',
credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue='python_update_queue')
channel.basic_publish(exchange='update.fanout',
body=message)
logger.info("Sent message: {} to queue: {}".format(message, queue))
print 'message sent'
connection.close()
Run Code Online (Sandbox Code Playgroud)
以下是我得到的错误:
app/project/rabbitmq.py" in queue_message
connection = pika.BlockingConnection(parameters)
env/lib/python2.7/site-packages/pika/adapters/blocking_connection.py" in __init__
self._process_io_for_connection_setup()
env/lib/python2.7/site-packages/pika/adapters/blocking_connection.py" in ss_io_for_connection_setup
self._open_error_result.is_ready)
env/lib/python2.7/site-packages/pika/adapters/blocking_connection.py" in _flush_output
raise exceptions.ConnectionClosed
Run Code Online (Sandbox Code Playgroud) 假设我有m条消息在队列中,并且我有n条消息需要发布。如何以随机顺序将n条消息插入队列中,而不是附加到队列末尾?换句话说,如何使用rabbitMQ随机播放?
我有RabbitMQ实例来处理消息.我注意到,由于某些原因,Rabbit停止向消费者发送消息,因为队列不断增长,最终系统中的磁盘空间非常小.
我能够在兔子的日志文件中找到以下信息:
** Reason for termination ==
** {{badmatch,{error,enospc}},
[{rabbit_msg_store,terminate,2,
[{file,"src/rabbit_msg_store.erl"},{line,975}]},
{gen_server2,terminate,3,[{file,"src/gen_server2.erl"},{line,1146}]},
{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,250}]}]}
** In 'terminate' callback with reason ==
** {{badmatch,{error,enospc}},
[{rabbit_msg_store,write_message,3,
[{file,"src/rabbit_msg_store.erl"},{line,1160}]},
{rabbit_msg_store,handle_cast,2,
[{file,"src/rabbit_msg_store.erl"},{line,881}]},
{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1049}]},
{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,250}]}]}
Run Code Online (Sandbox Code Playgroud)
你们有没有人遇到这样的问题?兔子试图告诉我的任何帮助或信息将不胜感激.
谢谢.
rabbitmq ×10
amqp ×3
java ×2
spring-amqp ×2
express ×1
javascript ×1
masstransit ×1
node.js ×1
nservicebus5 ×1
php ×1
php-amqplib ×1
pika ×1
python ×1
spring ×1
spring-boot ×1