我遇到了一个问题,设置basic.qos为1没有达到预期的效果 - 大量消息仍然被推送给我的消费者。
我的代码看起来有点像这样:
Channel channel = getChannel("pollQueuePassive"); // from our own channel pool implementation
try{
channel.queueDeclarePassive(queue.name);
} catch (IOException e){
channel = getChannel("pollQueueActive");
channel.queueDeclare(queue.name, true, false, false, null);
}
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queue.name, autoAck, consumer);
while (!stopPolling()) {
try{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
boolean workResult = doWork(message);
if(!autoAck) {
if(workResult)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
else
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
} catch (InterruptedException e) {}
}
closeConnection();
Run Code Online (Sandbox Code Playgroud)
一旦我开始以这种方式从队列中消费,队列中的所有消息(在某些情况下最多 20,000 条)几乎会立即传递给消费者。由于我想同时将队列中的消息分发给数十个消费者,这种行为显然是不可取的。我玩过移动我的 …
这似乎是一个非常基本的操作,但我找不到删除 RabbitMQ 中命名交换的方法。我在 Windows 上运行它并使用命令行工具。此时,我会选择 .NET API 调用来删除交换(如果存在)。谢谢。
在 Docker 镜像中配置 RabbitMQ 是一场噩梦。当使用Fig作为链接容器运行 RabbitMQ 时,我可以连接到 RabbitMQ 接口,并成功配置vhosts和允许我的 Celery 工作人员进行连接,没有任何问题。
但是,重新启动 Docker 会丢失配置设置。我怎样才能保留这些设置?
我尝试过的一些事情:
/etc/rabbitmq/rabbit.configDocker 映像。RabbitMQ 会忽略它。ENV HOSTNAME localhost,但这似乎会干扰图 1 中链接 Docker 容器。我究竟做错了什么?是否有一个规范的 Dockerfile 用于获取配置的 RabbitMQ docker 容器以用于 Docker 链接开发目的,最好使用 Fig?
我从事 DevOps,我们的开发人员坚持认为,由于 Node.js 的缺陷,他们用 Node.js 编写的软件只能指向单个后端服务器,无论是数据库、Web 服务器还是 RabbitMQ 服务器。这对我来说听起来完全疯狂。这怎么可能?
local.js在文件中考虑这一点
rabbitmq:{
host: "rabbit01.stage",
port: 5672,
username : "user",
password : "pass",
exchangeName: "exchange_blah",
queueName: "queue_blah",
name : "rabbit",
max : 10
}
Run Code Online (Sandbox Code Playgroud)
将配置更改为
host: ["rabbit01.stage", "rabbit02.stage" ],
Run Code Online (Sandbox Code Playgroud)
破坏应用程序并尝试在上找到兔子服务器localhost:5672
我的 Google-fu 失败了,因为我不知道如何正确地将这个问题表述为可搜索的短语。
我在这里错过了什么吗?或者我们的开发者应该RTFM?
我在我们正在开发的一款移动应用程序中使用 Rabbit MQ 代理,我对安全方面有点困惑。我们正在使用云托管的rabbitmq,托管平台已经为我们提供了用户名和密码(此后已更改),并且我们正在使用SSL连接,因此不太担心MIM或窃听。
我担心的是任何知道主机和端口的人都可以连接到rabbitmq,因为我们有移动应用程序,我们将rabbitmq用户名和密码存储在设备上(尽管已加密),所以我猜想任何能够物理访问设备并以某种方式解密用户名密码的人可以登录rabbitmq,一旦登录,您几乎可以在rabbitmq上执行任何操作,例如删除队列等。像Rabbitmq这样的MQ如何在移动环境中使用。有没有更好/更安全的使用rabbitmq的方法。
我知道 HA 策略是通过以下命令设置的:
$ rabbitmqctl set_policy ha-all "" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
Run Code Online (Sandbox Code Playgroud)
我的问题看起来很基本:
我是否必须在每个节点上或仅在其中一个节点上发出此命令?
我正在使用 REST APIhttp://192.168.99.100:32787/api/exchanges/%2f/amq.direct/publish
将消息发布到我的helloworld.q队列。
有效负载:
{"properties":{},"routing_key":"","payload":"Hello World","payload_encoding":"string"}
Run Code Online (Sandbox Code Playgroud)
我还没有创建任何新的交换。我不确定要指定哪个交换,因此amq.direct在其余网址中使用。我已经提供了基本身份验证凭据,并且从 API 收到了以下响应。
{
"routed": false
}
Run Code Online (Sandbox Code Playgroud)
不知道出了什么问题。
有关如何使用其 HTTP API 的最新文档可以在此处找到。 https://rawcdn.githack.com/rabbitmq/rabbitmq-management/rabbitmq_v3_6_11/priv/www/api/index.html
我使用以下代码进行消息转换器:
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, Queue queue,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queue.getName());
container.setMessageListener(listenerAdapter);
container.setMessageConverter(new Jackson2JsonMessageConverter());
return container;
}
Run Code Online (Sandbox Code Playgroud)
我的听众被宣布:
public void receiveMessage(List<Map<String, Object>> message) {
try {
System.out.println("Received <" + new String(message, "UTF-8") + ">");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
Run Code Online (Sandbox Code Playgroud)
但它总是尝试给出以下错误:
Failed to invoke target method 'receiveMessage' with argument type = [class [B], value = [{[B@40c2d9c5}]","at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:408)
Run Code Online (Sandbox Code Playgroud)
它似乎尝试调用 byte[] 作为参数,而不是将 json 字符串转换为 List>。
在其中一种环境中启动服务器时出现以下异常。但是,它在另一个环境中运行良好。
Caused by: org.springframework.amqp.AmqpIllegalStateException: Fatal exception on listener startup
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doStart(SimpleMessageListenerContainer.java:342)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:363)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:173)
... 54 more
Caused by: org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:790)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:348)
at com.sun.proxy.$Proxy89.queueDeclarePassive(Unknown Source)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:216)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no …Run Code Online (Sandbox Code Playgroud) 我正在使用celery和rabbitmq,但是由于在队列中推送了多个任务,我的服务器内存利用率变得超过40%,因此rabbit将不再接受任何任务。所以我想删除那些已经执行的消息,但是由于rabbitmq的持久行为,这些消息不会自动删除,所以我想设置一些配置,例如 autoAck=True ,这样如果从 celery 消耗消息,它将从rabbitmq 队列以及我的服务器内存。请解释一下我们该如何做到这一点。
rabbitmq ×10
java ×2
spring ×2
amqp ×1
c# ×1
celery ×1
docker ×1
javascript ×1
messaging ×1
node.js ×1
rabbitmqctl ×1
rest ×1
security ×1
spring-amqp ×1
ssl ×1