使用查找数据丰富KStream的理想方式

Vig*_*han 9 apache-kafka-streams

我的流有一个名为'category'的列,我在另一个商店中为每个'category'提供了额外的静态元数据,每隔几天就会更新一次.这种查找的正确方法是什么?Kafka流有两种选择

  1. 在Kafka Streams之外加载静态数据,仅用于KStreams#map()添加元数据.这是可能的,因为Kafka Streams只是一个图书馆.

  2. 将元数据加载到Kafka主题,将其加载到a KTable和do KStreams#leftJoin(),这似乎更自然,并将分区等留给Kafka Streams.但是,这要求我们保持KTable加载所有值.请注意,我们必须加载整个查找数据,而不仅仅是加载更改.

    • 例如,最初说只有一个类别'c1'.Kafka流应用程序优雅地停止,然后重新启动.重新启动后,添加了一个新类别"c2".我的假设是,table = KStreamBuilder().table('metadataTopic')只有值'c2',因为这是应用程序第二次启动以来唯一发生的变化.我希望它有'c1'和'c2'.
    • 如果它也有"c1",那么数据是否会从KTable中删除(可能是通过设置发送key = null消息?)?

上述哪一种是查找元数据的正确方法?

是否可以始终强制在重新启动时从头开始只读取一个流,这样就可以加载所有元数据KTable.

还有其他方式使用商店吗?

Mic*_*oll 13

  1. 在Kafka Streams之外加载静态数据,只需使用KStreams #map()添加元数据.这是可能的,因为Kafka Streams只是一个图书馆.

这有效.但通常人们会选择您列出的下一个选项,因为用于丰富输入流的辅助数据通常不是完全静态的; 相反,它正在发生变化但很少发生:

  1. 将元数据加载到Kafka主题,将其加载到KTable并执行KStreams#leftJoin(),这看起来更自然,并将分区等留给Kafka Streams.但是,这要求我们保持KTable加载所有值.请注意,我们必须加载整个查找数据,而不仅仅是加载更改.

这是通常的方法,我建议坚持,除非你有特殊的理由不这样做.

但是,这要求我们保持KTable加载所有值.请注意,我们必须加载整个查找数据,而不仅仅是加载更改.

所以我猜你也更喜欢第二种选择,但你担心这是否有效.

简短回答是:是的,KTable将加载每个键的所有(最新)值.该表将包含整个查找数据,但请记住,KTable在幕后进行分区:例如,如果您的输入主题(对于表)具有3分区,那么您可以运行3应用程序的实例,每个获取1表的分区(假设数据在分区之间均匀分布,那么表的每个分区/共享将占用表的数据的大约1/3).所以在实践中更有可能"只是工作".

是否有可能始终强制在重新启动时从头开始只读取一个流,这样就可以将所有元数据加载到KTable中.

你不必担心这一点.简而言之,如果表中没有可用的本地"副本",则Streams API将自动确保从头开始完全读取表的数据.如果有可用的本地副本,那么您的应用程序将重新使用该副本(并在表的输入主题中提供新数据时更新其本地副本).

用例子来回答更长的问题

想象一下您的输入数据(想想:更改日志流),请KTable注意此输入如何包含6消息:

(alice, 1) -> (bob, 40) -> (alice, 2) -> (charlie, 600), (alice, 5), (bob, 22)
Run Code Online (Sandbox Code Playgroud)

KTable以下是此输入产生的"逻辑"的各种状态,其中每个新接收的输入消息(例如(alice, 1))将导致表的新状态:

Key      Value
--------------
alice   |   1    // (alice, 1) received

 |
 V

Key      Value
--------------
alice   |   1
bob     |  40    // (bob, 40) received

 |
 V

Key      Value
--------------
alice   |   2    // (alice, 2) received
bob     |  40

 |
 V

Key      Value
--------------
alice   |   2
bob     |  40
charlie | 600    // (charlie, 600) received

 |
 V

Key      Value
--------------
alice   |   5    // (alice, 5) received
bob     |  40
charlie | 600

 |
 V

Key      Value
--------------
alice   |   5
bob     |  22    // (bob, 22) received
charlie | 600
Run Code Online (Sandbox Code Playgroud)

你可以在这里看到的是,即使输入数据可能有很多很多消息(或者你所说的"改变";在这里,我们有6),结果中的条目/行数KTable(基于连续突变)在新接收的输入上)是输入中唯一键的数量(这里:从开始1,渐变到3),这通常明显小于消息的数量.因此,如果输入中的消息N数量是这些消息的唯一密钥的数量M,那么通常M << N(M明显小于N;加上,对于记录,我们有不变量M <= N).

这是"这需要我们保持KTable加载所有值"的第一个原因通常不是问题,因为每个键只保留最新值.

第二个原因是,正如Matthias J. Sax所指出的那样,Kafka Streams使用RocksDB作为此类表的默认存储引擎(更确切地说:支持表的状态存储).RocksDB允许您维护大于应用程序的可用主内存/ Java堆空间的表,因为它可以溢出到本地磁盘.

最后,第三个原因是a KTable被分区.因此,如果表的输入主题(例如)配置了3分区,那么幕后发生的事情就是KTable以相同的方式对自身进行分区(思考:分片).在上面的示例中,这里是您可能最终得到的结果,但确切的"拆分"取决于原始输入数据如何在表的输入主题的分区中传播:

逻辑KTable(我上面显示的最后状态):

Key      Value
--------------
alice   |   5
bob     |  22
charlie | 600
Run Code Online (Sandbox Code Playgroud)

实际的KTable,分区(假设3表的输入主题的分区,加上键=用户名在分区之间均匀分布):

Key      Value
--------------
alice   |   5    // Assuming that all data for `alice` is in partition 1

Key      Value
--------------
bob     |  22    // ...for `bob` is in partition 2

Key      Value
--------------
charlie | 600    // ...for `charlie` is in partition 3
Run Code Online (Sandbox Code Playgroud)

实际上,输入数据的这种分区 - 除其他外 - 允许您"调整"KTable的实际表现形式.

另一个例子:

  • 想象一下,您的KTable的最新状态通常具有1 TB的大小(同样,大致大小是表的输入数据中唯一消息密钥数量的函数,乘以相关消息值的平均大小).
  • 如果表的输入主题只有一个1分区,那么KTable本身也只有1分区,大小为1 TB.在这里,因为输入主题只有1分区,所以你可以用1应用程序实例运行你的应用程序(所以不是很多并行性,呵呵).
  • 如果表的输入主题具有500分区,则KTable也具有500分区,每个分区大小为~2 GB(假设数据均匀分布在分区上).在这里,您可以使用最多500app实例运行您的应用程序.如果您要运行完全500实例,那么每个应用程序实例将获得1逻辑KTable的完全分区/分片,从而最终得到2 GB的表数据; 如果你只运行100实例,那么每个实例都会得到500 / 100 = 5表的分区/分片,最后2 GB * 5 = 10 GB得到表数据.

  • 当输入流有多个分区,元数据流只有一个分区,并且应用程序有多个实例时会发生什么?应用程序的每个实例会加载元数据流,还是其中一个会加载它而其他人会以某种方式从该实例中获取值? (2认同)

Mat*_*Sax 6

您的整体观察是正确的,这取决于哪些权衡对您来说更重要.如果元数据很小,则选项1似乎更好.如果元数据很大,似乎选项2是要走的路.

如果您使用map(),则需要在每个应用程序实例中获得元数据的完整副本(因为您无法确切知道Streams将如何对KStream数据进行分区).因此,如果您的元数据不适合主内存使用map()将无法轻松工作.

如果您使用KTable,Streams将注意在所有正在运行的应用程序实例上正确分片元数据,这样就不需要重复数据.此外,KTable使用RocksDB作为状态存储引擎,因此可以溢出到磁盘.

编辑开始

关于拥有所有数据KTable:如果同一个键有两个类别,如果直接从主题读取数据到KTablevia builder.table(...)(更改日志语义),则第二个值将覆盖第一个值.但是,您可以通过将主题作为记录流来轻松解决此问题(即,builder.stream(...)应用聚合来计算KTable.您的聚合将简单地发出每个键的所有值的列表.

关于删除:KTable使用changelog语义并确实理解tombstone消息以删除键值对.因此,如果您KTable从主题中读取a 并且主题包含<key:null>消息,则KTable使用此密钥的当前记录将被删除.当这KTable是聚合的结果时,这很难实现,因为具有null键或null值的聚合输入记录将被简单地忽略并且不更新聚合结果.

解决方法是map()在聚合之前添加一个步骤并引入一个NULL值(即,用户定义的"对象"代表逻辑删除但不是null- 在您的情况下,您可以将其称为a null-category).在聚合中,null如果输入记录具有值,则只返回一个值作为aggegation结果null-category.然后,这将在您的逻辑删除消息中进行转换,KTable并删除此密钥的当前类别列表.

编辑结束

当然,您始终可以通过Processor API构建自定义解决方案.但是,如果DSL可以满足您的需求,那么就没有充分的理由这样做.