fox*_*gen 1 apache-kafka apache-kafka-streams
我有一个拓扑(见下文),它读取了一个非常大的主题(每天超过十亿条消息).这个Kafka Streams应用程序的内存使用率非常高,我正在寻找一些关于如何减少州商店足迹的建议(详情如下).注意:我并不是想把山羊放到国营商店,我只是觉得可能有办法改善我的拓扑结构 - 见下文.
// stream receives 1 billion+ messages per day
stream
.flatMap((key, msg) -> rekeyMessages(msg))
.groupBy((key, value) -> key)
.reduce(new MyReducer(), MY_REDUCED_STORE)
.toStream()
.to(OUTPUT_TOPIC);
// stream the compacted topic as a KTable
KTable<String, String> rekeyedTable = builder.table(OUTPUT_TOPIC, REKEYED_STORE);
// aggregation 1
rekeyedTable.groupBy(...).aggregate(...)
// aggreation 2
rekeyedTable.groupBy(...).aggregate(...)
// etc
Run Code Online (Sandbox Code Playgroud)
更具体地说,我想知道OUTPUT_TOPIC作为KTable 流式传输是否导致状态存储(REKEYED_STORE)大于它需要在本地存储.对于具有大量唯一键的changelog主题,将它们作为a KStream和窗口化聚合进行流式传输会更好吗?或者这不会像我想的那样减少占用空间(例如,只有一部分记录 - 窗口中的那些记录将存在于本地状态存储中).
无论如何,我总是可以启动这个应用程序的更多实例,但我想让每个实例尽可能高效.这是我的问题:
任何帮助将不胜感激!
用你当前的模式
stream.....reduce(.)toStream().to(OUTPUT_TOPIC);
builder.table(OUTPUT_TOPIC, REKEYED_STORE)
Run Code Online (Sandbox Code Playgroud)
你得到两个内容相同的商店.一个用于reduce()操作员,一个用于阅读table()- 这可以减少到一个商店,但:
KTable rekeyedTable = stream.....reduce(.);
rekeyedTable.toStream().to(OUTPUT_TOPIC); // in case you need this output topic; otherwise you can also omit it completely
Run Code Online (Sandbox Code Playgroud)
这应该会显着降低内存使用量.
关于窗口与非窗口:
这是你所需的语义问题; 如此简单的从非窗口切换到窗口缩小似乎是值得怀疑的.
即使你也可以使用窗口语义,你也不一定会减少内存.注意,在聚合情况下,Streams不存储原始记录,而只存储当前聚合结果(即key + currentAgg).因此,对于单个密钥,两种情况的存储要求是相同的(单个窗口具有相同的存储要求).同时,如果你使用Windows,你可能实际上需要更多的内存,因为你得到一个聚合的专业密钥专业窗口(而在非窗口的情况下你只得到一个聚合的专业密钥).您可以节省内存的唯一方案是,您的"密钥空间"会在很长一段时间内展开.例如,您可能无法长时间获取某些键的任何输入记录.在非窗口的情况下,这些记录的聚合将一直存储,而对于窗口化的情况,如果具有此键的记录稍后发生,则将删除密钥/聚合记录并重新创建新的记录.再次(但请记住,在这种情况下你丢失了以前的aggergate - cf.(1))
最后但同样重要的是,您可能需要查看调整应用程序大小的指南:http://docs.confluent.io/current/streams/sizing.html
| 归档时间: |
|
| 查看次数: |
2134 次 |
| 最近记录: |