当我们将消息发送到RabbitMQ时,如果队列不存在,消息将丢失而不会引发任何错误。
邮件将发布到哪里?死队列?
我在Spring AMQP上使用RabbitMQ
使用该消息将花费> = 200 * 8秒,这超过了我的心跳间隔。从/sf/answers/2965457981/
如果节点之间的消息传输时间(60秒?)>节点之间的心跳时间,将导致群集断开连接并松动消息
更新:
发布自己的答案后,我还收到了另一个答案和评论。感谢您的反馈。只是为了澄清,我不使用AMQP进行文件传输。实际上,数据是在JSON消息中,一些简单而又小,但有些包含复杂的信息,包括一些手绘图。除了在Data Center中保存数据外,我们还通过AMQP在分支级别保存消息副本,以防无法连接到Data Center的情况。
我正在尝试从localhost:5000上托管的python服务器向RabbitMQ服务器发送消息(使用RabbitMQ的docker映像),但是出现以下错误:
socket.gaierror gaierror:[Errno -2]名称或服务未知
我正在使用命令'Rabbithost'是我正在使用的主机名运行RabbitMQ的docker镜像:
sudo docker run -d --hostname Rabbithost --name Rabbitmq -p 15672:15672 -p 5672:5672 -p 5671:5671 rabbitmq:3-management
这是给出错误的python代码:
def send_to_queue(message):
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters('rabbithost', 5672, '/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello',body=message)
connection.close()
return "Message Sent! "
Run Code Online (Sandbox Code Playgroud)
错误在第:
连接= pika.BlockingConnection(参数)
主要是因为有参数实参。我无法找到此错误的确切解决方案。
尝试启动我的Rabbitmq服务器,但出现此错误。我已按照所有说明进行安装。该rabbitmq-server
文件位于其中/usr/local/sbin
。所以我将目录更改为sbin
,并rabbitmq-server
在终端中写入内容,但显示错误-bash: rabbitmq-server: command not found
。我已经按照说明export PATH=/usr/local/sbin:$PATH
在我的底部添加了以下行:bash_profile
知道为什么它不起作用吗?
在运行时构造它的最简单方法是什么?
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "providedAtRuntime", durable = "true"),
exchange = @Exchange(value = "providedAtRuntime", ignoreDeclarationExceptions = "true"),
key = "providedAtRuntime"), containerFactory = "cFac")
public class RabbitProcessor {
@RabbitHandler
public void receive (String smth){
System.out.println(smth);
}
}
Run Code Online (Sandbox Code Playgroud)
我想定义监听器,但在运行时提供交换,队列名称和绑定.此侦听器也不应自动启动,而应由start()方法调用.同时它应该自动声明绑定和队列等.当调用stop()时,它应该停止消耗.
根据https://www.rabbitmq.com/which-erlang.html,我有正确版本的erlang .
但是当我尝试在我的CentOS7上安装rabbitmq-server-3.7.1时,它说我的erlang版本错了,我需要版本> = 19.3.
但我已经20岁了.
我究竟做错了什么?
test_user@test_machine: ~/Downloads
$ sudo yum install rabbitmq-server-3.7.1-1.el7.noarch.rpm
Loaded plugins: fastestmirror, langpacks, versionlock
Examining rabbitmq-server-3.7.1-1.el7.noarch.rpm: rabbitmq-server-3.7.1-1.el7.noarch
Marking rabbitmq-server-3.7.1-1.el7.noarch.rpm to be installed
Resolving Dependencies
--> Running transaction check
---> Package rabbitmq-server.noarch 0:3.7.1-1.el7 will be installed
--> Processing Dependency: erlang >= 19.3 for package: rabbitmq-server-3.7.1-1.el7.noarch
Loading mirror speeds from cached hostfile
* base: mirror.awanti.com
* epel: mirror.awanti.com
* extras: mirror.awanti.com
* updates: mirror.awanti.com
--> Processing Dependency: socat for package: rabbitmq-server-3.7.1-1.el7.noarch
--> Running transaction check …
Run Code Online (Sandbox Code Playgroud) 我想为连接到RabbitMQ的某些组件设置一些本地测试。为此,一种解决方案似乎是QPID内存代理,并且在遵循这些说明时(仅使用当前版本的7.0.3而不是7.0.0)实际上非常有效。
对于Rabbit MQ,我删除了"AMQP_1_0"
协议,仅添加了qpid- broker-plugins-amqp-0-8-protocol
dependency。我也用替换Authenticationprovider
了PLAIN
。这足以使我的RabbitMQ
组件正常工作并通过该组件发送/接收消息Broker
。
但是问题是这条线...
"port" : "${qpid.amqp_port}",
Run Code Online (Sandbox Code Playgroud)
据我了解,这应该允许我通过在调用时设置此属性来定义代理正在侦听的端口systemLauncher.startup
。
attributes.put("qpid.amqp_port", 12345);
Run Code Online (Sandbox Code Playgroud)
不幸的是,这不起作用,并且代理总是在默认端口(5672,iirc)上侦听。显然,这对于自动化测试不是最佳选择,因此我正在寻找以下一种可能性(或更好的一种):
SystemLauncher
似乎没有提供任何查询方式。我已经找到了在rabittMq中发送字符串并从队列接收的示例,但是我不清楚这些方法-assertQueue,sendToQueue
send.js
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
var q = 'hello';
var msg = 'Hello World! - '+i;
ch.assertQueue(q, {durable: false});
ch.sendToQueue(q, new Buffer(msg));
console.log(" [x] Sent %s", msg);
});
setTimeout(function() { conn.close(); process.exit(0) }, 1000);
});
Run Code Online (Sandbox Code Playgroud)
receive.js
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(err, conn) { //amqp://localhost
conn.createChannel(function(err, ch) {
var q = 'hello';
ch.assertQueue(q, {durable: false});
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
ch.consume(q, function(msg) …
Run Code Online (Sandbox Code Playgroud) 我有一些微服务,通过API网关公开。网关负责处理身份验证和路由到系统中。网关后面的服务主要是简单的CRUD服务。每个服务都公开自己的API,并通过HTTP同步通信。所有这些服务,包括API-Gateway,都是“默认” NestJS应用程序。
让我们继续坚持猫的例子。每当Cat-Service
更新或创建新的时Cat
,我都希望CatCreatedEvent
或被CatUpdatedEvent
模仿。该事件应被推送到某些消息代理中,例如RabbitMQ,而另一个服务应侦听此事件并异步处理该事件。
我不知道如何以正确的方式“注入” RabbitMQ来实现这一目标,我想知道这种方法是否合乎常理。我已经看到了NestJS的CQRS模块,但是我认为CQRS对于该域来说有点过多。尤其是因为在此域中没有好处,无法拆分读写模型。也许我完全走错了道路,所以希望您能给我一些建议。
我想共享BlockingChannel
多个python进程。为了basic_ack
从其他python进程发送
。
如何BlockingChannel
在多个python进程之间共享。
以下是代码:
self.__connection__ = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.__channel__ = self.__connection__.channel()
Run Code Online (Sandbox Code Playgroud)
我尝试转储使用,pickle
但它确实允许转储频道并can't pickle select.epoll objects
使用以下代码给出错误
filepath = "temp/" + "merger_channel.sav"
pickle.dump(self.__channel__, open(filepath, 'wb'))
Run Code Online (Sandbox Code Playgroud)
目标:
目标是basic_ack
从其他python进程的通道发送。