窗口化后,Google数据流流媒体管道不会将工作负载分配给多个工作人员

Bre*_*ghe 14 google-cloud-dataflow apache-beam

我正在尝试在python中设置数据流流管道.我对批处理管道有很多经验.我们的基本架构如下所示: 在此输入图像描述

第一步是进行一些基本处理,每条消息大约需要2秒才能进入窗口.我们使用3秒和3秒间隔的滑动窗口(可能会稍后更改,因此我们有重叠的窗口).作为最后一步,我们有SOG预测需要大约15秒的时间来处理,这显然是我们的瓶颈变换.

因此,我们似乎面临的问题是,在窗口化之前工作负载完全分布在我们的工作者上,但最重要的转换根本不是分布式的.所有的窗户一次一个地处理,看似1个工人,而我们有50个可用.

日志告诉我们,sog预测步骤每15秒输出一次,如果窗口将被更多的工作人员处理,则不应该是这种情况,因此这会在一段时间内产生巨大的延迟,这是我们不想要的.使用1分钟的消息,最后一个窗口的延迟为5分钟.当分配工作时,这应该只有大约15秒(SOG预测时间).所以在这一点上我们是无能为力的..

在此输入图像描述

有没有人看到我们的代码是否有问题或如何防止/规避这个?似乎这是谷歌云数据流内部发生的事情.这是否也出现在Java流媒体管道中?

在批处理模式下,一切正常.在那里,人们可以尝试进行重新洗牌以确保不会发生融合等.但是在流媒体窗口化之后,这是不可能的.

args = parse_arguments(sys.argv if argv is None else argv)
pipeline_options = get_pipeline_options(project=args.project_id,
                                        job_name='XX',
                                        num_workers=args.workers,
                                        max_num_workers=MAX_NUM_WORKERS,
                                        disk_size_gb=DISK_SIZE_GB,
                                        local=args.local,
                                        streaming=args.streaming)

pipeline = beam.Pipeline(options=pipeline_options)

# Build pipeline
# pylint: disable=C0330
if args.streaming:
    frames = (pipeline | 'ReadFromPubsub' >> beam.io.ReadFromPubSub(
        subscription=SUBSCRIPTION_PATH,
        with_attributes=True,
        timestamp_attribute='timestamp'
    ))

    frame_tpl = frames | 'CreateFrameTuples' >> beam.Map(
        create_frame_tuples_fn)

crops = frame_tpl | 'MakeCrops' >> beam.Map(make_crops_fn, NR_CROPS)
bboxs = crops | 'bounding boxes tfserv' >> beam.Map(
    pred_bbox_tfserv_fn, SERVER_URL)

sliding_windows = bboxs | 'Window' >> beam.WindowInto(
                  beam.window.SlidingWindows(
                        FEATURE_WINDOWS['goal']['window_size'],
                        FEATURE_WINDOWS['goal']['window_interval']),
                  trigger=AfterCount(30),
                  accumulation_mode=AccumulationMode.DISCARDING)

# GROUPBYKEY (per match)
group_per_match = sliding_windows | 'Group' >> beam.GroupByKey()
_ = group_per_match | 'LogPerMatch' >> beam.Map(lambda x: logging.info(
    "window per match per timewindow: # %s, %s", str(len(x[1])), x[1][0][
        'timestamp']))

sog = sliding_windows | 'Predict SOG' >> beam.Map(predict_sog_fn,
                                                SERVER_URL_INCEPTION,
                                                SERVER_URL_SOG )

pipeline.run().wait_until_finish()
Run Code Online (Sandbox Code Playgroud)

rob*_*twb 6

在梁中,并行度是关键 - 给定密钥的所有窗口都将在同一台机器上生成.但是,如果您有50多个密钥,则应该在所有工作人员之间分配.

您提到您无法在流式传输中添加重新洗牌.这应该是可能的; 如果您遇到错误,请在https://issues.apache.org/jira/projects/BEAM/issues上提交错误.是否重新加入GlobalWindows会使重新洗牌问题消失?