小编And*_*rea的帖子

具有自定义对象数据类型的Kafka流聚合

我有一个从GenericRecord类型的主题中提取json字符串的处理器。现在我将流分成2个分支。我进入第一个分支,然后将(键,值)映射到2个字符串中,该字符串包含json的特定字段和该字段的值,并且按键分组。到目前为止,一切都很好。现在,我必须使用新的Type用户定义来聚合流,并且会收到异常。

这里的代码:

新类型:

private class Tuple {

    public int occ;
    public int sum;


    public Tuple (int occ, int sum) {
        this.occ = occ;
        this.sum = sum;
    }

    public void sum (int toAdd) {
        this.sum += toAdd;
        this.occ ++;
    }

    public int getAverage () {
        return this.sum / this.occ;
    }

    public String toString() {
        return occ + "-> " + sum + ": " + getAverage();
    }
Run Code Online (Sandbox Code Playgroud)

好流:

  StreamsBuilder builder = new StreamsBuilder();
    KStream<GenericRecord, GenericRecord> source =
          builder.stream(topic);

    KStream<GenericRecord, GenericRecord>[] branches …
Run Code Online (Sandbox Code Playgroud)

aggregation apache-kafka apache-kafka-streams

3
推荐指数
1
解决办法
2860
查看次数