使用python在数据流中为每个窗口写入一个文件

Pav*_*uri 1 google-cloud-platform google-cloud-dataflow apache-beam

从像 pub/sub 这样的无限源读取数据后,我正在应用窗口化。我需要将属于一个窗口的所有记录写入一个单独的文件。我在 Java 中找到了这个,但在 python 中找不到任何东西。

Gui*_*ins 5

问题中没有关于您的用例的详细信息,因此您可能需要调整以下示例的某些部分。一种方法是使用元素所属的窗口作为键对元素进行分组。然后,我们利用filesystems.FileSystems.create来控制我们希望如何写入文件。

在这里,我将使用 10 秒的窗口和一些虚拟数据,其中每个事件间隔 4 秒。生成:

data = [{'event': '{}'.format(event), 'timestamp': time.time() + 4*event} for event in range(10)]
Run Code Online (Sandbox Code Playgroud)

我们使用该timestamp字段来分配元素时间戳(这只是为了以受控方式模拟 Pub/Sub 事件)。我们将事件窗口化,使用窗口化信息作为键,按键分组并将结果写入output文件夹:

events = (p
  | 'Create Events' >> beam.Create(data) \
  | 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['timestamp'])) \
  | 'Add Windows' >> beam.WindowInto(window.FixedWindows(10)) \
  | 'Add Window Info' >> beam.ParDo(AddWindowingInfoFn()) \
  | 'Group By Window' >> beam.GroupByKey() \
  | 'Windowed Writes' >> beam.ParDo(WindowedWritesFn('output/')))
Run Code Online (Sandbox Code Playgroud)

哪里AddWindowingInfoFn很简单:

class AddWindowingInfoFn(beam.DoFn):
  """output tuple of window(key) + element(value)"""
  def process(self, element, window=beam.DoFn.WindowParam):
    yield (window, element)
Run Code Online (Sandbox Code Playgroud)

WindowedWritesFn写入我们在管道中指定的路径(output/在我的例子中是文件夹)。然后,我使用窗口信息作为文件名。为方便起见,我将纪元时间戳转换为人类可读的日期。最后,我们遍历所有元素并将它们写入相应的文件。当然,这个行为可以在这个函数中随意调整:

class WindowedWritesFn(beam.DoFn):
    """write one file per window/key"""
    def __init__(self, outdir):
        self.outdir = outdir

    def process(self, element):
        (window, elements) = element
        window_start = str(window.start.to_utc_datetime()).replace(" ", "_")
        window_end = str(window.end.to_utc_datetime()).replace(" ", "_")
        writer = filesystems.FileSystems.create(self.outdir + window_start + ',' + window_end + '.txt')

        for row in elements:
          writer.write(str(row)+ "\n")

        writer.close()
Run Code Online (Sandbox Code Playgroud)

这会将属于每个窗口的元素写入不同的文件。就我而言,有 5 个不同的

$ ls output/
2019-05-21_19:01:20,2019-05-21_19:01:30.txt
2019-05-21_19:01:30,2019-05-21_19:01:40.txt
2019-05-21_19:01:40,2019-05-21_19:01:50.txt
2019-05-21_19:01:50,2019-05-21_19:02:00.txt
2019-05-21_19:02:00,2019-05-21_19:02:10.txt
Run Code Online (Sandbox Code Playgroud)

第一个只包含元素 0(这将因执行而异):

$ cat output/2019-05-21_19\:01\:20\,2019-05-21_19\:01\:30.txt 
{'timestamp': 1558465286.933727, 'event': '0'}
Run Code Online (Sandbox Code Playgroud)

第二个包含元素 1 到 3,依此类推:

$ cat output/2019-05-21_19\:01\:30\,2019-05-21_19\:01\:40.txt 
{'timestamp': 1558465290.933728, 'event': '1'}
{'timestamp': 1558465294.933728, 'event': '2'}
{'timestamp': 1558465298.933729, 'event': '3'}
Run Code Online (Sandbox Code Playgroud)

这种方法的警告是来自同一个窗口的所有元素都被分组到同一个工作人员中。如果根据您的情况写入单个分片或输出文件,无论如何都会发生这种情况,但对于更高的负载,您可能需要考虑更大的机器类型。

完整代码在这里