Och*_*nga 6 java spring rabbitmq spring-amqp spring-boot
最近,我对使用Spring Boot的微服务架构产生了浓厚的兴趣.我的实现有两个Spring启动应用程序;
应用的一个接收请求从一个RESTful API,转换并发送有效载荷JSON到的RabbitMQ queueA.
应用程序二,已订阅queueA,接收jSON有效负载(域对象用户),并且应该激活Application Two中的服务,例如.向用户发送电子邮件.
在我的Application Two配置中不使用XML ,如何配置将从RabbitMQ接收的jSON有效负载转换为域对象用户的转换器.
以下是Application Two上Spring Boot配置的摘录
Application.class
@SpringBootApplication
@EnableRabbit
public class ApplicationInitializer implements CommandLineRunner {
final static String queueName = "user-registration";
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
AnnotationConfigApplicationContext context;
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
TopicExchange topicExchange() {
return new TopicExchange("user-registrations");
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(queueName);
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
public static void main(String[] args) {
SpringApplication.run(ApplicationInitializer.class, args);
}
@Override
public void run(String... args) throws Exception {
System.out.println("Waiting for messages...");
}
}
Run Code Online (Sandbox Code Playgroud)
TestService.java
@Component
public class TestService {
/**
* This test verifies whether this consumer receives message off the user-registration queue
*/
@RabbitListener(queues = "user-registration")
public void testReceiveNewUserNotificationMessage(User user) {
// do something like, convert payload to domain object user and send email to this user
}
}
Run Code Online (Sandbox Code Playgroud)
jon*_*ckt 11
我遇到了同样的问题,经过一些研究和测试,我了解到,在SpringBoot中配置RabbitMQ-Receiver的方法不止一种,但选择一个并坚持下去非常重要.
如果您决定使用Annotation Driven Listener Endpoint,那么我从@EnableRabbit和@RabbitListener的使用中得到的结果,与您发布的配置相比并不适用于我.有效的是以下内容:
从org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer派生您的配置类,并重写方法configureRabbitListeners,如下所示:
@Override
public void configureRabbitListeners(
RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
Run Code Online (Sandbox Code Playgroud)
并添加MessageHandlerFactory:
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(new MappingJackson2MessageConverter());
return factory;
}
Run Code Online (Sandbox Code Playgroud)
此外,您需要定义SimpleRabbitListenerContainerFactory(就像您已经做过的那样)并自动装配相应的ConnectionFactory:
@Autowired
public ConnectionFactory connectionFactory;
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
return factory;
}
Run Code Online (Sandbox Code Playgroud)
完成配置后,您需要定义Bean,它处理您的消息并继承@ RabbitListerner-Annotations.对我来说,我命名为EventResultHandler(您将其命名为TestService):
@Bean
public EventResultHandler eventResultHandler() {
return new EventResultHandler();
}
Run Code Online (Sandbox Code Playgroud)
然后在EventResultHandler(或TestService)中定义@ RabbitListener-Methods及其相应的队列和Payload(= POJO,您的JSON消息序列化到的地方):
@Component
public class EventResultHandler {
@RabbitListener(queues=Queues.QUEUE_NAME_PRESENTATION_SERVICE)
public void handleMessage(@Payload Event event) {
System.out.println("Event received");
System.out.println("EventType: " + event.getType().getText());
}
}
Run Code Online (Sandbox Code Playgroud)
我省略了队列和交换所需的定义和绑定 - 你可以在一个或另一个微服务中 - 或者在RabbitMQ-Server中手动执行...但你肯定必须这样做.
创建一个jackson消息转换器并使用它进行设置 MessageListenerAdapter#setMessageConverter
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
Run Code Online (Sandbox Code Playgroud)
来自哪里MessageListenerAdapter
?
归档时间: |
|
查看次数: |
13608 次 |
最近记录: |