Kafka流用例以添加全局存储

zoo*_*oom 3 java apache-kafka apache-kafka-streams

在kafka流中定义拓扑时,可以添加全局状态存储。它将需要一个源主题以及一个ProcessorSupplier。处理器接收记录,并可以在理论上对其进行转换,然后再将其添加到存储中。但是,在还原的情况下,记录会直接从源主题(更改日志)插入到全局状态存储中,而跳过最终在处理器中完成的转换。

   +-------------+             +-------------+              +---------------+
   |             |             |             |              |    global     |
   |source topic  ------------->  processor  +-------------->    state      |
   |(changelog)  |             |             |              |    store      |
   +-------------+             +-------------+              +---------------+
          |                                                         ^
          |                                                         |
          +---------------------------------------------------------+
              record directly inserted during restoration

Run Code Online (Sandbox Code Playgroud)

StreamsBuilder#addGlobalStore(StoreBuilder storeBuilder,字符串主题,已消耗的消耗,ProcessorSupplier stateUpdateSupplier)将全局StateStore添加到拓扑。

根据文档

注意:您不应使用处理器将转换后的记录插入全局状态存储。该存储使用源主题作为changelog,并且在还原期间将直接从源中插入记录。应该使用此ProcessorNode来保持StateStore的最新状态。

同时,由于当前在kafka错误跟踪器上打开了主要错误:从主题还原状态时,不会使用addGlobalStore上提供的KAFKA-7663自定义处理器,该状态可以准确解释文档中所述的内容,但似乎是可以接受的错误。

我想知道KAFKA-7663是否确实是个错误。根据文档,它似乎是这样设计的,在这种情况下,我很难理解用例。
有人可以解释这个底层API的主要用例吗?我唯一能想到的就是处理副作用,例如在处理器中执行一些日志操作。

额外的问题:如果源主题充当全局存储的变更日志,则由于保留已过期而从该主题删除记录时,是否会将其从全局状态存储中删除?还是仅在从更改日志中对整个商店进行还原之后,才会在商店中进行删除。

小智 5

是的,这是一个很奇怪的小问题22,但是文档是正确的。全局状态存储的处理器不得对记录做任何事情,而应将其持久存储到存储中。

AFAIK,这不是一个哲学问题,只是一个实际问题。原因很简单,就是您观察到的行为... Streams将输入主题视为存储的changelog主题,因此在还原过程中会绕过处理器(以及反序列化)。

状态恢复绕过任何处理的原因是通常变更日志中的数据与存储中的数据相同,因此对它进行任何新操作实际上都是错误的。另外,仅将字节从线中删除并将它们批量写入状态存储中就更有效。我之所以说“通常”,是因为在这种情况下,输入主题与普通的changelog主题并不完全一样,因为在存储放置期间它不会接收其写入。

对于它的价值,我也很难理解用例。看来,我们应该:

  1. 完全摆脱该处理器,就像恢复一样,总是将二进制数据从网络中转储到存储中。
  2. 重新设计全局存储,以允许在全局存储之前进行任意转换。我们可以:
    • 继续使用输入主题并在还原期间反序列化并调用处理器,或者
    • 为全球商店添加真实的变更日志,以便我们轮询输入主题,进行一些转换,然后写入全局商店 global-store-changelog。然后,我们可以使用变更日志(而非输入)进行还原和复制。

顺便说一句,如果您想要后一种行为,则可以立即应用转换,然后使用to(my-global-changelog)来制造“ 变更日志”主题,以近似其行为。然后,您将创建全局存储区以从您my-global-changelog的输入而不是输入中读取。

因此,给您一个直接的答案,KAFKA-7663不是错误。我将对建议将票证转换为功能请求的票证进行评论。

最佳答案:不能为保留状态配置充当状态存储的更改日志的主题。实际上,这意味着您应该通过启用压缩来防止无限增长,并禁用日志保留。

实际上,旧数据因保留而丢失和丢失不是“事件”,消费者无法知道是否/何时发生。因此,无法响应此非事件而从状态存储中删除数据。就像您描述的那样,它将发生……记录将无限期地存在于全局存储中。如果/当替换实例时,新实例将从输入中恢复,并且(显然)仅接收当时该主题中存在的记录。因此,整个Streams集群将在全局状态不一致的情况下结束。这就是为什么您应该禁用保留。

从存储中“删除”旧数据的正确方法是将所需键的逻辑删除写入输入主题。然后,它将被正确传播到群集的所有成员,在还原过程中正确应用,并由代理正确压缩。

希望对您有帮助。肯定的,请输入票证并帮助我们使API更加直观!

  • 澄清“充当状态存储变更日志的主题不得配置保留。”:这意味着您不应将主题配置为在经过特定时间或超过特定大小阈值后使数据过期。相反,数据应该“永远”保留在主题中,并且启用压缩有助于确保主题仍然不会超出范围。 (4认同)