从自定义DoFn中产生`finish_bundle`的结果

Zac*_*she 1 apache-beam

我的流水线的第一步涉及从外部数据源中获取数据,我想分块进行(顺序无关紧要)。我找不到任何类似的类,因此创建了以下内容:

class FixedSizeBatchSplitter(beam.DoFn):
  def __init__(self, size):
    self.size = size

  def start_bundle(self):
    self.current_batch = []

  def finish_bundle(self):
    if self.current_batch: 
      yield self.current_batch

  def process(self, element):
    self.current_batch.append(element)
    if len(self.current_batch) >= self.size:
      yield self.current_batch
      self.current_batch = []
Run Code Online (Sandbox Code Playgroud)

但是,当我运行此管道时,出现RuntimeError: Finish Bundle should only output WindowedValue type错误:

with beam.Pipeline() as p:
  res = (p
         | beam.Create(range(10))
         | beam.ParDo(FixedSizeBatchSplitter(3))
        )
Run Code Online (Sandbox Code Playgroud)

这是为什么?我怎么才能在process而不是在其中产生输出finish_bundle?顺便说一句,如果我删除finish_bundle管道的工作,但显然会丢弃剩余的。

jkf*_*kff 7

A DoFn可能是来自多个不同窗口的处理元素。当您进入时process(),“当前窗口”是明确的-它是正在处理的元素的窗口。当您进入时finish_bundle,它是不明确的,您需要显式指定窗口。您需要产生某种形式的东西yield WindowedValue(something, timestamp, [window])

如果您所有的数据都在全局窗口中,则将变得更加容易:window将为GlobalWindow()。如果使用多个窗口,则每个窗口需要有1个缓冲区;捕获窗口,process()以便添加到适当的缓冲区中;并在finish_bundle各自的窗口中发射它们中的每一个。