标签: axon

Axon @QueryHandler与Spring @ExceptionHandler

使用注释为Spring @ResponseStatus的Axon @QueryHandler中引发的异常时遇到麻烦。原始异常被QueryHandler吞没,并且特定于Axon的AxonServerRemoteQueryHandlingException被抛出,当spring响应客户端时,实际上给出了500

仍然有可能从Axon异常中获取一些信息,例如原始的“找不到实体”消息,但没有异常类型,也没有原始异常保存的其他任何信息。

问题1:有什么方法可以将在查询处理程序中引发的异常提升为404的Spring响应

Spring异常处理程序

@ResponseStatus(value = HttpStatus.NOT_FOUND)
public class NotFoundException extends ServiceException() {
  ...
}
Run Code Online (Sandbox Code Playgroud)

轴突查询处理程序

@QueryHandler
public Application getApplicationById(ApplicationByIdQuery query) {
  return applicationRepository.findById(query.getId())
      .orElseThrow(() -> new NotFoundException(Application.class, query.getId()));
}
Run Code Online (Sandbox Code Playgroud)

弹簧控制器

@Autowired
QueryGateway queryGateway;

@GetMapping(path = "/{applicationId}")
public CompletableFuture<Application> getApplication(@PathVariable String applicationId) {
  return queryGateway.query(new ApplicationByIdQuery(applicationId), ResponseTypes.instanceOf(Application.class));
}
Run Code Online (Sandbox Code Playgroud)

实际结果json:

{
  "timestamp": "2019-02-08T08:04:03.629+0000",
  "status": 500,
  "error": "Internal Server Error",
  "message": "An exception was thrown by the remote message handling component.",
  "path": "/api/applications/dff59c46-baf1-40f5-8a21-9286d1f8e36fx"
}
Run Code Online (Sandbox Code Playgroud)

问题2:我的另一个问题是,为什么不直接使用常规的JPA Query API,而是使用Axon的QueryHandler。投影表是常规的JPA表,可以通过功能非常强大的Spring JPA进行质疑。是因为直接查询不能确保投影数据的一致性?我经历了许多示例,其中大多数使用直接访问(请参见下面的内容),其余示例无法解决底层QueryHandler引发的异常

@Autowired
ApplicationRepository …
Run Code Online (Sandbox Code Playgroud)

spring axon

4
推荐指数
1
解决办法
388
查看次数

DOMAIN_EVENT_ENTRY 表不是由 AXON 创建的

我的Aggregate礼品卡定义如下,

@Data
@NoArgsConstructor
@Aggregate
public class GiftCard {

    @AggregateIdentifier
    private String id;

    private int remainingValue;

    @CommandHandler
    public GiftCard(IssueCardCommand cmd) {
        apply(new CardIssuedEvent(cmd.getCardId(), cmd.getAmount()));
    }

    @CommandHandler
    public GiftCard(TempCommand cmd) {
        apply(new CardIssuedEvent(cmd.getCardId(), cmd.getAmount()));
    }

    @EventSourcingHandler
    public void on(CardIssuedEvent event) {
        this.id = event.getCardId();
        this.remainingValue = event.getAmount();
    }
}

Run Code Online (Sandbox Code Playgroud)

IssueCardCommand从控制器调度。

public String createGreeting(@PathVariable String cardNumber) {
    IssueCardCommand issueCardCommand = new IssueCardCommand(cardNumber, 100);
    commandGateway.sendAndWait(issueCardCommand, 500L, TimeUnit.MILLISECONDS);
    return "Hey";
}
Run Code Online (Sandbox Code Playgroud)

我可以通过http://localhost:8024/#query在 AxonServer 中查看来确认已调度事件。

我想做 EventSourcing 并设置了内存中的 H2 数据库。 …

spring-data-jpa axon

4
推荐指数
1
解决办法
842
查看次数

原始 Axon 应用程序作为 Fat JAR 运行不会自动配置 Axon Beans

问题:

研究:在https://gitlab.com/ZonZonZon/simple-axon.git上,我编写了一个简单的Axon 应用程序,以表明使用Gradle 插件构建的JAR 工件在(运行时)不会自动配置 Axon beans罐)。虽然它在 Intellij 下运行良好。 com.github.johnrengelman.shadow

从终端中的项目根目录:

run gradle clean build shadowJar;
java -jar build/simpleaxon.jar;
Run Code Online (Sandbox Code Playgroud)

此处附有堆栈跟踪。我希望 Axon Autoconfiguration 默认提供 CommandBus、Snapshotter 等 Bean。

问题:如何在 fat jar 中自动配置默认的 axon beans?

gradle axon spring-boot fatjar

4
推荐指数
1
解决办法
203
查看次数

Axon 或 Kafka 支持 CQRS/ES

考虑一个简单的用例,我想将产品评级作为事件存储在事件存储中。

我可以使用两种不同的方法:

  1. 使用Axon:Rating聚合负责处理CreateRatingCommand并发送RatingCreatedEvent。发送事件会将评级存储在事件存储中。其他事件处理程序可以在连接到 Axon 服务器实例并执行评级所需的任何操作时重播事件流。在这种情况下,事件处理程序将用作流处理器。
  2. 使用Kafka: KafkaProducer 将用于在 Kafka 主题中存储评级 POJO(正确序列化后)。将主题的保留时间设置为无限期将导致任何事件都不会及时丢失。在这种情况下,Kafka Streams 将用于执行实际的评级处理逻辑。

对于这两种方法,我都出现了一些架构问题:

使用轴突时:

  1. 如果聚合内没有需要维护或更改的真实状态,那么使用 Axon(或类似解决方案)是否有任何附加值?聚合仅充当数据的“哑”占位符,但不提供任何状态更改逻辑。
  2. Axon 如何处理同一事件类型的多个事件处理程序?它们是否都会并行处理相同的事件(相同的聚合 ID),或者相同的事件仅由其中一个处理程序处理一次?
  3. 存储在 Axon 事件存储中的事件是否会保留到时间结束?

使用卡夫卡时:

  1. Kafka 将具有相同键的事件/消息存储在同一分区中。在用户产品评级用例中,如何为密钥选择最佳值?UserId、ProductId 或两者的单独主题,并在两个主题中发布每个事件。
  2. 为每个用户和每个产品使用单独的主题是否明智,从而导致集群上有大量主题?(大约<5k 个产品和>10k 个用户)。

我不知道 SO 是否是此类问题的首选论坛...我只是想知道您在这个特定用例中推荐什么作为最佳实践。期待您的反馈,并随时指出我在之前的问题中遗漏的其他想法。

编辑@12/11/2020:我刚刚找到了一个相关的讨论,其中包含与我的问题相关的有用信息。

architecture cqrs event-sourcing apache-kafka axon

4
推荐指数
1
解决办法
3508
查看次数

在CQRS中同步查询端数据 - 是否仍会存在争用?

我对CQRS范式有一般性的疑问.

据我所知,CommandBus和EventBus将域模型与我们的查询端数据存储区分离,最终一致性的优点,以及能够在查询端对存储进行非规范化以优化读取等等.这听起来都很棒.

但我想知道,当我开始扩展查询端负责更新Query数据存储区的组件数量时,如果他们不会开始相互竞争以执行更新?

换句话说,如果我们尝试将一个发布/订阅模型用于EventBus,并且特定事件类型有很多不同的订阅者,那么他们是否会因更新各种非规范化数据而开始相互竞争?难道这不会像我们在CQRS之前那样把我们放在同一条船上吗?

正如我听到它解释的那样,听起来CQRS应该一起消除这种争论,但这只是一种理想,实际上我们只是真正地将它最小化了吗?我觉得我可能会在这里丢失一些东西,但不能把手指放在上面.

domain-driven-design cqrs event-sourcing axon

3
推荐指数
1
解决办法
1058
查看次数

有没有办法等待Saga的结果或例外?

假设我有一个Saga在几毫秒内进行汇款.我有REST控制器,它调用触发Saga的命令.我如何等待Saga的结束检查结果或异常让我的控制器作为响应返回?如果它只是一个不会触发Saga的单独命令,我可以使用命令网关和回调来通知我成功或失败.

更新:

在Saga结束之后,我能够让我的控制器返回响应:

1)我的控制器方法返回一个DeferredResult,我将其保存到地图中

2)我的控制器有一个事件处理程序,它监听结束事件,从地图中检索DeferredResult,并设置结果

有没有更好的方法来解决这个问题?

axon

3
推荐指数
1
解决办法
643
查看次数

Axon 框架 - 我如何回滚 Saga 进程

我正在使用 Axon 框架,而没有带有 spring-boot 自动配置的 Axon 服务器。我的问题是:如果我正在运行一个 saga 进程,如下所示,并且在处理过程中我的磁盘用完了,将会出现异常,我必须回滚。我怎样才能做到这一点?我必须回滚多少?

我的 saga_entry 表将是这样的:

SAGA_ID:22ad255b-4378-4bb4-84c2-061ca666c6e7修订:空,SAGA_TYPE:hu.saga.account.SagaAccount,SERIALIZED_SAGA:3c68752e6d6f6c617269732e736167612e6163636f756e742e536167614163636f756e742f3e

我的 association_value_entry 表将是这样的:

ID: 2, ASSOCIATION_KEY: accountId, ASSOCIATION_VALUE: 11, SAGA_ID: 22ad255b-4378-4bb4-84c2-061ca666c6e7, SAGA_TYPE: hu.saga.account.SagaAccount

在我的 domain_event_entry 表中将只有两个事件(而不是应该是 3 个) AccountCreatedEvent, MoneyWithdrawnEvent ,聚合 id: 11 。

这只是一个例子,主要问题可能是在 Axon Saga 中处理和实现回滚的最佳方法是什么?

@StartSaga
@SagaEventHandler(associationProperty = "accountId")
public void handle(AccountCreatedEvent event){
    logger.info("Saga Start");
    commandGateway.send(new WithdrawMoneyCommand(event.getAccountId(), event.getAccountId(),event.getOverDraftLimit()));
}
Run Code Online (Sandbox Code Playgroud)

我写 null 来模拟在 DepositMoneyCommand 构造函数中的潜在回滚

@SagaEventHandler(associationProperty = "accountId")
public void handle(MoneyWithdrawnEvent event){
    commandGateway.send(new DepositMoneyCommand(null,event.getTransactionId(),event.getAmount()));
}


@EndSaga
@SagaEventHandler(associationProperty = "accountId")
public void handle(MoneyDepositedEvent event){
    logger.info("Saga End"); …
Run Code Online (Sandbox Code Playgroud)

java saga axon spring-boot

3
推荐指数
1
解决办法
1819
查看次数

有没有一种方法可以直接从测试装置测试事件而不使用expectEvents?

我正在尝试测试聚合,并想断言固定装置之外的事件,甚至可能使用 Hamcrest 进行评估?

使用时间戳的示例

        fixture.given()
           .when(new UserCreateCommand("1","test@bob.com"))
           .expectEvents(new UserCreatedEvent("1","test@bob.com");
Run Code Online (Sandbox Code Playgroud)

该夹具允许我轻松地测试相等性,例如该命令准确地生成此事件,如果我想说引入事件创建时间的时间戳,那么它就不那么容易了

        fixture.given()
           .when(new UserCreateCommand("1","test@bob.com"))
           .expectEvents(new UserCreatedEvent("1","test@bob.com", LocalDateTime.now());
Run Code Online (Sandbox Code Playgroud)

这种期望永远不会起作用,因为 LocalDateTime.now() 永远不会精确等于聚合中生成的时间戳。

我可以简单地将时间戳包含在命令有效负载中,但更喜欢在聚合内部进行处理,以确保以一致的方式生成此时间戳。

有没有办法从夹具中检索事件以独立于夹具进行断言,例如

   UserCreatedEvent uce = fixture.given()
           .when(new UserCreateCommand("1","test@bob.com"))
           .extractEvent(UserCreatedEvent.class)
Run Code Online (Sandbox Code Playgroud)

这将允许我使用其他断言库,例如 hamcrest:

例如

   assertThat(uce.getCreatedAt(), is(greaterThanOrEqualto(LocalDateTime.now().minusSeconds(1);

Run Code Online (Sandbox Code Playgroud)

axon

3
推荐指数
1
解决办法
885
查看次数

在命令和事件中使用值对象?

我们可以在命令中使用值对象吗?

假设我有一个商店(聚合),其中有一个值对象地址。在值对象构造函数 Address 中,我放置了一些地址验证逻辑。因此,如果我在 command (CreateShopCmd) 中使用该 Address 对象,那么它会在制作 command 时得到验证,但我想要或读取的验证应该存在于命令处理程序中。

但问题是,我必须再次将验证放入命令处理程序中(因为验证已存在于地址构造函数中),如果我不将其放入命令处理程序中,那么当我在中创建地址对象时,将进行验证事件处理程序并分配给商店聚合(这是不正确的)

所以,请指导我。

下面是代码示例

   @Aggregate
   @AggregateRoot
   public class Shop {

   @AggregateIdentifier
   private ShopId shopId;
   private String shopName;
   private Address address;

   @CommandHandler
   public Shop(CreateShopCmd cmd){

     //Validation Logic here , if not using the Address in 
     // in cmd

         //Fire an event after validation
         ShopRegistredEvt shopRegistredEvt = new ShopRegistredEvt();
         AggregateLifecycle.apply(shopRegistredEvt);
     }

     @EventSourcingHandler
     public void on(ShopRegistredEvt evt) {

     this.shopName = evt.getShopName();

     //Validation happend here if not put in cmd at the time of …
Run Code Online (Sandbox Code Playgroud)

domain-driven-design cqrs event-sourcing axon

2
推荐指数
1
解决办法
2374
查看次数

Axon 框架中两个聚合体之间的通信

我是 Axon 框架、CQRS 和 DDD 的新手。我被教导使用关系数据库创建简单的 CRUD 应用程序。因此,我首先专注于构建数据模型,而不是领域模型。我想改变我的软件方法,使其更加务实,并创建现实世界的应用程序。因此,我想使用 CQRS 模式和事件溯源。

我现在正在使用 Spring Boot 和 Axon 框架开发一个库应用程序。基本要求之一是用户已借书。我有两个用于 User 和 Book 的聚合。

这是我的 BookAggregate:

    @Data
    @AllArgsConstructor @NoArgsConstructor
    @Aggregate
    public class BookAggregate {
    
        @AggregateIdentifier
        private UUID id;
        private String name;
        private String isbnNumber;
        private int amountOfCopies;
        private Author author;
        private Genre genre;
        private PublishingHouse publishingHouse;
        
        ...
Run Code Online (Sandbox Code Playgroud)

这是我的 UserAggregate:

    @Data
    @AllArgsConstructor @NoArgsConstructor
    @Aggregate
    public class UserAggregate {
    
        @AggregateIdentifier
        private UUID id;
        private String firstName;
        private String lastName;
        private String email;
        private String password;
        private …
Run Code Online (Sandbox Code Playgroud)

domain-driven-design cqrs axon spring-boot

2
推荐指数
1
解决办法
259
查看次数