Kafka Streams - 使用容错定义自定义关系/非键值状态存储

Hos*_*ein 1 apache-kafka spring-cloud-stream apache-kafka-streams

我正在尝试使用 kafka 实现事件溯源。

我对流处理器应用程序的设想是一个典型的 3 层 Spring 应用程序,其中:

  • “表示”层被(由?)Kafka 流 API 取代。
  • 业务逻辑层由拓扑中的处理器 API 使用。
  • 此外,DB 是一个关系 H2 内存数据库,可通过 Spring Data JPA 存储库访问。存储库还实现了必要的接口,以便将它们注册为 Kafka 状态存储以使用好处(恢复和容错)

但我想知道我应该如何实现自定义状态存储部分?

我一直在寻找和:

  • 有一些接口,例如StateStore& StoreBuilderStoreBuilderwithLoggingEnabled()方法;但是如果我启用它,实际的更新和更改日志记录何时发生?通常示例都是键值存储,即使是自定义的。如果我不想要键值怎么办?kafka 文档中交互式查询部分的示例并没有削减它。

  • 我知道交互式查询。但它们似乎适合查询而不是更新;顾名思义。

在键值存储中,发送到更改日志的记录很简单。但是如果我不使用键值;我何时以及如何通知 kafka 我的状态已更改?

Mat*_*Sax 5

您需要为StateStore要使用的实际商店引擎实现。此界面不指定有关商店的任何内容,您可以随心所欲。

您还需要实现一个StoreBuilder作为工厂来创建自定义商店的实例。

MyCustomStore implements StateStore {
    // define any interface you want to present to the user of the store
}

MyCustomStoreBuilder implements StoreBuilder<MyCustomStore> {
    MyCustomStore builder() {
        // create new instance of MyCustomStore and return it
    }

    // all other methods (except `name()`) are optional
    // eg, you can do a dummy implementation that only returns `this`
}
Run Code Online (Sandbox Code Playgroud)

比较:https : //docs.confluent.io/current/streams/developer-guide/processor-api.html#implementing-custom-state-stores

但是如果我不使用键值;我何时以及如何通知 kafka 我的状态已更改?

如果要实现withLoggingEnabled()(类似于缓存),则需要将此日志记录(或缓存)作为存储的一部分来实现。因为,Kafka Streams 不知道您的商店如何工作,因此无法为此提供实现。因此,这是您的设计决定,无论您的商店是否支持登录更改日志主题。如果您想支持日志记录,您需要提出一种设计,将存储更新映射到键值对(您也可以为每个更新写入多个),您可以将其写入更改日志主题并允许您重新创建状态从更改日志主题中读取这些记录时。

不仅可以通过更改日志记录来获得容错存储。例如,您还可以插入一个远程存储,在内部执行复制等操作,从而依赖存储的容错功能而不是使用更改日志记录。当然,与使用本地存储相比,使用远程存储意味着其他挑战。

对于 Kafka Streams 默认存储,日志记录和缓存作为实际存储的包装器实现,使其易于插入。但是您可以通过最适合您商店的任何方式来实现这一点。您可能需要查看以下用于键值存储的类作为比较:

对于交互式查询,您可以实现相应QueryableStoreType的集成自定义商店。参见 https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores你说得对,交互式查询是现有商店的只读界面,因为Processors应负责维护商店。但是,也没有什么可以阻止您打开自定义存储进行写入。但是,这将使您的应用程序固有地不确定,因为如果您回滚输入主题并重新处理它,它可能会计算出不同的结果,具体取决于执行的“外部存储写入”。您应该考虑通过输入主题对商店进行任何写入。但这是你的决定。如果您允许“外部写入”,您将需要确保它们也被记录,以防您想要实现日志记录。