Flink 窗口:聚合并输出到接收器

Xel*_*eli 3 apache-flink flink-streaming

我们有一个数据流,其中每个元素都是这种类型:

id: String
type: Type
amount: Integer
Run Code Online (Sandbox Code Playgroud)

amount我们想要聚合这个流并输出每周一次的总和。

目前的解决方案:

Flink 管道示例如下所示:

stream.keyBy(type)
      .window(TumblingProcessingTimeWindows.of(Time.days(7)))
      .reduce(sumAmount())
      .addSink(someOutput())
Run Code Online (Sandbox Code Playgroud)

用于输入

| id | type | amount |
| 1  | CAT  | 10     |
| 2  | DOG  | 20     |
| 3  | CAT  | 5      |
| 4  | DOG  | 15     |
| 5  | DOG  | 50     |
Run Code Online (Sandbox Code Playgroud)

如果窗口在记录之间结束34我们的输出将是:

| TYPE | sumAmount |
| CAT  | 15        | (id 1 and id 3 added together)
| DOG  | 20        | (only id 2 as been 'summed')
Run Code Online (Sandbox Code Playgroud)

Id45仍将在 flink 管道内,并将于下周输出。

因此下周我们的总产出将是:

| TYPE | sumAmount |
| CAT  | 15        | (of last week)
| DOG  | 20        | (of last week)
| DOG  | 65        | (id 4 and id 5 added together)
Run Code Online (Sandbox Code Playgroud)

新要求:

我们现在还想知道每条记录是在哪一周处理的。换句话说,我们的新输出应该是:

| TYPE | sumAmount | weekNumber |
| CAT  | 15        | 1          |
| DOG  | 20        | 1          |
| DOG  | 65        | 2          |
Run Code Online (Sandbox Code Playgroud)

但我们还想要一个像这样的额外输出:

| id | weekNumber |
| 1  | 1          |
| 2  | 1          |
| 3  | 1          |
| 4  | 2          |
| 5  | 2          |
Run Code Online (Sandbox Code Playgroud)

这要怎么处理呢?

flink有什么办法可以实现这一点吗?我想我们会有一个聚合函数,它可以对金额进行求和,但也可以输出每条记录与当前周数,例如,但我在文档中找不到执行此操作的方法。

(注意:我们每周处理大约 1 亿条记录,因此理想情况下我们只想在一周内将聚合保持在 flink 的状态,而不是所有单独的记录)

编辑:

我采用了下面安东描述的解决方案:

DataStream<Element> elements = 
  stream.keyBy(type)
        .process(myKeyedProcessFunction());

elements.addSink(outputElements());
elements.getSideOutput(outputTag)
        .addSink(outputAggregates())
Run Code Online (Sandbox Code Playgroud)

KeyedProcessFunction 看起来像:

class MyKeyedProcessFunction extends KeyedProcessFunction<Type, Element, Element>
    private ValueState<ZonedDateTime> state;
    private ValueState<Integer> sum;

    public void processElement(Element e, Context c, Collector<Element> out) {
        if (state.value() == null) {
            state.update(ZonedDateTime.now());
            sum.update(0);
            c.timerService().registerProcessingTimeTimer(nowPlus7Days);
        }
        element.addAggregationId(state.value());
        sum.update(sum.value() + element.getAmount());
    }

    public void onTimer(long timestamp, OnTimerContext c, Collector<Element> out) {
        state.update(null);
        c.output(outputTag, sum.value()); 
    }
} 
Run Code Online (Sandbox Code Playgroud)

Dav*_*son 6

有一个reduce 方法的变体,它将ProcessWindowFunction 作为第二个参数。你可以像这样使用它:

stream.keyBy(type)
  .window(TumblingProcessingTimeWindows.of(Time.days(7)))
  .reduce(sumAmount(), new WrapWithWeek())
  .addSink(someOutput())

private static class WrapWithWeek
  extends ProcessWindowFunction<Event, Tuple3<Type, Long, Long>, Type, TimeWindow> {

      public void process(Type key,
                Context context,
                Iterable<Event> reducedEvents,
                Collector<Tuple3<Type, Long, Long>> out) {
          Long sum = reducedEvents.iterator().next();
          out.collect(new Tuple3<Type, Long, Long>(key, context.window.getStart(), sum));
      }
}
Run Code Online (Sandbox Code Playgroud)

通常,ProcessWindowFunction 会传递一个 Iterable,其中包含窗口收集的所有事件,但如果您使用缩减或聚合函数来预聚合窗口结果,则只有该单个值会传递到 Iterable。有关此内容的文档位于此处,但文档中的示例当前有一个小错误,我已在此处的示例中修复了该错误。

但考虑到对第二个输出的新要求,我建议您放弃在 Windows 上执行此操作的想法,而使用带键的ProcessFunction。您将需要两块每个键的 ValueState:一块按周计数,另一块用于存储总和。您需要一个每周触发一次的计时器:当它触发时,它应该发出类型、总和和周数,然后递增周数。同时,流程元素方法将简单地输出每个传入事件的 ID 以及周计数器的值。