标签: rabbitmq

Spring AMQP 接收器的参数类型错误

我的 Spring AMQP 应用程序在启动时记录了以下异常:

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Failed to invoke target method 'receiveMessage' with argument type = [class [B], value = [{[B@660cff44}]
Run Code Online (Sandbox Code Playgroud)

从我的搜索中我了解到这是因为类与消息类型不兼容?但是,我看不到这是哪里。

以下是相关的代码段:

@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
}

@Bean
Queue queue() {
    return new Queue(config.getAMQPResultsQueue(), false);
}

@Bean
TopicExchange exchange() {
    return new TopicExchange(config.getAMQPResultsExchange());
}

@Bean
Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with("#");
}

@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(config.getAMQPResultsQueue());
    container.setMessageListener(listenerAdapter);
    container.setMessageConverter(jsonMessageConverter());
    return container;
}

@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) …
Run Code Online (Sandbox Code Playgroud)

spring rabbitmq spring-amqp

1
推荐指数
1
解决办法
2813
查看次数

如果每条消息都将路由到主节点,为什么要在一组 RabbitMQ 节点(同一队列)之前放置负载均衡器?

根据https://groups.google.com/forum/#!topic/rabbitmq-users/vvWAymjDww4,如果队列跨多个服务器(节点)镜像,发布者写入哪个队列并不重要 - RabbitMQ 将始终将消息转发到主节点(首次创建队列的节点)。

如果是这种情况,如果每条消息最终都会路由到同一个节点,那么将负载均衡器放在节点前面有什么意义?似乎主节点将始终承担整个负载。

rabbitmq

1
推荐指数
1
解决办法
1639
查看次数

rabbitmq-server 无法启动

我安装了 RabbitMQ,它运行良好。然后,一旦我的系统内存已满并且它停止工作。我清除了系统内存并再次启动了RabbitMQ,但它没有启动。以下是我得到的状态。

root@qa-development-vm:/usr/# systemctl status rabbitmq-server.service
? rabbitmq-server.service - RabbitMQ broker
   Loaded: loaded (/lib/systemd/system/rabbitmq-server.service; enabled)
   Active: activating (start) since Thu 2018-12-06 06:13:31 UTC; 641ms ago
  Process: 32243 ExecStop=/bin/sh -c while ps -p $MAINPID >/dev/null 2>&1; do sleep 1; done (code=exited, status=0/SUCCESS)
  Process: 32105 ExecStop=/usr/lib/rabbitmq/bin/rabbitmqctl stop (code=exited, status=0/SUCCESS)
 Main PID: 32253 (beam.smp)
   CGroup: /system.slice/rabbitmq-server.service
           ??32253 /usr/lib/erlang/erts-10.1.3/bin/beam.smp -W w -A 64 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -K true -- -root /usr/lib/erlang -progname erl -- -home ...
           ??32465 erl_child_setup 1024 …
Run Code Online (Sandbox Code Playgroud)

erlang rabbitmq

1
推荐指数
1
解决办法
5869
查看次数

对于 prefetchcount > 1 的每条消息,MassTransit 新消费者(和 di 范围)

我正在尝试MassTransit.ExtensionsDependencyInjectionIntegration使用 PrefetchCount进行设置和试验。我注意到对于多条消息,有时会使用同一个消费者。

有没有办法强制大众运输为每条消息创建消费者?

我的服务逻辑需要为每条消息设置新的 di 范围,据我所知,只有我们每次都创建消费者才有可能。

UPD

我正在使用这个库来注册消费者和发布者。

在这里您可以看到,它使用TryAddTrancient(). 这可能是个问题吗?我仍然认为 di 容器应该为每个请求生成消费者。

似乎这不是故意的行为,我会更深入地研究我的问题。

c# dependency-injection masstransit rabbitmq .net-core

1
推荐指数
1
解决办法
1795
查看次数

Golang Docker 容器未在 Docker-Compose 中重新启动

我希望能够在无法连接到 rabbitmq 时重新启动 golang docker 文件,如下所述:(Docker Compose 在启动 Y 之前等待容器 X,请参阅:svenhornberg 的回答)。不幸的是,我的 golang 容器将退出但永远不会重新启动,我不知道为什么。

Docker-撰写:

version: '3.3'
services:
  mongo:
    image: 'mongo:3.4.1'
    container_name: 'datastore'
    ports:
      - '27017:27017'
  rabbitmq:
    restart: always
    tty: true
    image: rabbitmq:3.7-management-alpine
    hostname: "rabbit"
    ports:
      - "15672:15672" 
      - "5672:5672"
    labels:
      NAME: "rabbitmq"
    volumes:
      - ./rabbitmq-isolated.conf:/etc/rabbitmq/rabbitmq.config
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:15672"]
      interval: 3s
      timeout: 5s
      retries: 20
  api:
    restart: always
    tty: true
    container_name: 'api'
    build: '.'
    working_dir: /go/src/github.com/patientplatypus/project
    ports:
      - '8000:8000'
    volumes:
      - './:/go/src/github.com/patientplatypus/project'
      - './uploads:/uploads'
      - …
Run Code Online (Sandbox Code Playgroud)

restart go rabbitmq docker docker-compose

1
推荐指数
1
解决办法
1041
查看次数

在通过自制软件安装的 macos 上访问 Rabbit MQ UI 时出错

我通过以下方式在运行 OS X 的本地机器上安装了 Rabbit MQ:

brew install rabbitmq

我通过以下方式启动和停止服务器:

brew services start/stop rabbitmq

我通过以下方式安装了 Web UI 插件:

rabbitmq-plugins enable rabbitmq_management

我希望在浏览器中查看 Web UI:

http://localhost:15672/

它不加载。报告的错误是 431,这意味着请求头字段太大。 https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/431

macos rabbitmq rabbitmq-management macos-mojave

1
推荐指数
1
解决办法
597
查看次数

如何使用 amqp 库在“设置结构”之外重用 RabbitMQ 连接和通道?

我正在尝试使用 amqp 库构建一个简单的 node.js 客户端,它打开一个连接,然后打开一个到 RabbitMQ 服务器的通道。我想重用相同的连接和通道来发送多条消息。主要问题是,我不想在 ceateChannel() 函数的回调函数中编写我的整个代码。

如何在回调函数之外重用通道并确保在使用通道之前回调函数已完成?

我已经尝试了回调方式和承诺方式,但我无法让它们中的任何一个工作。使用回调方法时,我遇到了所描述的问题。

使用 promise 时,我遇到的问题是我无法在 .then() 函数之外保留连接和通道的引用,因为在设置连接和通道后传递的变量会被破坏。


amqp.connect('amqp://localhost', (err, conn) => {

  if (err !== null) return console.warn(err);
  console.log('Created connection!');

  conn.createChannel((err, ch) => {

    if (err !== null) return console.warn(err);
    console.log('Created channel!');

    //this is where I would need to write the code that uses the variable "ch"
    //but I want to move the code outside of this structure, while making sure
    //this callback completes before I try using "ch"

  });
}); …
Run Code Online (Sandbox Code Playgroud)

javascript amqp rabbitmq node.js promise

1
推荐指数
1
解决办法
2089
查看次数

如何使用 pika 和 rabbitmq 的多线程来执行请求和响应 RPC 消息

我正在使用 Rabbitmq 进行一个项目,我正在使用 RPC 模式,基本上我正在接收或使用来自队列的消息,进行一些处理,然后发送回响应。我使用 Pika,我的目标是为每个任务使用一个线程,因此对于每个任务,我将专门为该任务创建一个线程。我还读到,最佳实践是只建立一个连接,并根据需要在其下建立多个通道,但我总是收到此错误:
“可能无法从 pika.exceptions.RecursionError 的范围内调用 start_sumption”:start_sumption 可能不会从另一个 BlockingConnection 或 BlockingChannel 回调的范围内调用。

我做了一些研究,发现 Pika 不是线程安全的,我们应该为每个线程使用一个独立的连接和一个通道。但我不想这样做,因为它被认为是不好的做法。所以我想在这里问一下是否有人已经实现了这项工作。我还读到,如果我不使用 BlockingConnection 来实例化我的 Connection 并且有一个名为 add_callback_threadsafe 的函数可以使这成为可能。但不幸的是没有示例,我阅读了文档,但它很复杂,如果没有示例,我很难理解他们想要描述的内容。

我的尝试是声明两个类。每个类将代表一个任务执行器,它接收或使用来自队列的消息,并基于此进行一些处理并返回响应。我的想法是在两个任务之间共享一个rabbitmq 连接,但每个任务都会获得一个独立的通道。在上面的代码中,传递给函数的 rabbit 参数是一个类,其中包含一些变量,如 Connection 和其他函数,如 EventSubscriber,当调用它时,它将分配一个新通道并开始使用来自该特定交换和路由密钥的消息。接下来我声明一个线程并将订阅或消费函数作为该线程的目标。另一个任务类看起来也和这个类一样,这就是为什么我只上传这个代码。

On_Deregistration 类:

def __init__(self, rabbit):
   self.event(rabbit) # this will call event function and pass the connection shared between all Tasks. rabbit parameter hold a connection to rabbitmq

def event(self, rabbit):
 
    self.Subscriber = rabbit.EventSubscriber(rabbit,  'testing.test',  'test', False,  onDeregistrationFromHRS # this func is task listener)

def subscribeAsync(self):
    self.Subscriber.subscribe() # here i …
Run Code Online (Sandbox Code Playgroud)

python multithreading rabbitmq pika

1
推荐指数
1
解决办法
1464
查看次数

Java Spring + RabbitMQ + Docker 的奇怪问题

我尝试使用我上面写的技术运行简单的测试项目,但遇到奇怪的异常并且找不到解决方案。我使用 Windows 7 并且 docker 在主机上工作:192.168.99.100。rabbitmq 的管理面板通过地址http://192.168.99.100:15672,运行良好。

应用特性:

spring.rabbitmq.host=192.168.99.100
spring.rabbitmq.port=15672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
jsa.rabbitmq.queue=jsa.queue
Run Code Online (Sandbox Code Playgroud)

成分:

@Component
public class Consumer {

    @RabbitListener(queues="${jsa.rabbitmq.queue}")
    public void recievedMessage(String msg) {
        System.out.println("Recieved Message: " + msg);
    }
}
Run Code Online (Sandbox Code Playgroud)

堆栈跟踪:

2019-09-14 16:00:54.677 ERROR 4532 --- [68.99.100:15672] c.r.c.impl.ForgivingExceptionHandler     : An unexpected connection driver error occured

    java.net.SocketException: Socket Closed
        at java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.8.0_201]
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[na:1.8.0_201]
        at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[na:1.8.0_201]
        at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_201]
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) ~[na:1.8.0_201]
        at java.io.BufferedInputStream.read(BufferedInputStream.java:265) ~[na:1.8.0_201]
        at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288) ~[na:1.8.0_201]
        at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91) ~[amqp-client-4.0.2.jar:4.0.2]
        at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164) ~[amqp-client-4.0.2.jar:4.0.2] …
Run Code Online (Sandbox Code Playgroud)

java rabbitmq docker spring-boot

1
推荐指数
1
解决办法
476
查看次数

如何使用 LDAP 为 AD 组配置 RabbitMQ?

我是 LDAP 新手。

到目前为止,我已经成功地使用 LDAP 配置了 rabbitMQ 并对其进行了身份验证,如果它是针对单个 AD 的。我正在使用以下配置:

RabbitMQ 配置文件,

auth_backends,[{rabbit_auth_backend_ldap, rabbit_auth_backend_internal},rabbit_auth_backend_internal]
Run Code Online (Sandbox Code Playgroud)

在 RabbitMQ 管理中,我手动创建了一个没有设置密码的用户名(它有效)

但是,让我们说

我有一个 AD 组(称为“Rabbit 用户组”),里面有 3 个用户(User1、User2、User3)。

“Rabbit User Group”的位置在:sample.companyname.com > City Name (OU) > Groups (OU) > IT Groups (OU) > “Rabbit User Group”(安全组)。

我可以知道我应该如何在 RabbitMQ 管理和配置文件中配置它,以便一旦我更新特定组,组内的所有成员都将进行身份验证并具有相同的权限(例如,只有该组具有管理员权限) )?而且我不需要在rabbitMQ管理中手动创建每个单独的用户进行身份验证?。

我已将以下内容添加到我的 rabbitmq 配置文件中

{
            tag_queries, [
                            {administrator,{in_group,'CN="Rabbit User Group",OU="City Name", OU=Groups, OU="IT Group",DC=sample,DC=companyname,DC=com',"uniqueMember"}},
                            {management,    {constant, true}}
                         ]
        }
Run Code Online (Sandbox Code Playgroud)

并尝试在没有密码的情况下在rabbitMQ管理中创建一个名为“Rabbit User Group”的用户名。但是当我尝试以“User1”身份登录时,却无法登录。

我尝试了几种方法,但我不确定我遗漏了哪一部分,而且它们似乎都不起作用。

这是我的整体配置文件。

[
 {
  rabbit,
  [
   {
     auth_backends,[{rabbit_auth_backend_ldap, rabbit_auth_backend_internal},rabbit_auth_backend_internal]
   }
  ]
 },
 { …
Run Code Online (Sandbox Code Playgroud)

ldap rabbitmq

1
推荐指数
1
解决办法
1812
查看次数