小编Par*_*mar的帖子

如何理解kafka流聚合?

我是卡夫卡新手,正在学习它。我只是在为员工汇总数据,但遇到了问题。有人可以帮忙吗?

我有一个主题 timeoffs,其中包含 time_off_id 键和类型对象的值,其中还包含员工 ID。所以我想建立一个商店,其中员工 ID 应该是关键,值应该是该员工的休假列表。但我遵循以下方法,但遇到了问题。聚合数据时,提示方法引用中的返回类型错误:无法将 ArrayList 转换为 VR。你能帮助我吗。

代码:

KTable<String, TimeOff> timeoffs = builder.table(topic);
KGroupedTable<String, TimeOff> groupedTable = timeoffs.groupBy(
    (key, value) -> KeyValue.pair(value.getEmployeeId(), value)
);
groupedTable.aggregate(ArrayList<TimeOff>::new, (k, newValue, aggValue) -> {
  aggValue.add(newValue);
  return aggValue;
}, Materialized.as("NewStore"));
Run Code Online (Sandbox Code Playgroud)

我也尝试过这种方法,但这并没有解决问题。

TimeOffList 类:

package com.kafka.productiontest.models;

import java.util.ArrayList;

public class TimeOffList {
  ArrayList list = new ArrayList<TimeOff>();

  public TimeOffList add(Object s) {
    list.add(s);
    return this;
  }
}
Run Code Online (Sandbox Code Playgroud)

在流媒体类中:

groupedTable.aggregate(TimeOffList::new,
    (k, newValue, aggValue) -> (TimeOffList) aggValue.add(newValue));
Run Code Online (Sandbox Code Playgroud)

实施您的解决方案后,这个问题消失了,但现在面临 serde 的问题。我已经实现了 TimeOffListSerde。请检查下面的代码

KStream<String, TimeOff> source …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams

0
推荐指数
1
解决办法
2357
查看次数

标签 统计

apache-kafka ×1

apache-kafka-streams ×1