Spring AMQP-发送者和接收消息

BIn*_*hav 8 rabbitmq spring-amqp spring-rabbitmq

我在从RabbitMQ接收消息时遇到问题。我正在发送如下消息

        HashMap<Object, Object> senderMap=new HashMap<>();
        senderMap.put("STATUS", "SUCCESS");
        senderMap.put("EXECUTION_START_TIME", new Date());

        rabbitTemplate.convertAndSend(Constants.ADAPTOR_OP_QUEUE,senderMap);
Run Code Online (Sandbox Code Playgroud)

如果在RabbitMQ中看到,我们将获得完全合格的类型。

在当前情况下,同一消费者有n个生产者。如果我使用任何映射器,则会导致异常。我将如何发送消息,使其不包含任何type_id,并且可以将消息作为Message对象接收,以后可以将其绑定到接收器中的自定义对象。

我收到如下消息。您能否让我知道如何使用Jackson2MessageConverter,以便消息将直接从Receiver端绑定到我的Object / HashMap。我也已经从发件人中删除了Type_ID。

消息在RabbitMQ中的外观

优先级:0 delivery_mode:2个标头:
ContentTypeId:java.lang.Object KeyTypeId:java.lang.Object content_encoding:UTF-8 content_type:application / json {“ Execution_start_time”:1473747183636,“ status”:“ SUCCESS”}

@Component
public class AdapterOutputHandler {

    private static Logger logger = Logger.getLogger(AdapterOutputHandler.class);

    @RabbitListener(containerFactory="adapterOPListenerContainerFactory",queues=Constants.ADAPTOR_OP_QUEUE)
    public void handleAdapterQueueMessage(HashMap<String,Object> message){

        System.out.println("Receiver:::::::::::"+message.toString());

    }

}
Run Code Online (Sandbox Code Playgroud)

连接

@Bean(name="adapterOPListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory adapterOPListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
        DefaultClassMapper classMapper = new DefaultClassMapper();
        messageConverter.setClassMapper(classMapper);
        factory.setMessageConverter(messageConverter);

    }
Run Code Online (Sandbox Code Playgroud)

例外

Caused by: org.springframework.amqp.support.converter.MessageConversionException: failed to convert Message content. Could not resolve __TypeId__ in header and no defaultType provided
    at org.springframework.amqp.support.converter.DefaultClassMapper.toClass(DefaultClassMapper.java:139)
Run Code Online (Sandbox Code Playgroud)

我不想使用发件人的__TYPE__ID,因为它们是同一队列的多个发件人,并且只有一个消费者。

Gar*_*ell 6

它导致异常

什么异常?

TypeId : com.diff.approach.JobListenerDTO

这意味着您发送的是 DTO,而不是您在问题中描述的哈希映射。

如果要删除 typeId 标头,可以使用消息后处理器...

rabbitTemplate.convertAndSend(Constants.INPUT_QUEUE, dto, m -> {
    m.getMessageProperties.getHeaders().remove("__TypeId__");
    return m;
});
Run Code Online (Sandbox Code Playgroud)

(或者, new MessagePostProcessor() {...}如果您没有使用 Java 8)。

编辑

您使用的是哪个版本的 Spring AMQP?使用 1.6,您甚至不必删除__TypeId__标题 - 框架查看侦听器参数类型并告诉 Jackson 转换器类型,以便它自动转换为该类型(如果可以的话)。正如你在这里看到的;它在不删除类型 ID 的情况下工作正常......

package com.example;

import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class So39443850Application {

    private static final String QUEUE = "so39443850";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So39443850Application.class, args);
        context.getBean(RabbitTemplate.class).convertAndSend(QUEUE, new DTO("baz", "qux"));
        context.getBean(So39443850Application.class).latch.await(10, TimeUnit.SECONDS);
        context.getBean(RabbitAdmin.class).deleteQueue(QUEUE);
        context.close();
    }

    private final CountDownLatch latch = new CountDownLatch(1);

    @RabbitListener(queues = QUEUE, containerFactory = "adapterOPListenerContainerFactory")
    public void listen(HashMap<String, Object> message) {
        System.out.println(message.getClass() + ":" + message);
        latch.countDown();
    }

    @Bean
    public Queue queue() {
        return new Queue(QUEUE);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory adapterOPListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }

    public static class DTO {

        private String foo;

        private String baz;

        public DTO(String foo, String baz) {
            this.foo = foo;
            this.baz = baz;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        public String getBaz() {
            return this.baz;
        }

        public void setBaz(String baz) {
            this.baz = baz;
        }

    }

}
Run Code Online (Sandbox Code Playgroud)

结果:

class java.util.HashMap:{foo=baz, baz=qux}
Run Code Online (Sandbox Code Playgroud)

这在文档中描述...

在 1.6 之前的版本中,必须在消息头中提供用于转换 JSON 的类型信息,或者需要自定义 ClassMapper。从 1.6 版开始,如果没有类型信息头,则可以从目标方法参数推断类型。

您还可以将自定义配置ClassMapper为始终返回HashMap