flu*_*y03 6 scala aggregate fold apache-flink flink-streaming
我正在遵循Flink的快速入门示例:监视Wikipedia编辑流。
该示例使用Java,并且正在Scala中实现,如下所示:
/**
* Wikipedia Edit Monitoring
*/
object WikipediaEditMonitoring {
def main(args: Array[String]) {
// set up the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)
val result = edits.keyBy( _.getUser )
.timeWindow(Time.seconds(5))
.fold(("", 0L)) {
(acc: (String, Long), event: WikipediaEditEvent) => {
(event.getUser, acc._2 + event.getByteDiff)
}
}
result.print
// execute program
env.execute("Wikipedia Edit Monitoring")
}
}
Run Code Online (Sandbox Code Playgroud)
但是,foldFlink中的功能已被弃用,aggregate建议使用该功能。
但我没有找到有关如何转换的过时的例子或教程fold来aggregrate。
任何想法如何做到这一点?可能不仅是通过应用aggregrate。
更新
我有另一个实现如下:
/**
* Wikipedia Edit Monitoring
*/
object WikipediaEditMonitoring {
def main(args: Array[String]) {
// set up the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)
val result = edits
.map( e => UserWithEdits(e.getUser, e.getByteDiff) )
.keyBy( "user" )
.timeWindow(Time.seconds(5))
.sum("edits")
result.print
// execute program
env.execute("Wikipedia Edit Monitoring")
}
/** Data type for words with count */
case class UserWithEdits(user: String, edits: Long)
}
Run Code Online (Sandbox Code Playgroud)
我也想知道如何使用self-defined实现AggregateFunction。
更新
我遵循了以下文档:AggregateFunction,但是有以下问题:
在AggregateFunction1.3版的Interface的源代码中,您将看到add确实返回void:
void add(IN value, ACC accumulator);
Run Code Online (Sandbox Code Playgroud)
但是对于1.4版本AggregateFunction,返回的是:
ACC add(IN value, ACC accumulator);
Run Code Online (Sandbox Code Playgroud)
我该如何处理?
我正在使用的Flink版本是,1.3.2并且该版本的文档AggregateFunction尚无,但是工件中没有1.4版。
AggregateFunction 您将在 Flink 1.4 文档 中找到一些文档,包括一个示例。
1.3.2 中包含的版本仅限于与可变累加器类型一起使用,其中添加操作会修改累加器。Flink 1.4已修复此问题,但尚未发布。