Receiving events from AMQP with Axon 4

mbo*_*dev 5 java amqp rabbitmq axon spring-boot

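I am trying to send messages via RabbitMQ to a Spring Boot system based on Axon 4. The messages are received, but no events are triggered. I am fairly sure I am missing an essential piece, but so far I have not been able to work out which one.

Here is the relevant part of my application.yml: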

axon:
    amqp:
        exchange: axon.fanout
        transaction-mode: publisher_ack
    # adding the following lines changed nothing
    eventhandling:
        processors:
            amqpEvents:
                source: in.queue
                mode: subscribing
spring:
    rabbitmq:
        username: rabbit
        password: rabbit

From the documentation I found that I should create a SpringAMQPMessageSource bean:

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.extensions.amqp.eventhandling.spring.SpringAMQPMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class AxonConfig {

    @Bean
    SpringAMQPMessageSource inputMessageSource(final AMQPMessageConverter messageConverter) {
        return new SpringAMQPMessageSource(messageConverter) {
            @RabbitListener(queues = "in.queue")
            @Override
            public void onMessage(final Message message, final Channel channel) {
                log.debug("received external message: {}, channel: {}", message, channel);
                super.onMessage(message, channel);
            }
        };
    }

}

If I send a message to the queue from the RabbitMQ management panel, I see this in the log:

AxonConfig : received external message: (Body:'[B@13f7aeef(byte[167])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=, receivedRoutingKey=in.queue, deliveryTag=2, consumerTag=amq.ctag-xi34jwHHA__xjENSteX5Dw, consumerQueue=in.queue]), channel: Cached Rabbit Channel: AMQChannel(amqp://rabbit@127.0.0.1:5672/,1), conn: Proxy@11703cc8 Shared Rabbit Connection: SimpleConnection@581cb879 [delegate=amqp://rabbit@127.0.0.1:5672/, localPort= 58614]

The component that is supposed to receive the events is my aggregate.

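So the problem is that the system receives the messages but no events are triggered. The content of the message seems to be irrelevant: whatever I send to the queue, I only get the log message from my onMessage method.

The JavaDoc of SpringAMQPMessageSource says this: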

/**
 * MessageListener implementation that deserializes incoming messages and forwards them to one or more event processors.
 * <p>
 * The SpringAMQPMessageSource must be registered with a Spring MessageListenerContainer and forwards each message
 * to all subscribed processors.
 * <p>
 * Note that the Processors must be subscribed before the MessageListenerContainer is started. Otherwise, messages will
 * be consumed from the AMQP Queue without any processor processing them.
 *
 * @author Allard Buijze
 * @since 3.0
 */

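But so far I have not found out where and how to register it.

The axon.eventhandling entries in my configuration and the @ProcessingGroup("amqpEvents") on my aggregate are already leftovers from experimenting. With or without them, nothing changes. I also tried it without mode=subscribing.

Exact versions: Spring Boot 2.1.4, Axon 4.1.1, axon-amqp-spring-boot-autoconfigure 4.1

Any help or hints are highly appreciated.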


Update, 23 April 2019:

I tried writing my own class like this:

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.messaging.SubscribableMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

@Slf4j
@Component
public class RabbitMQSpringAMQPMessageSource implements ChannelAwareMessageListener, SubscribableMessageSource<EventMessage<?>> {

    private final List<Consumer<List<? extends EventMessage<?>>>> eventProcessors = new CopyOnWriteArrayList<>();
    private final AMQPMessageConverter messageConverter;

    @Autowired
    public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    @Override
    public Registration subscribe(final Consumer<List<? extends EventMessage<?>>> messageProcessor) {
        eventProcessors.add(messageProcessor);
        log.debug("subscribe to: {}", messageProcessor);
        return () -> eventProcessors.remove(messageProcessor);
    }

    @RabbitListener(queues = "${application.queues.in}")
    @Override
    public void onMessage(final Message message, final Channel channel) {
        log.debug("received external message: {}, channel: {}", message, channel);
        log.debug("eventProcessors: {}", eventProcessors);
        if (!eventProcessors.isEmpty()) {
            messageConverter.readAMQPMessage(message.getBody(), message.getMessageProperties().getHeaders())
                            .ifPresent(event -> eventProcessors.forEach(
                                ep -> ep.accept(Collections.singletonList(event))
                            ));
        }
    }

}

The result is the same, and the log now proves that the list of event processors is simply empty:

eventProcessors: []

So the question is how to register the event processors correctly. Is there a way to do this properly with Spring?

Update 2:

No luck with this either:

@Slf4j
@Component("rabbitMQSpringAMQPMessageSource")
public class RabbitMQSpringAMQPMessageSource extends SpringAMQPMessageSource {

    @Autowired
    public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
        super(messageConverter);
    }

    @RabbitListener(queues = "${application.queues.in}")
    @Override
    public void onMessage(final Message message, final Channel channel) {

        try {
            final var eventProcessorsField = this.getClass().getSuperclass().getDeclaredField("eventProcessors");
            eventProcessorsField.setAccessible(true);
            final var eventProcessors = (List<Consumer<List<? extends EventMessage<?>>>>) eventProcessorsField.get(this);
            log.debug("eventProcessors: {}", eventProcessors);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            e.printStackTrace();
        }

        log.debug("received message: message={}, channel={}", message, channel);
        super.onMessage(message, channel);
    }

}

With this in application.yml:

axon:
    eventhandling:
        processors:
            amqpEvents:
                source: rabbitMQSpringAMQPMessageSource
                mode: SUBSCRIBING

In addition to the above, registering it programmatically did not help either:

    @Autowired
    void configure(EventProcessingModule epm,
                   RabbitMQSpringAMQPMessageSource rabbitMessageSource) {
        epm.registerSubscribingEventProcessor("rabbitMQSpringAMQPMessageSource", c -> rabbitMessageSource);
        epm.assignProcessingGroup("amqpEvents", "rabbitMQSpringAMQPMessageSource");// this line also made no difference
    }

Of course, @ProcessingGroup("amqpEvents") is present on my class that contains the @EventSourcingHandler annotated methods.


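Update, 25 April 2019:

See the accepted answer from Allard. Many thanks for pointing out my mistake: I had missed that an EventSourcingHandler does not receive messages from outside. External events are meant for projections, not for distributing aggregates. Oops. Here are the configuration and classes that now receive events from RabbitMQ: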

axon:
    eventhandling:
        processors:
            amqpEvents:
                source: rabbitMQSpringAMQPMessageSource
                mode: SUBSCRIBING

The message source:

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.extensions.amqp.eventhandling.spring.SpringAMQPMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component("rabbitMQSpringAMQPMessageSource")
public class RabbitMQSpringAMQPMessageSource extends SpringAMQPMessageSource {

    @Autowired
    public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
        super(messageConverter);
    }

    @RabbitListener(queues = "${application.queues.in}")
    @Override
    public void onMessage(final Message message, final Channel channel) {
        log.debug("received message: message={}, channel={}", message, channel);
        super.onMessage(message, channel);
    }

}

The event handler:

import lombok.extern.slf4j.Slf4j;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.queryhandling.QueryHandler;
import org.springframework.stereotype.Service;
import pm.mbo.easyway.api.app.order.events.OrderConfirmedEvent;
import pm.mbo.easyway.api.app.order.events.OrderPlacedEvent;
import pm.mbo.easyway.api.app.order.events.OrderShippedEvent;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
@ProcessingGroup("amqpEvents")
@Service
public class OrderedProductsEventHandler {

    private final Map<String, OrderedProduct> orderedProducts = new HashMap<>();

    @EventHandler
    public void on(OrderPlacedEvent event) {
        log.debug("event: {}", event);
        String orderId = event.getOrderId();
        orderedProducts.put(orderId, new OrderedProduct(orderId, event.getProduct()));
    }

    @EventHandler
    public void on(OrderConfirmedEvent event) {
        log.debug("event: {}", event);
        orderedProducts.computeIfPresent(event.getOrderId(), (orderId, orderedProduct) -> {
            orderedProduct.setOrderConfirmed();
            return orderedProduct;
        });
    }

    @EventHandler
    public void on(OrderShippedEvent event) {
        log.debug("event: {}", event);
        orderedProducts.computeIfPresent(event.getOrderId(), (orderId, orderedProduct) -> {
            orderedProduct.setOrderShipped();
            return orderedProduct;
        });
    }

    @QueryHandler
    public List<OrderedProduct> handle(FindAllOrderedProductsQuery query) {
        log.debug("query: {}", query);
        return new ArrayList<>(orderedProducts.values());
    }

}

Of course, I removed @ProcessingGroup from my aggregate.

My log:

RabbitMQSpringAMQPMessageSource : received message: ... 
OrderedProductsEventHandler : event: OrderShippedEvent...

All*_*ard 5

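In Axon, aggregates do not receive events from "outside". The event handlers inside an aggregate (more specifically, they are EventSourcingHandlers) only handle events that were published by that same aggregate instance, so that it can reconstruct its prior state.

Only external event handlers, for example the ones that update projections, receive events from external sources.

For that to work, your application.yml should mention the bean name as the processor's source, not the queue name. So in your first example: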
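To make the distinction concrete, here is a minimal plain-Java sketch of what "reconstructing prior state from its own events" means. It is a simplified model, not Axon code: the class names `OrderPlaced`, `OrderShipped` and `OrderAggregate` and the `rebuild` helper are hypothetical, chosen only to resemble the events in the question.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified model of event sourcing; none of these types are Axon classes.
class EventSourcingSketch {

    // Hypothetical events, loosely named after the ones in the question.
    static final class OrderPlaced {
        final String orderId;
        final String product;
        OrderPlaced(String orderId, String product) {
            this.orderId = orderId;
            this.product = product;
        }
    }

    static final class OrderShipped {
        final String orderId;
        OrderShipped(String orderId) {
            this.orderId = orderId;
        }
    }

    static final class OrderAggregate {
        private final List<Object> ownEvents = new ArrayList<>();
        private String orderId;
        private String product;
        private boolean shipped;

        // Command side: validate, then publish an event describing the change.
        void place(String orderId, String product) {
            publish(new OrderPlaced(orderId, product));
        }

        void ship() {
            if (orderId == null) {
                throw new IllegalStateException("order was never placed");
            }
            publish(new OrderShipped(orderId));
        }

        private void publish(Object event) {
            ownEvents.add(event);
            apply(event);
        }

        // Plays the role of the event sourcing handlers: state changes only here.
        private void apply(Object event) {
            if (event instanceof OrderPlaced) {
                OrderPlaced placed = (OrderPlaced) event;
                orderId = placed.orderId;
                product = placed.product;
            } else if (event instanceof OrderShipped) {
                shipped = true;
            }
        }

        // Rebuild state by replaying the events this aggregate published earlier.
        static OrderAggregate rebuild(List<Object> history) {
            OrderAggregate aggregate = new OrderAggregate();
            for (Object event : history) {
                aggregate.ownEvents.add(event);
                aggregate.apply(event);
            }
            return aggregate;
        }

        List<Object> ownEvents() { return Collections.unmodifiableList(ownEvents); }
        String product() { return product; }
        boolean isShipped() { return shipped; }
    }

    public static void main(String[] args) {
        OrderAggregate original = new OrderAggregate();
        original.place("order-1", "book");
        original.ship();

        // Only the aggregate's own history is replayed; nothing arrives from a queue.
        OrderAggregate restored = OrderAggregate.rebuild(original.ownEvents());
        System.out.println(restored.product() + " shipped=" + restored.isShipped());
    }
}
```

In this model the only input to `apply` is the aggregate's own event history, which is why a message arriving over AMQP never reaches such a handler.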

    eventhandling:
        processors:
            amqpEvents:
                source: in.queue
                mode: subscribing

should become:

    eventhandling:
        processors:
            amqpEvents:
                source: inputMessageSource
                mode: subscribing

But again, this only works for event handlers defined on components, not for event handlers defined on aggregates.
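The empty `eventProcessors: []` log in the question and the JavaDoc note about subscribing before messages are consumed can be pictured with a small plain-Java model. This is only a sketch under assumptions: `MessageSource` here is a hypothetical stand-in using plain strings as events, not Axon's SpringAMQPMessageSource, and the comment about the `source` setting describes the intended effect rather than the framework's actual wiring code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Simplified stand-in for a subscribable message source; not Axon's implementation.
class SubscribableSourceSketch {

    static final class MessageSource {
        private final List<Consumer<String>> processors = new CopyOnWriteArrayList<>();

        // A processor subscribes; the returned Runnable cancels the subscription.
        Runnable subscribe(Consumer<String> processor) {
            processors.add(processor);
            return () -> processors.remove(processor);
        }

        // Called by the queue listener. Returns how many processors got the event.
        int onMessage(String event) {
            processors.forEach(p -> p.accept(event));
            return processors.size();
        }
    }

    public static void main(String[] args) {
        MessageSource source = new MessageSource();
        List<String> handled = new ArrayList<>();

        // No processor is subscribed yet: the message is consumed and lost.
        int first = source.onMessage("OrderPlacedEvent");

        // Assumption: pointing a processor's `source` at the bean leads to this call.
        Runnable cancel = source.subscribe(handled::add);
        int second = source.onMessage("OrderShippedEvent");

        cancel.run();
        System.out.println(first + " " + second + " " + handled);
        // prints "0 1 [OrderShippedEvent]"
    }
}
```

The first message is dropped silently because nothing has subscribed, which matches the symptom in the question: the listener logs the message, yet no handler ever runs.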