gle*_*man 7 apache-kafka-streams
我们有以下高级DSL处理拓扑:
TimeWindows timeWindow = TimeWindows.of(windowDurationMs).advanceBy(windowAdvanceMs).until(retensionTimeMs);
KTable<Windowed<K>, Long> table1 = stream1.groupByKey().count(timeWindow, "Stream_1_Count_Store");
KTable<Windowed<K>, Long> table2 = stream2.groupByKey().count(timeWindow, "Stream_2_Count_Store");
KTable<Windowed<K>, Pair<Long,Long> joined = table1.leftJoin(table2, someValueJoiner, joinSerde, "Join_Store");
KTable<Windowed<SmallerKey>, Tuple<Long,Long,Long>> grouped = joined.groupBy(someSelector);
KTable<Windowed<SmallerKey>, Map<Long, Pair<Long,Long>>> aggregated = grouped.aggregate(initializer, adder, subtractor, aggValueSerde, "Agg_Store_Name")
Run Code Online (Sandbox Code Playgroud)
简而言之,我们上面所做的是:
我们的想法是创建窗口事件计数并使用这些窗口键进行连接和聚合操作(在KTable案例中,这些操作没有窗口)
问题是:连接和聚合操作的状态存储没有保留机制,导致磁盘空间爆炸(RocksDB).
更具体地说:(跳跃)窗口导致键上的笛卡尔积,并且没有删除旧窗口的机制.
如果KTable键没有窗口化,而且只有足够多的唯一键,也会出现同样的问题
请注意,状态存储支持table1和table2没有空间问题,这是因为它们通过DSL提供了一个窗口存储来管理丢弃旧窗口.在连接和聚合中,我们将窗口密钥视为"任何旧密钥",DSL也使用非窗口KeyValueStore.
此问题与以下内容有关:KAFKA-4212,KAFKA-4273,汇合论坛问题
这里有没有被误解的概念?有没有一种使用DSL实现此拓扑的简单方法?如果没有,使用低级API实现它的建议方法是什么?
我想你可以这样:
StreamsBuilder builder = new StreamBuilder();
KStream<K,V> streams = builder.stream(/* pattern for both streams */);
KStream<SmallerKey,Tuple<Long,V,String>> enrichedStream = stream.transform(
/* custom Transformer that set the weaker grouping key right here
and puts the extracted component into the value before the aggregation;
additionally (that's why we need a Transformer) get the topic name from
context object and enrich the value accordingly (ie, third String argument in the output Tuple */);
KTable<Windowed<SmallerKey>, Map<Long, Pair<Long,Long>>> = stream.groupByKey.aggregate(
timeWindow,
/* initializer: return an empty Map;
aggregator:
for each input record, check if Map contains entry for Long key already (ie, extracted component, first argument from input Tuple<Long,V,String>;
if not, add new map entry with Pair(0,0)
take the corresponding Pair from the Map and increase one
counter depending on the original topic that
is encoded in the input value (ie, Pair.first is counter for first topic and Pair.second is counter for second topic) */);
Run Code Online (Sandbox Code Playgroud)
示例:
假设有两个输入流s1并s2带有以下记录(<TS,key,value>):
s1: <1,k1,v1> | <2,k2,v2> | <3,k1,v3> | <6,k2,v4> | <12,k2,v5>
s2: <1,k1,va> | <2,k2,vb> | <3,k2,vc> | <11,k2,vd>
Run Code Online (Sandbox Code Playgroud)
在您的原始程序中,您将首先单独计算两个流(假设大小为5的翻滚窗口)(省略TS):
<W0<k1>, 1> | <W0<k2>, 1> | <W0<k1>, 2> | <W1<k2>, 1> | <W2<k2>, 1>
and
<W0<k1>, 1> | <W0<k2>, 1> | <W0<k2>, 2> | <W2<k2>, 1>
Run Code Online (Sandbox Code Playgroud)
你得到左连接后(得到所有记录后的结果,省略中间人):
<<W0<k1>, <2,1>> | <W0<k2>, <1,2>> | <W1<k2>, <1,null>> | <W2<k2>, <1,1>>
Run Code Online (Sandbox Code Playgroud)
现在,您使用"较弱的密钥"重新分组,将关键部分提取到值中,并将所有条目放入地图中,基于提取的关键部分.让我们假设我们基于"char"和"number"分割我们的密钥(即,k1将其拆分为kas smallerKey并将1其提取Long到值中).在得到聚合之后(我将地图表示为(k1 -> v1, k2 - v2):
<<W0<k>, (1 -> <2,1>, 2 -> <1,2>> | <W1<k>, (2 -> <1,null>)> | <W2<k>, (2 -> <1,1>)>
Run Code Online (Sandbox Code Playgroud)
如果这是一个正确的例子(我可能会错过了解您的问题描述).您可以使用transform/groupBy/aggregate执行相同的操作,如上所述.输入是:
s1: <1,k1,v1> | <2,k2,v2> | <3,k1,v3> | <6,k2,v4> | <12,k2,v5>
s2: <1,k1,va> | <2,k2,vb> | <3,k2,vc> | <11,k2,vd>
Run Code Online (Sandbox Code Playgroud)
结果transform是(包括TS):
<1, k, <1,v1,s1>> | <2, k, <2,v2,s1>> | <3, k, <1,v3,s1>> | <6, k, <2,v4,s1>> | <12, k, <2,v5,s1>>
and
<1, k, <1,va,s2>> | <2, k, <2,vb,s2>> | <3, k, <2,vc,s2>> | <11, k, <2,vd,s2>>
Run Code Online (Sandbox Code Playgroud)
注意,
Transform实际上将两个流作为"一个流"处理,因为我们使用了模式预订 - 因此,输出只是一个流,其中包含来自两个原始流的交错记录.
您现在应用具有聚合结果的相同窗口(TS省略) - 我们通过交替显示每个原始输入流处理一个记录的结果)inputRecord ==> outputRecord
<1, k, <1,v1,s1>> ==> <W0<k>, (1 -> <1,null>)>
<1, k, <1,va,s2>> ==> <W0<k>, (1 -> <1,1>>
<2, k, <2,v2,s1>> ==> <W0<k>, (1 -> <1,1>, 2 -> <1, null>)>
<2, k, <2,vb,s2>> ==> <W0<k>, (1 -> <1,1>, 2 -> <1,1>)>
<3, k, <1,v3,s1>> ==> <W0<k>, (1 -> <2,1>, 2 -> <1, null>)>
<3, k, <2,vc,s2>> ==> <W0<k>, (1 -> <2,1>, 2 -> <1,2>)>
<6, k, <2,v4,s1>> ==> <W1<k>, (2 -> <1,null>)>
<11, k, <2,vd,s2>> ==> <W2<k>, (2 -> <null, 1>)>
<12, k, <2,v5,s1>> ==> <W2<k>, (2 -> <1,1>)>
Run Code Online (Sandbox Code Playgroud)
如果将此结果的每个键的最新记录与上面的结果进行比较,您会发现两者都是相同的.
| 归档时间: |
|
| 查看次数: |
1524 次 |
| 最近记录: |