Flink:如何将不赞成使用的折页转换为总折页?

flu*_*y03 6 scala aggregate fold apache-flink flink-streaming

我正在遵循Flink的快速入门示例:监视Wikipedia编辑流

该示例使用Java,并且正在Scala中实现,如下所示:

/**
 * Wikipedia Edit Monitoring
 */
object WikipediaEditMonitoring {
  def main(args: Array[String]) {
    // set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)

    val result = edits.keyBy( _.getUser )
      .timeWindow(Time.seconds(5))
      .fold(("", 0L)) {
        (acc: (String, Long), event: WikipediaEditEvent) => {
          (event.getUser, acc._2 + event.getByteDiff)
        }
      }

    result.print

    // execute program
    env.execute("Wikipedia Edit Monitoring")
  }
}
Run Code Online (Sandbox Code Playgroud)

但是,foldFlink中的功能已被弃用aggregate建议使用该功能。

在此处输入图片说明

但我没有找到有关如何转换的过时的例子或教程foldaggregrate

任何想法如何做到这一点?可能不仅是通过应用aggregrate

更新

我有另一个实现如下:

/**
 * Wikipedia Edit Monitoring
 */
object WikipediaEditMonitoring {
  def main(args: Array[String]) {
    // set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)

    val result = edits
      .map( e => UserWithEdits(e.getUser, e.getByteDiff) )
      .keyBy( "user" )
      .timeWindow(Time.seconds(5))
      .sum("edits")

    result.print

    // execute program
    env.execute("Wikipedia Edit Monitoring")
  }

  /** Data type for words with count */
  case class UserWithEdits(user: String, edits: Long)
}
Run Code Online (Sandbox Code Playgroud)

我也想知道如何使用self-defined实现AggregateFunction

更新

我遵循了以下文档:AggregateFunction,但是有以下问题:

AggregateFunction1.3版的Interface的源代码中,您将看到add确实返回void

void add(IN value, ACC accumulator);
Run Code Online (Sandbox Code Playgroud)

但是对于1.4版本AggregateFunction,返回的是:

ACC add(IN value, ACC accumulator);
Run Code Online (Sandbox Code Playgroud)

我该如何处理?

我正在使用的Flink版本是,1.3.2并且该版本的文档AggregateFunction尚无,但是工件中没有1.4版。

在此处输入图片说明

Dav*_*son 3

AggregateFunction 您将在 Flink 1.4 文档 中找到一些文档,包括一个示例。

1.3.2 中包含的版本仅限于与可变累加器类型一起使用,其中添加操作会修改累加器。Flink 1.4已修复此问题,但尚未发布。