我们有一个应用程序将使用RabbitMQ并有几个不同的队列在层之间传递消息.
最初,我计划使用多个直接交换,每个消息类型一个,但看起来使用不同的路由键绑定与队列进行单个主题交换将实现相同的目的.
进行单次交换似乎也更容易维护,但我想知道是否有任何好处(如果有的话)以另一种方式做到这一点?
选项1,使用多个直接交换:
ExchangeA (type: direct)
-QueueA
ExchangeB (type: direct)
-QueueB
ExchangeC (type: direct)
-QueueC
Run Code Online (Sandbox Code Playgroud)
选项2,使用单一主题交换:
Exchange (type: topic)
-QueueA (receives messages from exchange with routing key of "TypeA")
-QueueB (receives messages from exchange with routing key of "TypeB")
-QueueC (receives messages from exchange with routing key of "TypeC")
Run Code Online (Sandbox Code Playgroud) 我正在使用rabbitmq来处理我工作的企业应用程序的数据库之间的消息.作为流程的一部分,我正在尝试帮助自动化服务器的设置(例如脚本).在此过程中,我尝试使用rabbitmqadmin
从命令提示符声明交换.我有2个不同的服务器在运行CentOS 5.x
,CentOS 6.x
并且两个都有相同的问题.
到目前为止,我宣布了一个管理员用户并设置了密码,然后我将其标记设置为管理员标记,然后我确保它具有对vhost的权限.之后,我尝试声明交换,指定用户和密码,它失败.
rabbitmqctl add_user administrator password
rabbitmqctl set_user_tags administrator administrator
rabbitmqctl set_permissions -p / administrator ".*" ".*" ".*"
rabbitmqctl add_vhost vhostFoo
rabbitmqctl set_permissions -p vhostFoo administrator ".*" ".*" ".*"
rabbitmqadmin -u administrator -p password declare exchange --vhost=vhostFoo name=exchangeNew type=direct
Run Code Online (Sandbox Code Playgroud)
对于CentOS 5.x盒子,我必须使用python26 rabbitmqadmin
而不仅仅是rabbitmqadmin
,但得到相同的结果.
最后一个命令导致:
*** Access refused: /api/exchanges/vhostFoo/exchangeNew
Run Code Online (Sandbox Code Playgroud)
我的CentOS 5.x和Cent 6.x盒子都在运行rabbitmq 3.3.5.关于我缺少什么的想法或者我如何获得有关错误的更多信息的想法(例如错误日志文件或获得更详细输出的方式)?
我想知道为什么我们需要routing key
将消息路由exchange
到队列.我们不能使用简单队列名来路由消息.此外,在发布到多个队列的情况下,我们可以使用多个队列名称.任何人都可以指出我们实际需要路由密钥的场景,队列名称是不够的.
从春季启动教程:https: //spring.io/guides/gs/messaging-rabbitmq/
他们给出了仅创建1个队列和1个队列的示例,但是,如果我希望能够创建多于1个队列,该怎么办?怎么可能?
显然,我不能只创建两次相同的bean:
@Bean
Queue queue() {
return new Queue(queueNameAAA, false);
}
@Bean
Queue queue() {
return new Queue(queueNameBBB, false);
}
Run Code Online (Sandbox Code Playgroud)
你不能两次创建相同的bean,它会使模糊不清.
rabbitmq spring-amqp spring-boot rabbitmq-exchange spring-rabbitmq
我已经关注 MassTransit 几个星期了,我对各种可能性很好奇。但是,我似乎无法完全正确地理解这些概念。
预期行为 我想将消息发布到“直接”交换与路由键绑定到两个不同的队列以执行其他活动。
当我使用 MassTransit 尝试相同的逻辑以获得更好的可扩展性时。我发现 MassTransit 根据带有扇出类型的队列名称创建自己的交换。
通过交换和路由密钥发布消息的经典代码
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange, "direct");
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange, routingKey, null, body);
Console.WriteLine(" [x] Sent {0}", message);
}
}
Run Code Online (Sandbox Code Playgroud)
有没有办法在 MassTransit 中使用路由密钥配置直接或主题交换?
我想实现类似选项3 RabbitMQ的拓扑这里,除了一些差别:
新的拓扑每天应该处理几千条消息。并且它应该有两个交换:一个用于处理主队列(大约30个),另一个用于处理重试和错误队列(大约60个)。我一直在关注本教程和常规的RMQ教程,以及许多SO帖子。RMQ服务器在Docker容器中启动。
我面临的问题是,并非所有消息都被使用者接收,并且接收消息的顺序是意外的。我还看到同一封邮件被拒绝两次。这是我的代码:
exchanges.py
def callback(self, channel, method, properties, body):
print("delivery_tag: {0}".format(method.delivery_tag))
data = json.loads(body)
routingKey = data.get('routing-key')
routingKey_dl_error = queues_dict[routingKey]['error']
print(" [X] Got {0}".format(body))
print(" [X] Received {0} (try: {1})".format(data.get('keyword'), int(properties.priority)+1))
# redirect faulty messages to *.error queues
if data.get('keyword') == 'FAIL':
channel.basic_publish(exchange='exchange.retry',
routing_key=routingKey_dl_error,
body=json.dumps(data),
properties=pika.BasicProperties(
delivery_mode=2,
priority=int(properties.priority),
timestamp=int(time.time()),
headers=properties.headers))
print(" [*] Sent to error queue: {0}".format(routingKey_dl_error))
time.sleep(5)
channel.basic_ack(delivery_tag=method.delivery_tag) #leaving this in creates 1000s of iterations(?!)
# check number of sent counts
else:
# …
Run Code Online (Sandbox Code Playgroud) 我已经在 kubernetes 集群上使用 helm chart 安装了 rabbitmq。rabbitmq pod 不断重启。在检查 pod 日志时,我收到以下错误
2020-02-26 04:42:31.582 [warning] <0.314.0> Error while waiting for Mnesia tables: {timeout_waiting_for_tables,[rabbit_durable_queue]}
2020-02-26 04:42:31.582 [info] <0.314.0> Waiting for Mnesia tables for 30000 ms, 6 retries left
Run Code Online (Sandbox Code Playgroud)
当我尝试执行 kubectl describe pod 时出现此错误
Conditions:
Type Status
Initialized True
Ready False
ContainersReady False
PodScheduled True
Volumes:
data:
Type: PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
ClaimName: data-rabbitmq-0
ReadOnly: false
config-volume:
Type: ConfigMap (a volume populated by a ConfigMap)
Name: rabbitmq-config …
Run Code Online (Sandbox Code Playgroud) rabbitmq kubernetes rabbitmq-exchange google-kubernetes-engine kubernetes-helm
我可以使用Publish/Subscribe RabbitMQ Java教程创建一个扇出交换,任何连接的使用者都将收到一条消息的副本.我想在连接任何消费者之前创建交换和绑定,而不是动态地/以编程方式声明交换和绑定.我是通过RabbitMQ管理控制台完成的.但是,出于某种原因,我的消费者正在以循环方式接收消息,而不是全部接收消息的副本.我错过了什么?以下是一些代码段:
出版商:
channel.basicPublish("public", "", null, rowId.getBytes("UTF-8"));
Run Code Online (Sandbox Code Playgroud)
消费者:
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("myqueue", false, consumer);
Run Code Online (Sandbox Code Playgroud)
...在RabbitMQ管理控制台中,我创建了一个"扇出"类型的交换"公共",并且我将该交换的绑定设置为"myqueue".
我很感激任何帮助!
TLDR; 在消费者即时创建的主题交换和队列的上下文中,当没有消费者消费该消息时,如何重新传递消息/生产者被通知?
我有以下组件:
我目前只有一个RabbitMQ主题交换.
routing_key = file_category
.现在 - 这个工作正常,但它仍然有一个重大问题.目前,如果发布者发送的消息带有没有绑定使用者的路由密钥,则该消息将丢失.这是因为即使消费者创建的队列是持久的,一旦消费者断开连接,它就会被销毁,因为它对这个消费者是唯一的.
消费者代码(python):
channel.exchange_declare(exchange=exchange_name, type='topic', durable=True)
result = channel.queue_declare(exclusive = True, durable=True)
queue_name = result.method.queue
topics = [ "pictures.*", "videos.trending" ]
for topic in topics:
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=topic)
channel.basic_consume(my_handler, queue=queue_name)
channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)
在我的用例中,在这种情况下丢失消息是不可接受的.
但是,如果通知生产者没有消费者收到消息(在这种情况下它可以稍后重新发送),则"丢失"消息变得可以接受.我发现强制性字段可能有所帮助,因为AMQP的规范规定:
如果消息无法路由到队列,此标志告诉服务器如何做出反应.如果设置了此标志,则服务器将返回带有Return方法的unroutable消息.
这确实有效 - 在制片人中,我能够注册ReturnListener
:
rabbitMq.confirmSelect();
rabbitMq.addReturnListener( (int replyCode, String replyText, String exchange, String routingKey, …
Run Code Online (Sandbox Code Playgroud) python java distributed-computing rabbitmq rabbitmq-exchange
我正在尝试使用php-amqplib来发送和接收消息.它可以在终端上发送/接收.但是当进入Web浏览器时,无法从队列中接收它连续等待消息.我在下面的代码中使用了receive.php
require_once(__DIR__ . '/lib/php-amqplib/amqp.inc');
include_once(__DIR__ . '/config/config.php');
$connection = new AMQPConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();
$channel->queue_declare('test22');
$callback = function($msg){
echo $msg->body;
};
$channel->basic_consume('test22', 'consumer_tag', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
Run Code Online (Sandbox Code Playgroud)
如果我使用下面而不是回调函数但它不从队列中消耗,它会从队列中获取第一条消息
$abc=$channel->basic_get("test22", false, 2);
if(!empty($abc))
{
print_r($abc->body);
}
Run Code Online (Sandbox Code Playgroud)
这意味着消息在队列'test22'中可用.给我任何线索.
rabbitmq ×9
rabbitmqctl ×2
amqp ×1
c# ×1
docker ×1
java ×1
kubernetes ×1
masstransit ×1
messaging ×1
php ×1
php-amqp ×1
pika ×1
python ×1
python-3.x ×1
spring-amqp ×1
spring-boot ×1