How to get the end timestamp of a window in Apache Beam Python

Mik*_*yes 5 python google-cloud-dataflow apache-beam

I am creating 20-second sliding time windows, one starting every 5 seconds, from batch log data:

    rows = p | 'read events' >> beam.io.Read(beam.io.BigQuerySource(query=query))

    # set timestamp field used for windowing and set 20 second long window every 5 seconds
    ts_rows = (rows | 'set timestamp' >> beam.ParDo(AddTimestampDoFn())
                    | 'set window' >> beam.WindowInto(window.SlidingWindows(20,5)))

    # extract only user id and relevant data, group and process
    rows_with_data = (ts_rows | 'extract data' >> beam.FlatMap(lambda row: 
                                [(str(row['user_id']),[row['data1'], row['data2'],row['data3']])])
                              | 'group by user id' >> beam.GroupByKey()
                              | 'Process window' >> beam.ParDo(WindowDataProcessingDoFn()))

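How can I access each window's timestamp information in Python? (There is a Java answer here, but I can't work out how to translate it to Python: How to get the maximum timestamp of the current sliding window.) Ideally, I want the end time of each window rather than the maximum or minimum timestamp of the data inside the window.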

x97*_*ore 6

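I followed the link you provided.

Note: window=beam.DoFn.WindowParam is the parameter mentioned on the page you linked.

Declaring that parameter makes Beam pass the current window into process, and the window's end time is its end attribute. In Python, you can access it like this:

Define your DoFn: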

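    import apache_beam as beam

    class BuildRecordFn(beam.DoFn):
        def __init__(self):
            super(BuildRecordFn, self).__init__()

        def process(self, element, window=beam.DoFn.WindowParam):
            # Beam injects the window that the current element belongs to.
            # window_start = window.start.to_utc_datetime()
            window_end = window.end.to_utc_datetime()
            # element is a tuple here, so append the window end time to it.
            return [element + (window_end,)]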

Then use it like this:

    def format_result(xs):
        # Convert every field of the record to a string and join with commas.
        ys = [str(x) for x in xs]
        return ','.join(ys)

    lines = p | ReadFromText(known_args.input)
    counts = (
        lines
        | 'ParseEventFn' >> beam.ParDo(ParseEventFn())
        # Use each element's own 'timestamp' field as its event time.
        | 'AddEventTimestamp' >> beam.Map(
            lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
        | 'ExtractObjectID' >> beam.Map(lambda elem: elem['objectID'])
        | 'FixedWindow' >> beam.WindowInto(
            beam.window.FixedWindows(60*1))
        | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum)
        # Each (objectID, count) pair becomes (objectID, count, window_end).
        | 'AddWindowEndTimestamp' >> beam.ParDo(BuildRecordFn())
        | 'Format' >> beam.Map(format_result)
        | WriteToText(known_args.output))
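For the sliding windows in the question, it may help to see why the window end differs from the timestamps of the data inside it. The sketch below is plain Python, not Beam's API: sliding_windows is a hypothetical helper that mirrors the usual assignment arithmetic for sliding windows, assuming integer timestamps in seconds and an offset of 0. Each window is a half-open interval [start, end), so the end is an exclusive bound fixed by the window itself rather than by any element.

```python
def sliding_windows(timestamp, size, period, offset=0):
    """Return the [start, end) windows that contain `timestamp`, newest first.

    Hypothetical helper for illustration only; it is not part of Apache Beam.
    """
    # Start of the most recent window that begins at or before the timestamp.
    last_start = timestamp - ((timestamp - offset) % period)
    # Step backwards by `period` while the window still covers the timestamp.
    return [(start, start + size)
            for start in range(last_start, timestamp - size, -period)]


# An event at t=12s with 20-second windows starting every 5 seconds.
windows = sliding_windows(12, size=20, period=5)
window_ends = [end for _, end in windows]
print(windows)
print(window_ends)
```

With a size of 20 and a period of 5, every element lands in 20 / 5 = 4 windows, which is why a DoFn placed after the GroupByKey runs once per key per window and sees a different window.end on each call.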