如何读取 Axon Event Store 中存储的事件?

int*_*dev 1 cqrs event-sourcing axon axon-framework

我正在努力了解如何连接到默认的 Axon 事件存储并查看其中存储了哪些事件?有没有办法以某种方式预览它?

如何重播存储的事件以重新创建存储实体的特定状态?我试图在互联网上查找示例、教程或视频课程,但找不到任何内容...如何制作快照然后检索它?无法找到任何关于如何执行此操作的在线示例...如果之前有人问过这个问题,有人可以建议或分享 StackOverflow 上讨论的链接吗?

谢谢

Ste*_*ven 8

我觉得这里有三个问题。首先是标题,其次是否有任何教程,第三是如何重新创建实体。为了清楚起见,最好从中创建单独的问题,但无论如何我都会回答它们。

1.读取Axon事件存储中存储的事件

如果您使用 Axon Server,读取事件就像浏览 Axon Server 的仪表板一样简单。从那里您可以单击“搜索”按钮,该按钮将打开搜索面板,允许您检查已存储的所有事件。那里有一个充实的查询语言,在这里详细解释 -> https://docs.axoniq.io/reference-guide/appendices/query-reference。

如果 Axon Server UI 不满足您的需求,最简单的方法是使用curl. 要了解您可以执行哪些操作,您可以查看 Axon Server 的 swagger 页面(位于 http:[server]:[port]/swagger-ui.html)。

如果您不使用 Axon Server,那么您的事件将存储在 RDBMS 或 MongoDb 中。如果您想知道如何读取其中任何一个,我建议您阅读所使用的 RDBMS 实例或 MongoDb 的文档,了解如何查询表。

2.Axon教程材料

我假设简单地谷歌搜索“轴突编码示例”*应该*为您提供大量信息,但让我在这个答案中为您提供一些信息:

3. 在 Axon 中重新创建模型

现在,当谈到读取事件时,几乎在所有场景中,您“永远”都不需要使用 Axon Framework 手动执行此操作。您只需使用“@EventHandler”注释将一个方法标记为能够处理事件,框架就会自动找到处理程序并将其注册到“EventBus”/“EventStore”。

对于实体的重新创建,同样适用,但增加了一个层。随着 Axon 推广 CQRS 的使用,您将拥有不同的模型。命令模型是 Axon 通过聚合细节所支持的,可在此处找到。每次从聚合的Repository. 情况就是这样,因为 Axon 实现了一个简单地遵循每次EventSourcingRepository重新创建实体的范例。

如果您想要重新创建的实体是查询模型的一部分,您将必须告诉负责向您的@EventHandlers 提供事件的组件从头开始(因此您再次不需要自己读取事件)。Axon 中落后的技术方面EventStore称为EventProcessor. 如果您想要一个EventProcessor可以重播这些事件来重新创建实体的工具,则必须使用TrackingEventProcessor(TEP)。TEP 公开了一个resetTokens()操作,该操作本质上将事件处理器在事件流中的点更改为过去的点;从而允许您重新创建您的实体。有关这方面的文档可以在这里找到。

4. 在 TrackingEventProcessor 上调用重置

如果您想要重新创建查询模型,无论是因为其格式已更改,还是仅仅因为您有新模型,您将需要重置TrackingToken(s)( TrackingEventProcessorTEP)。请注意,这要求您使用 TEP 作为事件处理组件(包含带注释方法的类@EventHandler)的处理器,更新要重新创建的模型。只有 TEP 提供重置选项,因为它是用于TrackingTokens跟踪事件流进度的唯一事件处理器。通过调整其在该流上的位置,您可以有效地调用重置。

如果要重置 TEP,您首先需要知道处理器的名称。默认情况下,这将是事件处理组件 (EHC) 的包名称。但是,建议使用@ProcessingGroupEHC 上的注释为您的处理组(即事件处理器)提供不同的名称。它有助于对组进行推理,而且还提供了更轻松的调用重置的方法。

有了名称,我们就可以在 Axon 的配置中找到 TEP。为此,您最好使用EventProcessingConfiguration,因为它公开了一个为您eventProcessor(String, Class<T>)返回 an 的方法Optional<EventProcessor>。有了这个选项,您就可以开始重置操作了。在重置 TEP 之前,必须确保没有单个 TEP 实例正在处理事件。这意味着您必须先使用该TrackingEventProcessor#shutDown方法关闭 TEP。此后,可以调用重置,从而有效地将TrackingToken(s)TEP 调整到新位置。此后,您的 TEP 可以再次启动。

在代码中,如下所示:

public void resetTrackingProcessor(EventProcessingConfiguration config, String processorName) {
    config.eventProcessor(processorName, TrackingEventProcessor.class)
          .ifPresent(tep -> {
              tep.shutDown();
              tep.resetTokens();
              tep.start();
          });
}
Run Code Online (Sandbox Code Playgroud)

请注意,上述操作在单节点系统中完全可以正常工作,因为然后shutdown关闭唯一存在的 TEP 时的操作。但是,如果您已将应用程序分发到多个实例中,那么将需要找到一种方法来关闭所有这些实例,然后再调用其中一个实例的重置。简而言之,您需要一个在给定应用程序范围之外的中央组件,它了解所有现有的 TEP。自己创建这个是完全可行的(我过去已经这样做过),但现在我建议您使用 Axon Server 来避免麻烦。

Axon Server 充当中间人,不仅用于事件存储和消息分发,还作为监控系统的地方。因此,您的事件处理器也是如此,此外还公开关闭、启动、释放令牌、拆分和合并操作。