RabbitMQ 无法声明队列并且侦听器无法在服务器上获取队列

Adi*_*ote 5 message-queue rabbitmq spring-rabbit spring-amqp spring-boot

我有spring boot rabbitmq应用程序,我必须将 Employee 对象发送到队列中。然后我设置了一个监听器应用程序。对员工对象进行一些处理并将此对象放入回调队列中。

为此,我在我的应用程序中创建了以下对象。

  1. 创建了ConnectionFactory
  2. 使用ConnectionFactory创建了RabbitAdmin对象。
  3. 请求队列。
  4. 回调队列。
  5. 直接交换。
  6. 请求队列绑定。
  7. 回调队列绑定。
  8. 消息转换器。
  9. RabbitTemplate 对象。
  10. 最后是SimpleMessageListenerContainer 的对象。

我的应用程序文件如下所示。

应用程序属性

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=foo
emp.rabbitmq.directexchange=EMP_EXCHANGE1
emp.rabbitmq.requestqueue=EMP_QUEUE1
emp.rabbitmq.routingkey=EMP_ROUTING_KEY1
Run Code Online (Sandbox Code Playgroud)

主类.java

package com.employee;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MainClass {

    public static void main(String[] args) {
        SpringApplication.run(
                MainClass.class, args);
    }
}
Run Code Online (Sandbox Code Playgroud)

应用程序上下文提供程序

package com.employee.config;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;

public class ApplicationContextProvider implements ApplicationContextAware {
    private static ApplicationContext context;

    public ApplicationContext getApplicationContext(){
        return context;
    }

    @Override
    public void setApplicationContext(ApplicationContext arg0) throws BeansException {
        context = arg0;

    }

    public Object getBean(String name){
        return context.getBean(name, Object.class);
    }

    public void addBean(String beanName, Object beanObject){
        ConfigurableListableBeanFactory beanFactory = ((ConfigurableApplicationContext)context).getBeanFactory();
        beanFactory.registerSingleton(beanName, beanObject);
    }

    public void removeBean(String beanName){
        BeanDefinitionRegistry reg = (BeanDefinitionRegistry) context.getAutowireCapableBeanFactory();
        reg.removeBeanDefinition(beanName);
    }
}
Run Code Online (Sandbox Code Playgroud)

常量.java

package com.employee.constant;

public class Constants {

    public static final String CALLBACKQUEUE = "_CBQ";

}
Run Code Online (Sandbox Code Playgroud)

雇员.java

package com.employee.model;

import com.fasterxml.jackson.annotation.JsonIdentityInfo;
import com.fasterxml.jackson.annotation.ObjectIdGenerators;

@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id", scope = Employee.class)
public class Employee {

    private String empName;
    private String empId;
    private String changedValue;
    public String getEmpName() {
        return empName;
    }
    public void setEmpName(String empName) {
        this.empName = empName;
    }
    public String getEmpId() {
        return empId;
    }
    public void setEmpId(String empId) {
        this.empId = empId;
    }
    public String getChangedValue() {
        return changedValue;
    }
    public void setChangedValue(String changedValue) {
        this.changedValue = changedValue;
    }


}
Run Code Online (Sandbox Code Playgroud)

EmployeeProducerInitializer.java

package com.employee.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

import com.employee.constant.Constants;
import com.employee.service.EmployeeResponseReceiver;

@Configuration
@EnableAutoConfiguration
@ComponentScan(value="com.en.*")
public class EmployeeProducerInitializer {

    @Value("${emp.rabbitmq.requestqueue}")
    String requestQueueName;

    @Value("${emp.rabbitmq.directexchange}")
    String directExchange;

    @Value("${emp.rabbitmq.routingkey}")
    private String requestRoutingKey;

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Bean
    ApplicationContextProvider applicationContextProvider(){
        System.out.println("inside app ctx provider");
        return new ApplicationContextProvider();
    };

    @Bean
    RabbitAdmin rabbitAdmin(){
        System.out.println("inside rabbit admin");
        return new RabbitAdmin(rabbitConnectionFactory);
    };

    @Bean
    Queue empRequestQueue() {
        System.out.println("inside request queue");
        return new Queue(requestQueueName, true);
    }

    @Bean
    Queue empCallBackQueue() {
        System.out.println("inside call back queue");
        return new Queue(requestQueueName + Constants.CALLBACKQUEUE, true);
    }

    @Bean
    DirectExchange empDirectExchange() {
        System.out.println("inside exchange");
        return new DirectExchange(directExchange);
    }

    @Bean
    Binding empRequestBinding() {
        System.out.println("inside request binding");
        return BindingBuilder.bind(empRequestQueue()).to(empDirectExchange()).with(requestRoutingKey);
    }

    @Bean
    Binding empCallBackBinding() {
        return BindingBuilder.bind(empCallBackQueue()).to(empDirectExchange()).with(requestRoutingKey + Constants.CALLBACKQUEUE);
    }

    @Bean
    public MessageConverter jsonMessageConverter(){
        System.out.println("inside json msg converter");
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate empFixedReplyQRabbitTemplate() {
        System.out.println("inside rabbit template");
        RabbitTemplate template = new RabbitTemplate(this.rabbitConnectionFactory);
        template.setExchange(empDirectExchange().getName());
        template.setRoutingKey(requestRoutingKey);
        template.setMessageConverter(jsonMessageConverter());
        template.setReceiveTimeout(100000);
        template.setReplyTimeout(100000);

        return template;
    }

    @Bean
    public SimpleMessageListenerContainer empReplyListenerContainer() {
        System.out.println("inside listener");
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        try{
            container.setConnectionFactory(this.rabbitConnectionFactory);
            container.setQueues(empCallBackQueue());
            container.setMessageListener(new EmployeeResponseReceiver());
            container.setMessageConverter(jsonMessageConverter());
            container.setConcurrentConsumers(10);
            container.setMaxConcurrentConsumers(20);
            container.start();
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            System.out.println("inside listener finally");
        }

        return container;
    }

    @Autowired
    @Qualifier("empReplyListenerContainer")
    private SimpleMessageListenerContainer empReplyListenerContainer;
}
Run Code Online (Sandbox Code Playgroud)

员工响应接收器.java

package com.employee.service;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.stereotype.Component;

import com.employee.config.ApplicationContextProvider;
import com.employee.model.Employee;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;

@Component
@EnableAutoConfiguration
public class EmployeeResponseReceiver implements ChannelAwareMessageListener {

    ApplicationContextProvider applicationContextProvider = new ApplicationContextProvider();

    String msg = null;
    ObjectMapper mapper = new ObjectMapper();
    Employee employee = null;

    @Override
    public void onMessage(Message message, Channel arg1) throws Exception {
        try {
            msg = new String(message.getBody());
            System.out.println("Received Message : " + msg);

            employee = mapper.readValue(msg, Employee.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
Run Code Online (Sandbox Code Playgroud)

问题是每当我启动我的应用程序时,我都会遇到以下异常。

2018-03-17 14:18:36.695  INFO 12472 --- [ost-startStop-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2018-03-17 14:18:36.696  INFO 12472 --- [ost-startStop-1] o.s.web.context.ContextLoader            : Root WebApplicationContext: initialization completed in 5060 ms
2018-03-17 14:18:37.004  INFO 12472 --- [ost-startStop-1] o.s.b.w.servlet.ServletRegistrationBean  : Mapping servlet: 'dispatcherServlet' to [/]
2018-03-17 14:18:37.010  INFO 12472 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean   : Mapping filter: 'characterEncodingFilter' to: [/*]
2018-03-17 14:18:37.010  INFO 12472 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean   : Mapping filter: 'hiddenHttpMethodFilter' to: [/*]
2018-03-17 14:18:37.011  INFO 12472 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean   : Mapping filter: 'httpPutFormContentFilter' to: [/*]
2018-03-17 14:18:37.011  INFO 12472 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean   : Mapping filter: 'requestContextFilter' to: [/*]
inside listener
inside call back queue
inside json msg converter
2018-03-17 14:18:37.576  INFO 12472 --- [cTaskExecutor-8] o.s.a.r.c.CachingConnectionFactory       : Created new connection: SimpleConnection@3d31af39 [delegate=amqp://guest@127.0.0.1:5672/foo, localPort= 50624]
2018-03-17 14:18:37.654  WARN 12472 --- [cTaskExecutor-7] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue:EMP_QUEUE1_CBQ
2018-03-17 14:18:37.655  WARN 12472 --- [cTaskExecutor-6] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue:EMP_QUEUE1_CBQ
2018-03-17 14:18:37.655  WARN 12472 --- [cTaskExecutor-5] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue:EMP_QUEUE1_CBQ
2018-03-17 14:18:37.655  WARN 12472 --- [cTaskExecutor-3] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue:EMP_QUEUE1_CBQ
2018-03-17 14:18:37.657  WARN 12472 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue:EMP_QUEUE1_CBQ
2018-03-17 14:18:37.658  WARN 12472 --- [cTaskExecutor-8] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue:EMP_QUEUE1_CBQ
2018-03-17 14:18:37.661  WARN 12472 --- [cTaskExecutor-2] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue:EMP_QUEUE1_CBQ
2018-03-17 14:18:37.660  WARN 12472 --- [cTaskExecutor-4] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue:EMP_QUEUE1_CBQ
2018-03-17 14:18:37.661  WARN 12472 --- [cTaskExecutor-9] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue:EMP_QUEUE1_CBQ
2018-03-17 14:18:37.666  WARN 12472 --- [TaskExecutor-10] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue:EMP_QUEUE1_CBQ
2018-03-17 14:18:37.667  WARN 12472 --- [cTaskExecutor-2] o.s.a.r.listener.BlockingQueueConsumer   : Queue declaration failed; retries left=3

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[EMP_QUEUE1_CBQ]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:636) ~[spring-rabbit-1.7.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:535) ~[spring-rabbit-1.7.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1389) [spring-rabbit-1.7.2.RELEASE.jar:na]
    at java.lang.Thread.run(Unknown Source) [na:1.8.0_151]
Caused by: java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:123) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:992) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:50) ~[amqp-client-4.0.2.jar:4.0.2]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_151]
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_151]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_151]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_151]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:955) ~[spring-rabbit-1.7.2.RELEASE.jar:na]
    at com.sun.proxy.$Proxy58.queueDeclarePassive(Unknown Source) ~[na:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:615) ~[spring-rabbit-1.7.2.RELEASE.jar:na]
    ... 3 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'EMP_QUEUE1_CBQ' in vhost 'foo', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117) ~[amqp-client-4.0.2.jar:4.0.2]
    ... 12 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'EMP_QUEUE1_CBQ' in vhost 'foo', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:505) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:336) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:143) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:90) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:634) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572) ~[amqp-client-4.0.2.jar:4.0.2]
    ... 1 common frames omitted

2018-03-17 14:08:36.689  WARN 11076 --- [cTaskExecutor-4] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue:EMP_QUEUE1_CBQ
2018-03-17 14:08:36.695 ERROR 11076 --- [cTaskExecutor-4] o.s.a.r.l.SimpleMessageListenerContainer : Consumer received fatal exception on startup

org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:563) ~[spring-rabbit-1.7.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1389) ~[spring-rabbit-1.7.2.RELEASE.jar:na]
    at java.lang.Thread.run(Unknown Source) [na:1.8.0_151]
Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[EMP_QUEUE1_CBQ]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:636) ~[spring-rabbit-1.7.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:535) ~[spring-rabbit-1.7.2.RELEASE.jar:na]
    ... 2 common frames omitted
Caused by: java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:123) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:992) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:50) ~[amqp-client-4.0.2.jar:4.0.2]
    at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_151]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_151]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:955) ~[spring-rabbit-1.7.2.RELEASE.jar:na]
    at com.sun.proxy.$Proxy58.queueDeclarePassive(Unknown Source) ~[na:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:615) ~[spring-rabbit-1.7.2.RELEASE.jar:na]
    ... 3 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'EMP_QUEUE1_CBQ' in vhost 'foo', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117) ~[amqp-client-4.0.2.jar:4.0.2]
    ... 11 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'EMP_QUEUE1_CBQ' in vhost 'foo', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:505) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:336) ~[amqp-client-4.0.2.jar:4.0.2]
    at com.rabbitmq.client.impl.

Gar*_*ell 5

当我在默认虚拟主机中声明队列时,这个问题不存在。但是当我添加虚拟主机 foo 时,它突然停止工作。

访问新虚拟主机的用户是否有configure权限?声明队列需要配置权限。

需要 RabbitAdmin 来声明队列/绑定;容器只做一个被动声明来检查队列是否存在。

编辑

container.start();
Run Code Online (Sandbox Code Playgroud)

您不能start()是 bean 定义中的容器。如果容器autoStartUptrue(默认),则应用程序上下文将在应用程序上下文完全构建后执行此操作。

从您的日志中可以清楚地看出,容器启动得太早了 - 在其他 bean(管理、队列等)被声明之前。