我有一个从GenericRecord类型的主题中提取json字符串的处理器。现在我将流分成2个分支。我进入第一个分支,然后将(键,值)映射到2个字符串中,该字符串包含json的特定字段和该字段的值,并且按键分组。到目前为止,一切都很好。现在,我必须使用新的Type用户定义来聚合流,并且会收到异常。
这里的代码:
新类型:
private class Tuple {
public int occ;
public int sum;
public Tuple (int occ, int sum) {
this.occ = occ;
this.sum = sum;
}
public void sum (int toAdd) {
this.sum += toAdd;
this.occ ++;
}
public int getAverage () {
return this.sum / this.occ;
}
public String toString() {
return occ + "-> " + sum + ": " + getAverage();
}
Run Code Online (Sandbox Code Playgroud)
好流:
StreamsBuilder builder = new StreamsBuilder();
KStream<GenericRecord, GenericRecord> source =
builder.stream(topic);
KStream<GenericRecord, GenericRecord>[] branches …Run Code Online (Sandbox Code Playgroud)