Luk*_*ley 0 spring-remoting spring-amqp spring-boot
我有几个使用Spring Boot和RPC通过RabbitMQ的教程.但是,一旦我尝试添加Jackson JSON消息转换器,它就会崩溃.
服务器已成功接收远程调用,因此我非常有信心这不是客户端配置.
Exchange DATAFLOW_EXCHANGE
Routing Key dataflowRunner
Redelivered ?
Properties
reply_to: amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAdXNoeWRnbmFkaXBhbHZ4AAAr0wAAAAAB.MmIZ6Htejtc1qB11G7BBQw==
priority: 0
delivery_mode: 2
headers:
__TypeId__: org.springframework.remoting.support.RemoteInvocation
content_encoding: UTF-8
content_type: application/json
Payload
675 bytes
Encoding: string
{"methodName":"run","parameterTypes":["dw.dataflow.Dataflow"],"arguments":[{ Valid Dataflow JSON Removed for Brevity } ]}
Run Code Online (Sandbox Code Playgroud)
但是,输出以下异常:
Caused by: org.springframework.messaging.converter.MessageConversionException:
No converter found to convert to class dw.dataflow.Dataflow, message=GenericMessage
[payload=RemoteInvocation: method name 'run'; parameter types [dw.dataflow.Dataflow], headers={amqp_receivedExchange=DATAFLOW_EXCHANGE, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAdXNoeWRnbmFkaXBhbHZ4AAArRAAAAAQC.PA/bJ6lcUfaP3csAP5v5NA==, amqp_consumerQueue=DATAFLOW_QUEUE, amqp_redelivered=false, amqp_receivedRoutingKey=dataflowRunner, amqp_contentEncoding=UTF-8, amqp_deliveryMode=PERSISTENT, id=adb37c77-c0da-16bd-8df4-b739cfddf89f, amqp_consumerTag=amq.ctag-N_tFCc_Hp9UtQkiXl7FZ8g, contentType=application/json, __TypeId__=org.springframework.remoting.support.RemoteInvocation, timestamp=1462560945203}]
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:118)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:98)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:138)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:112)
... 12 common frames omitted
Run Code Online (Sandbox Code Playgroud)
所以,在交付时,它知道它应该是一个dw.dataflow.Dataflow对象,它只是找不到转换器.但是,我的转换器定义在任何地方.
服务器配置
@Configuration
@EnableRabbit
public class RabbitListenerConfiguration {
@Autowired
ConnectionFactory connectionFactory;
@Autowired
ObjectMapper jacksonObjectMapper;
@Bean
public TopicExchange exchange() {
return new TopicExchange("DATAFLOW_EXCHANGE", true, false);
}
@Bean
public Queue queue() {
return new Queue("DATAFLOW_QUEUE", true);
}
@Bean
public AmqpInvokerServiceExporter amqpInvokerServiceExporter() {
AmqpInvokerServiceExporter exporter = new AmqpInvokerServiceExporter() ;
exporter.setAmqpTemplate(rabbitTemplate());
exporter.setMessageConverter(jackson2JsonMessageConverter());
exporter.setServiceInterface(DataflowRunner.class);
exporter.setService(dataflowRunner());
return exporter ;
}
@Bean
public DataflowRunner dataflowRunner() {
return new DataflowRunnerServerImpl();
}
@Bean
public MessageConverter jackson2JsonMessageConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setJsonObjectMapper(jacksonObjectMapper);
return converter;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jackson2JsonMessageConverter());
return template;
}
@Bean(name="rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jackson2JsonMessageConverter());
factory.setDefaultRequeueRejected(false);
return factory;
}
Run Code Online (Sandbox Code Playgroud)
这是服务界面:
public interface DataflowRunner {
String run(Dataflow dataflow) throws Exception;
}
Run Code Online (Sandbox Code Playgroud)
具体实施:
public class DataflowRunnerServerImpl implements DataflowRunner {
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues="DATAFLOW_QUEUE")
public String run(Dataflow dataflow) throws Exception {
// SNIP
}
Run Code Online (Sandbox Code Playgroud)
对于grins和giggles,我还尝试使用以下注释配置服务器实现类,但它具有相同的错误:
@RabbitHandler
@RabbitListener(
bindings = @QueueBinding(key = "dataflowRunner",
value = @Queue(value = "DATAFLOW_QUEUE", durable = "true", autoDelete = "false", exclusive = "false"),
exchange = @Exchange(value = "DATAFLOW_EXCHANGE", durable = "true", autoDelete = "false", type = "topic")) )
public String run(Dataflow dataflow) throws Exception {
Run Code Online (Sandbox Code Playgroud)
客户端配置
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitHost, rabbitPort);
connectionFactory.setUsername(rabbitUser);
connectionFactory.setPassword(rabbitPassword);
connectionFactory.setAddresses(rabbitAddresses);
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(jackson2MessageConverter());
return template;
}
Run Code Online (Sandbox Code Playgroud)
有什么似乎错误配置?我错过了什么?我在服务导出器和侦听器容器工厂上设置了转换器.
任何帮助和/或思想表示赞赏.
@RabbitListener 不打算与服务导出器一起使用 - 只是一个普通的Java类.
对于通过RPC的Spring Remoting,服务导出器是MessageListener用于a的SimpleMessageListenerContainer.
有@RabbitListener,有一个特殊的侦听器适配器包装pojo方法.
所以你似乎在混合两种不同的范例.
该ServiceExporter(Spring远程)预计将与配对AmqpProxyFactoryBean与服务出口国在服务器端监听客户端.
对于简单的POJO RPC(这比使用Spring Remoting的超过RabbitMQ的非常新的),使用@RabbitListener和RabbitTemplate.convertSendAndReceive()客户端上.摆脱PFB和SE.
如果我们需要在文档中添加一些说明,你能否解释是什么导致你走这条路?
编辑
如果你确实想使用Spring Remoting(在客户端注入一个接口并让它"神奇地"调用服务器端的服务),你需要摆脱所有容器工厂的东西,然后简单地连接SimpleMessageListenerContainer并注入服务出口商作为MessageListener.
参考手册有一个XML示例,但您可以将SMLC连接为@Bean.
EDIT2
我已经运行了一些测试并且Spring Remoting over AMQP不能与JSON一起工作,因为顶级对象是RemoteInvocation- 而消息转换器可以重新创建该对象,它没有关于实际参数的类型信息所以将它保留为链接哈希映射.
现在,如果你必须使用JSON,那么convertSendAndReceive结合使用模板@RabbitListener就是这里的方法.我将打开一个JIRA问题,看看我们是否可以使用带有JSON的Spring Remoting RPC解决,但它确实是为Java序列化而设计的.
| 归档时间: |
|
| 查看次数: |
3121 次 |
| 最近记录: |