使用Google Dataflow实现长期生存状态

bfa*_*bry 7 google-cloud-platform google-cloud-dataflow

试着在这里尝试编程模型.场景是我正在使用Pub/Sub + Dataflow来为网络论坛进行分析.我有来自Pub/Sub的数据流,如下所示:

ID | TS | EventType
1  | 1  | Create
1  | 2  | Comment
2  | 2  | Create
1  | 4  | Comment
Run Code Online (Sandbox Code Playgroud)

我想最终得到一个来自Dataflow的流,看起来像:

ID | TS | num_comments
1  | 1  | 0
1  | 2  | 1
2  | 2  | 0
1  | 4  | 2
Run Code Online (Sandbox Code Playgroud)

我希望执行此汇总的作业作为流进程运行,并在新事件进入时填充新计数.我的问题是,作业存储当前主题ID和注释计数的状态的惯用位置在哪里?假设主题可以存活多年.目前的想法是:

  • 将主题id的"当前"条目写入BigTable,并在DoFn查询中查询主题ID的当前注释计数是什么.即使我写这篇文章,我也不是粉丝.
  • 以某种方式使用侧输入?似乎这可能是答案,但如果是这样的话,我并不完全理解.
  • 使用全局窗口设置流式作业,每次获取记录时触发器都会关闭,并依靠Dataflow将整个窗格历史记录保存在某个位置.(无限制的存储要求?)

编辑:只是为了澄清,我不会在实现这三种策略中的任何一种,或者百万种不同的其他方式时遇到任何麻烦,我对使用Dataflow这样做的最佳方式更感兴趣.什么是最有弹性的失败,必须重新处理回填历史等等.

编辑2:数据流服务目前存在一个错误,如果向展平转换添加输入,则更新失败,这意味着如果您对包含添加的作业进行更改,则需要丢弃并重建作业中累积的任何状态事情是扁平化的.

Ben*_*ers 8

您应该能够使用触发器和联合收割机来完成此任务.

PCollection<ID> comments = /* IDs from the source */;
PCollection<KV<ID, Long>> commentCounts = comments
    // Produce speculative results by triggering as data comes in.
    // Note that this won't trigger after *every* element, but it will
    // trigger relatively quickly (as the system divides incoming data
    // into work units). You could also throttle this with something
    // like:
    //   AfterProcessingTime.pastFirstElementInPane()
    //     .plusDelayOf(Duration.standardMinutes(5))
    // which will produce output every 5 minutes
    .apply(Window.triggering(
            Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .accumulatingFiredPanes())
    // Count the occurrences of each ID
    .apply(Count.perElement());

// Produce an output String -- in your use case you'd want to produce
// a row and write it to the appropriate source  
commentCounts.apply(new DoFn<KV<ID, Long>, String>() {
  public void processElement(ProcessContext c) {
    KV<ID, Long> element = c.element();
    // This includes details about the pane of the window being
    // processed, and including a strictly increasing index of the
    // number of panes that have been produced for the key.        
    PaneInfo pane = c.pane();
    return element.key() + " | " + pane.getIndex() + " | " + element.value(); 
  }
});
Run Code Online (Sandbox Code Playgroud)

根据您的数据,您还可以从源读取整个注释,提取ID,然后使用Count.perKey()获取每个ID的计数.如果您想要更复杂的组合,可以考虑定义自定义CombineFn和使用Combine.perKey.