是否有在Flink中使用直方图累加器的示例

Cub*_*Ron 4 apache-flink

我在Flink层次结构中偶然发现了Histogram类,但是没有"这里是如何使用这种"的文档.我想做的事情如下:

dataStream
    .countWindowAll(100)
    .fold(new Histogram(), (histogram,data) -> {histogram.add(data.getValue()); return histogram;})
    .flatmap((h, out) -> h.getLocalValue().navigableKeySet.iterator().forEachRemaining(key -> out.collect(key.toString()+","+h.get(key).toString()))
    .print()
Run Code Online (Sandbox Code Playgroud)

但遗憾的Histogram是,这不是通过Flink可序列化的.也许有一个"这里是如何使用这个"或者还有另一种方法来通过flink进行直方图.

我显然做错了什么.

Fab*_*ske 6

Flink的累加器不能用作DataStream或的数据类型DataSet.

相反,他们通过注册RuntimeContext,这可从RichFunction.getRuntimeContext(). This is usually done in the开()method of aRichFunction`:

class MyFunc extends RichFlatMapFunction[Int, Int] {

  val hist: Histogram = new Histogram()

  override def open(conf: Configuration): Unit = {
    getRuntimeContext.addAccumulator("myHist", hist)
  }

  override def flatMap(value: Int, out: Collector[Int]): Unit = {
    hist.add(value)
  }
}
Run Code Online (Sandbox Code Playgroud)

累加器的所有并行实例定期发送到JobManager(主进程)并合并.可以从JobExecutionResult返回的值中访问它们的值StreamExecutionEnvironment.execute().

我认为您的用例无法通过Flink的累加器来解决.您应该创建自定义直方图数据类型.