Kafka Streams - 更新KTable上的聚合

fox*_*gen 5 apache-kafka apache-kafka-streams

我有一个KTable,其数据看起来像这样(key => value),其中key是客户ID,值是包含一些客户数据的小JSON对象:

1 => { "name" : "John", "age_group":  "25-30"}
2 => { "name" : "Alice", "age_group": "18-24"}
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }
Run Code Online (Sandbox Code Playgroud)

我想对这个KTable进行一些聚合,并且基本上保持每个记录的数量age_group.所需的KTable数据如下所示:

"18-24" => 3
"25-30" => 1
Run Code Online (Sandbox Code Playgroud)

让我们说Alice,谁在18-24上面的小组中,有一个生日,让她进入新的年龄组.支持第一个KTable的状态存储现在应该如下所示:

1 => { "name" : "John", "age_group":  "25-30"}
2 => { "name" : "Alice", "age_group": "25-30"} # Happy Cake Day
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }
Run Code Online (Sandbox Code Playgroud)

我希望得到的聚合KTable结果能够反映出这一点.例如

"18-24" => 2
"25-30" => 2
Run Code Online (Sandbox Code Playgroud)

可能过度概括了这里描述的问题:

在Kafka Streams中没有最终聚合......根据您的使用情况,手动重复数据删除将是解决问题的一种方法"

但到目前为止我只能算出一个跑步总数,例如爱丽丝的生日会被解释为:

"18-24" => 3 # Old Alice record still gets counted here
"25-30" => 2 # New Alice record gets counted here as well
Run Code Online (Sandbox Code Playgroud)

编辑:这是我注意到的一些额外行为似乎意外.

我正在使用的拓扑看起来像:

dataKTable = builder.table("compacted-topic-1", "users-json")
    .groupBy((key, value) -> KeyValue.pair(getAgeRange(value), key))
    .count("age-range-counts")
Run Code Online (Sandbox Code Playgroud)

1)空状态

现在,从最初的空状态开始,一切看起来像这样:

compacted-topic-1
(empty)


dataKTable
(empty)


// groupBy()
Repartition topic: $APP_ID-age-range-counts-repartition
(empty)

// count()
age-range-counts state store
(empty)
Run Code Online (Sandbox Code Playgroud)

2)发送几条消息

现在,让我们发送一条消息给上面compacted-topic-1KTable.这是发生的事情:

compacted-topic-1
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }

dataKTable
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }


// groupBy()
// why does this generate 4 events???
Repartition topic: $APP_ID-age-range-counts-repartition
18-24 => 3
18-24 => 3
18-24 => 4
18-24 => 4

// count()
age-range-counts state store
18-24 => 0
Run Code Online (Sandbox Code Playgroud)

所以我想知道:

  • 我正在尝试使用Kafka Streams 0.10.1或0.10.2做什么?我已经尝试使用groupBy,并count在DSL,但也许我需要使用类似的东西reduce
  • 此外,我在理解导致add减速器和subtract减速器被调用的环境方面遇到一些麻烦,因此任何这些要点的任何澄清都将受到高度赞赏.

Mat*_*Sax 8

如果你有原始的KTable包含id -> Json数据(让我们称之为dataKTable),你应该能够得到你想要的东西

KTable countKTablePerRange
    = dataKTable.groupBy(/* map your age-range to be the key*/)
                .count("someStoreName");
Run Code Online (Sandbox Code Playgroud)

这适用于所有版本的Kafka Streams API.

更新

关于重新分区主题中的4个值:这是正确的.对"base KTable"的每次更新都会为它的"旧值"写一条记录,并且它是"新值".这是KTable正确更新下游所必需的.必须从一个计数中删除旧值,并且必须将新值添加到另一个计数中.由于您的(计数)KTable可能是分布式的(即,在多个并行运行的应用程序实例上共享),因此两个记录(旧的和新的)可能最终都在不同的实例,因为它们可能具有不同的密钥,因此它们必须作为两个独立的记录发送.(记录格式应该比你在问题中显示的更复杂.)

这也解释了为什么你需要一个减法器和一个加法器.减法器从agg结果中删除旧记录,而加法器将新记录添加到agg结果中.

仍然不确定为什么你没有在结果中看到正确的计数.你运行了多少个实例?也许尝试禁用KTable通过设置缓存cache.max.bytes.buffering=0StreamsConfig.

  • 终于成功了。不知何故,我的状态存储库处于怪异状态。在我的开发环境中禁用缓存并重置主题可以解决该问题。感谢Matthias,您总是能得到很多帮助:) (2认同)