小编Mik*_*yes的帖子

如何在 Apache Beam Python 中获取窗口时间戳的结束

我正在从批处理日志数据中每 5 秒创建 20 秒长的滑动时间窗口:

    rows = p | 'read events' >> beam.io.Read(beam.io.BigQuerySource(query=query))

    # set timestamp field used for windowing and set 20 second long window every 5 seconds
    ts_rows = (rows | 'set timestamp' >> beam.ParDo(AddTimestampDoFn())
                    | 'set window' >> beam.WindowInto(window.SlidingWindows(20,5)))

    # extract only user id and relevant data, group and process
    rows_with_data = (ts_rows | 'extract data' >> beam.FlatMap(lambda row: 
                                [(str(row['user_id']),[row['data1'], row['data2'],row['data3']])])
                              | 'group by user id' >> beam.GroupByKey()
                              | 'Process window' >> beam.ParDo(WindowDataProcessingDoFn()))
Run Code Online (Sandbox Code Playgroud)

如何在 Python 中访问每个窗口的时间戳信息?(Java 的答案在这里,但我不知道如何将其翻译成 …

python google-cloud-dataflow apache-beam

5
推荐指数
1
解决办法
2819
查看次数

数据流模板中的动态bigquery查询

我编写了一个Dataflow作业,当我手动运行它时效果很好。这是相关的部分(为清楚起见,删除了一些验证代码):

parser.add_argument('--end_datetime',
                    dest='end_datetime')
known_args, pipeline_args = parser.parse_known_args(argv)

query = <redacted SQL String with a placeholder for a date>
query = query.replace('#ENDDATETIME#', known_args.end_datetime)

with beam.Pipeline(options=pipeline_options) as p:
    rows = p | 'read query' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))
Run Code Online (Sandbox Code Playgroud)

现在,我想创建一个模板并将其安排为以动态ENDDATETIME定期运行。据我了解,为了做到这一点,我需要根据此文档将add_argument更改为add_value_provider_argument:

https://cloud.google.com/dataflow/docs/templates/creating-templates

不幸的是,似乎ValueProvider值在我需要它们时不可用,它们仅在管道内部可用。(如果我在这里错了,请纠正我...)。所以我有点卡住了。

是否有人对如何在Dataflow模板中的查询中获取动态日期有任何建议?

python google-cloud-dataflow apache-beam

3
推荐指数
1
解决办法
836
查看次数