Spring Boot和Spring AMQP RPC - 找不到转换异常的转换器

Luk*_*ley 0 spring-remoting spring-amqp spring-boot

我有几个使用Spring Boot和RPC通过RabbitMQ的教程.但是,一旦我尝试添加Jackson JSON消息转换器,它就会崩溃.

服务器已成功接收远程调用,因此我非常有信心这不是客户端配置.

Exchange    DATAFLOW_EXCHANGE
Routing Key     dataflowRunner
Redelivered     ?
Properties  
reply_to:   amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAdXNoeWRnbmFkaXBhbHZ4AAAr0wAAAAAB.MmIZ6Htejtc1qB11G7BBQw==
priority:   0
delivery_mode:  2
headers:    
__TypeId__: org.springframework.remoting.support.RemoteInvocation
content_encoding:   UTF-8
content_type:   application/json
Payload
675 bytes
Encoding: string


{"methodName":"run","parameterTypes":["dw.dataflow.Dataflow"],"arguments":[{ Valid Dataflow JSON Removed for Brevity } ]}
Run Code Online (Sandbox Code Playgroud)

但是,输出以下异常:

Caused by: org.springframework.messaging.converter.MessageConversionException: 
No converter found to convert to class dw.dataflow.Dataflow, message=GenericMessage 
[payload=RemoteInvocation: method name 'run'; parameter types [dw.dataflow.Dataflow], headers={amqp_receivedExchange=DATAFLOW_EXCHANGE, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAdXNoeWRnbmFkaXBhbHZ4AAArRAAAAAQC.PA/bJ6lcUfaP3csAP5v5NA==, amqp_consumerQueue=DATAFLOW_QUEUE, amqp_redelivered=false, amqp_receivedRoutingKey=dataflowRunner, amqp_contentEncoding=UTF-8, amqp_deliveryMode=PERSISTENT, id=adb37c77-c0da-16bd-8df4-b739cfddf89f, amqp_consumerTag=amq.ctag-N_tFCc_Hp9UtQkiXl7FZ8g, contentType=application/json, __TypeId__=org.springframework.remoting.support.RemoteInvocation, timestamp=1462560945203}]
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:118)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:98)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:138)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:112)
... 12 common frames omitted
Run Code Online (Sandbox Code Playgroud)

所以,在交付时,它知道它应该是一个dw.dataflow.Dataflow对象,它只是找不到转换器.但是,我的转换器定义在任何地方.

服务器配置

@Configuration
@EnableRabbit
public class RabbitListenerConfiguration {
    @Autowired
    ConnectionFactory connectionFactory;
    @Autowired
    ObjectMapper      jacksonObjectMapper;

@Bean
public TopicExchange exchange() {
    return new TopicExchange("DATAFLOW_EXCHANGE", true, false);
}

@Bean
public Queue queue() {
    return new Queue("DATAFLOW_QUEUE", true);
}

@Bean
public AmqpInvokerServiceExporter amqpInvokerServiceExporter() {
    AmqpInvokerServiceExporter exporter = new AmqpInvokerServiceExporter() ;
    exporter.setAmqpTemplate(rabbitTemplate());
    exporter.setMessageConverter(jackson2JsonMessageConverter());
    exporter.setServiceInterface(DataflowRunner.class);
    exporter.setService(dataflowRunner());
    return exporter ;
}

@Bean
public DataflowRunner dataflowRunner() {
    return new DataflowRunnerServerImpl();
}

@Bean
public MessageConverter jackson2JsonMessageConverter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    converter.setJsonObjectMapper(jacksonObjectMapper);
    return converter;
}

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMessageConverter(jackson2JsonMessageConverter());
    return template;
}


@Bean(name="rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(jackson2JsonMessageConverter());
    factory.setDefaultRequeueRejected(false); 
    return factory;
}
Run Code Online (Sandbox Code Playgroud)

这是服务界面:

public interface DataflowRunner {
    String run(Dataflow dataflow) throws Exception;
}
Run Code Online (Sandbox Code Playgroud)

具体实施:

public class DataflowRunnerServerImpl implements DataflowRunner {
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues="DATAFLOW_QUEUE")
public String run(Dataflow dataflow) throws Exception {
    // SNIP
}
Run Code Online (Sandbox Code Playgroud)

对于grins和giggles,我还尝试使用以下注释配置服务器实现类,但它具有相同的错误:

@RabbitHandler
@RabbitListener(
        bindings = @QueueBinding(key = "dataflowRunner",
                value = @Queue(value = "DATAFLOW_QUEUE", durable = "true", autoDelete = "false", exclusive = "false"),
                exchange = @Exchange(value = "DATAFLOW_EXCHANGE", durable = "true", autoDelete = "false", type = "topic")) )
public String run(Dataflow dataflow) throws Exception {
Run Code Online (Sandbox Code Playgroud)

客户端配置

    @Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitHost, rabbitPort);
    connectionFactory.setUsername(rabbitUser);
    connectionFactory.setPassword(rabbitPassword);
    connectionFactory.setAddresses(rabbitAddresses);
    return connectionFactory;
}

@Bean
public AmqpAdmin amqpAdmin() {
    return new RabbitAdmin(connectionFactory());
}

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    template.setMessageConverter(jackson2MessageConverter());
    return template;
}
Run Code Online (Sandbox Code Playgroud)

有什么似乎错误配置?我错过了什么?我在服务导出器和侦听器容器工厂上设置了转换器.

任何帮助和/或思想表示赞赏.

Gar*_*ell 5

@RabbitListener 不打算与服务导出器一起使用 - 只是一个普通的Java类.

对于通过RPC的Spring Remoting,服务导出器是MessageListener用于a的SimpleMessageListenerContainer.

@RabbitListener,有一个特殊的侦听器适配器包装pojo方法.

所以你似乎在混合两种不同的范例.

ServiceExporter(Spring远程)预计将与配对AmqpProxyFactoryBean与服务出口国在服务器端监听客户端.

对于简单的POJO RPC(这比使用Spring Remoting的超过RabbitMQ的非常新的),使用@RabbitListenerRabbitTemplate.convertSendAndReceive()客户端上.摆脱PFB和SE.

如果我们需要在文档中添加一些说明,你能否解释是什么导致你走这条路?

编辑

如果你确实想使用Spring Remoting(在客户端注入一个接口并让它"神奇地"调用服务器端的服务),你需要摆脱所有容器工厂的东西,然后简单地连接SimpleMessageListenerContainer并注入服务出口商作为MessageListener.

参考手册有一个XML示例,但您可以将SMLC连接为@Bean.

EDIT2

我已经运行了一些测试并且Spring Remoting over AMQP不能与JSON一起工作,因为顶级对象是RemoteInvocation- 而消息转换器可以重新创建该对象,它没有关于实际参数的类型信息所以将它保留为链接哈希映射.

现在,如果你必须使用JSON,那么convertSendAndReceive结合使用模板@RabbitListener就是这里的方法.我将打开一个JIRA问题,看看我们是否可以使用带有JSON的Spring Remoting RPC解决,但它确实是为Java序列化而设计的.