标签: rabbitmq

RabbitMQ 预取被忽略

我遇到了一个问题,设置basic.qos1没有达到预期的效果 - 大量消息仍然被推送给我的消费者。

我的代码看起来有点像这样:

    Channel channel = getChannel("pollQueuePassive"); // from our own channel pool implementation

    try{
        channel.queueDeclarePassive(queue.name);
    } catch (IOException e){
        channel = getChannel("pollQueueActive");
        channel.queueDeclare(queue.name, true, false, false, null);
    }

    channel.basicQos(1);

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queue.name, autoAck, consumer);

    while (!stopPolling()) { 
        try{
              QueueingConsumer.Delivery delivery = consumer.nextDelivery();
              String message = new String(delivery.getBody());

              boolean workResult = doWork(message);
              if(!autoAck) {
                  if(workResult) 
                      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
                  else
                      channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
              }

            } catch (InterruptedException e) {}
    }

    closeConnection();
Run Code Online (Sandbox Code Playgroud)

一旦我开始以这种方式从队列中消费,队列中的所有消息(在某​​些情况下最多 20,000 条)几乎会立即传递给消费者。由于我想同时将队列中的消息分发给数十个消费者,这种行为显然是不可取的。我玩过移动我的 …

java messaging amqp rabbitmq

1
推荐指数
1
解决办法
2484
查看次数

如何删除 RabbitMQ 交换?

这似乎是一个非常基本的操作,但我找不到删除 RabbitMQ 中命名交换的方法。我在 Windows 上运行它并使用命令行工具。此时,我会选择 .NET API 调用来删除交换(如果存在)。谢谢。

c# rabbitmq rabbitmq-exchange

1
推荐指数
1
解决办法
3715
查看次数

在 Docker 镜像中保留 RabbitMQ 配置

在 Docker 镜像中配置 RabbitMQ 是一场噩梦。当使用Fig作为链接容器运行 RabbitMQ 时,我可以连接到 RabbitMQ 接口,并成功配置vhosts和允许我的 Celery 工作人员进行连接,没有任何问题。

但是,重新启动 Docker 会丢失配置设置。我怎样才能保留这些设置?

我尝试过的一些事情:

  • 配置完所有内容后,从导出写入/etc/rabbitmq/rabbit.configDocker 映像。RabbitMQ 会忽略它。
  • 在我的 Docker 文件中设置主机名ENV HOSTNAME localhost,但这似乎会干扰图 1 中链接 Docker 容器。

我究竟做错了什么?是否有一个规范的 Dockerfile 用于获取配置的 RabbitMQ docker 容器以用于 Docker 链接开发目的,最好使用 Fig?

rabbitmq docker

1
推荐指数
1
解决办法
8638
查看次数

节点配置中的多个 RabbitMQ 或其他后端服务器

我从事 DevOps,我们的开发人员坚持认为,由于 Node.js 的缺陷,他们用 Node.js 编写的软件只能指向单个后端服务器,无论是数据库、Web 服务器还是 RabbitMQ 服务器。这对我来说听起来完全疯狂。这怎么可能?

local.js在文件中考虑这一点

    rabbitmq:{
        host: "rabbit01.stage",
        port: 5672,
        username : "user",
        password : "pass",
        exchangeName: "exchange_blah",
        queueName: "queue_blah",
        name : "rabbit",
        max : 10
    }
Run Code Online (Sandbox Code Playgroud)

将配置更改为

            host: ["rabbit01.stage", "rabbit02.stage" ],
Run Code Online (Sandbox Code Playgroud)

破坏应用程序并尝试在上找到兔子服务器localhost:5672

我的 Google-fu 失败了,因为我不知道如何正确地将这个问题表述为可搜索的短语。

我在这里错过了什么吗?或者我们的开发者应该RTFM?

javascript rabbitmq node.js

1
推荐指数
1
解决办法
1003
查看次数

移动应用程序中的 RabbitMQ 安全性

我在我们正在开发的一款移动应用程序中使用 Rabbit MQ 代理,我对安全方面有点困惑。我们正在使用云托管的rabbitmq,托管平台已经为我们提供了用户名和密码(此后已更改),并且我们正在使用SSL连接,因此不太担心MIM或窃听。

我担心的是任何知道主机和端口的人都可以连接到rabbitmq,因为我们有移动应用程序,我们将rabbitmq用户名和密码存储在设备上(尽管已加密),所以我猜想任何能够物理访问设备并以某种方式解密用户名密码的人可以登录rabbitmq,一旦登录,您几乎可以在rabbitmq上执行任何操作,例如删除队列等。像Rabbitmq这样的MQ如何在移动环境中使用。有没有更好/更安全的使用rabbitmq的方法。

security ssl rabbitmq

1
推荐指数
1
解决办法
2758
查看次数

RabbitMQ 设置 HA 策略

我知道 HA 策略是通过以下命令设置的:

$ rabbitmqctl set_policy ha-all "" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
Run Code Online (Sandbox Code Playgroud)

我的问题看起来很基本:

我是否必须在每个节点上或仅在其中一个节点上发出此命令?

rabbitmq rabbitmqctl

1
推荐指数
1
解决办法
5360
查看次数

无法使用 HTTP API 将消息发布到 RabbitMQ 中的队列

我正在使用 REST APIhttp://192.168.99.100:32787/api/exchanges/%2f/amq.direct/publish 将消息发布到我的helloworld.q队列。

有效负载:

{"properties":{},"routing_key":"","payload":"Hello World","payload_encoding":"string"}
Run Code Online (Sandbox Code Playgroud)

我还没有创建任何新的交换。我不确定要指定哪个交换,因此amq.direct在其余网址中使用。我已经提供了基本身份验证凭据,并且从 API 收到了以下响应。

{
    "routed": false
}
Run Code Online (Sandbox Code Playgroud)

不知道出了什么问题。

有关如何使用其 HTTP API 的最新文档可以在此处找到。 https://rawcdn.githack.com/rabbitmq/rabbitmq-management/rabbitmq_v3_6_11/priv/www/api/index.html

rest rabbitmq

1
推荐指数
1
解决办法
4572
查看次数

Jackson2JsonMessageConverter 不适用于rabbitmq

我使用以下代码进行消息转换器:

SimpleMessageListenerContainer container(ConnectionFactory  connectionFactory, Queue queue,
        MessageListenerAdapter listenerAdapter) {

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(queue.getName());
    container.setMessageListener(listenerAdapter);
    container.setMessageConverter(new Jackson2JsonMessageConverter());
    return container;
}
Run Code Online (Sandbox Code Playgroud)

我的听众被宣布:

public void receiveMessage(List<Map<String, Object>> message) {
    try {
        System.out.println("Received <" + new String(message, "UTF-8") +     ">");
    } catch (UnsupportedEncodingException e) {
        e.printStackTrace();
    }
}
Run Code Online (Sandbox Code Playgroud)

但它总是尝试给出以下错误:

Failed to invoke target method 'receiveMessage' with argument type = [class [B], value = [{[B@40c2d9c5}]","at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:408)
Run Code Online (Sandbox Code Playgroud)

它似乎尝试调用 byte[] 作为参数,而不是将 json 字符串转换为 List>。

spring rabbitmq spring-amqp

1
推荐指数
1
解决办法
7001
查看次数

Rabbitmq 队列不会在服务器启动时自动创建

在其中一种环境中启动服务器时出现以下异常。但是,它在另一个环境中运行良好。

Caused by: org.springframework.amqp.AmqpIllegalStateException: Fatal exception on listener startup
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doStart(SimpleMessageListenerContainer.java:342)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:363)
        at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:173)
        ... 54 more
    Caused by: org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.

    Caused by: java.io.IOException
            at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
            at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
            at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
            at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:790)
            at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:348)
            at com.sun.proxy.$Proxy89.queueDeclarePassive(Unknown Source)
            at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:216)

        Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no …
Run Code Online (Sandbox Code Playgroud)

java spring rabbitmq spring-rabbit

1
推荐指数
1
解决办法
5847
查看次数

在 Rabbitmq 和 celery 中设置 Autoack true

我正在使用celery和rabbitmq,但是由于在队列中推送了多个任务,我的服务器内存利用率变得超过40%,因此rabbit将不再接受任何任务。所以我想删除那些已经执行的消息,但是由于rabbitmq的持久行为,这些消息不会自动删除,所以我想设置一些配置,例如 autoAck=True ,这样如果从 celery 消耗消息,它将从rabbitmq 队列以及我的服务器内存。请解释一下我们该如何做到这一点。

rabbitmq celery django-celery

1
推荐指数
1
解决办法
7035
查看次数