我正在尝试在python中设置数据流流管道.我对批处理管道有很多经验.我们的基本架构如下所示:

第一步是进行一些基本处理,每条消息大约需要2秒才能进入窗口.我们使用3秒和3秒间隔的滑动窗口(可能会稍后更改,因此我们有重叠的窗口).作为最后一步,我们有SOG预测需要大约15秒的时间来处理,这显然是我们的瓶颈变换.
因此,我们似乎面临的问题是,在窗口化之前工作负载完全分布在我们的工作者上,但最重要的转换根本不是分布式的.所有的窗户一次一个地处理,看似1个工人,而我们有50个可用.
日志告诉我们,sog预测步骤每15秒输出一次,如果窗口将被更多的工作人员处理,则不应该是这种情况,因此这会在一段时间内产生巨大的延迟,这是我们不想要的.使用1分钟的消息,最后一个窗口的延迟为5分钟.当分配工作时,这应该只有大约15秒(SOG预测时间).所以在这一点上我们是无能为力的..
有没有人看到我们的代码是否有问题或如何防止/规避这个?似乎这是谷歌云数据流内部发生的事情.这是否也出现在Java流媒体管道中?
在批处理模式下,一切正常.在那里,人们可以尝试进行重新洗牌以确保不会发生融合等.但是在流媒体窗口化之后,这是不可能的.
args = parse_arguments(sys.argv if argv is None else argv)
pipeline_options = get_pipeline_options(project=args.project_id,
job_name='XX',
num_workers=args.workers,
max_num_workers=MAX_NUM_WORKERS,
disk_size_gb=DISK_SIZE_GB,
local=args.local,
streaming=args.streaming)
pipeline = beam.Pipeline(options=pipeline_options)
# Build pipeline
# pylint: disable=C0330
if args.streaming:
frames = (pipeline | 'ReadFromPubsub' >> beam.io.ReadFromPubSub(
subscription=SUBSCRIPTION_PATH,
with_attributes=True,
timestamp_attribute='timestamp'
))
frame_tpl = frames | 'CreateFrameTuples' >> beam.Map(
create_frame_tuples_fn)
crops = frame_tpl | 'MakeCrops' >> beam.Map(make_crops_fn, NR_CROPS)
bboxs = crops | 'bounding boxes tfserv' >> beam.Map(
pred_bbox_tfserv_fn, SERVER_URL)
sliding_windows = bboxs | 'Window' >> beam.WindowInto( …Run Code Online (Sandbox Code Playgroud)