SideInputs会破坏数据流性能

sco*_*ata 3 google-cloud-dataflow

我正在使用数据流生成大量数据.

我测试了我的管道的两个版本:一个带有侧输入(不同大小),另一个没有.

当我在没有侧输入的情况下运行管道时,我的工作将在大约7分钟内完成.当我侧面输入来完成我的工作时,我的工作永远不会完成.

这是我的DoFn的样子:

public class MyDoFn extends DoFn<String, String> {

    final PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pCollectionView;
    final List<CSVRecord> stuff;

    private Aggregator<Integer, Integer> dofnCounter =
            createAggregator("DoFn Counter", new Sum.SumIntegerFn());

    public MyDoFn(PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pcv, List<CSVRecord> m) {
        this.pCollectionView = pcv;
        this.stuff = m;
    }

    @Override
    public void processElement(ProcessContext processContext) throws Exception {
        Map<String, Iterable<TreeMap<Long, Float>>> pdata = processContext.sideInput(pCollectionView);

        processContext.output(AnotherClass.generateData(stuff, pdata));

        dofnCounter.addValue(1);
    }
}
Run Code Online (Sandbox Code Playgroud)

这就是我的管道:

final Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

PCollection<KV<String, TreeMap<Long, Float>>> data;
data = p.apply(TextIO.Read.from("gs://where_the_files_are/*").named("Reading Data"))
        .apply(ParDo.named("Parsing data").of(new DoFn<String, KV<String, TreeMap<Long, Float>>>() {
            @Override
            public void processElement(ProcessContext processContext) throws Exception {

                // Parse some data

                processContext.output(KV.of(key, value));
            }
        }));

final PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pcv =
        data.apply(GroupByKey.<String, TreeMap<Long, Float>>create())
                .apply(View.<String, Iterable<TreeMap<Long, Float>>>asMap());


DoFn<String, String> dofn = new MyDoFn(pcv, localList);

p.apply(TextIO.Read.from("gs://some_text.txt").named("Sizing"))
        .apply(ParDo.named("Generating the Data").withSideInputs(pvc).of(dofn))
        .apply(TextIO.Write.named("Write_out").to(outputFile));

p.run();
Run Code Online (Sandbox Code Playgroud)

我们花了大约两天的时间尝试各种方法来实现这一目标.我们将其缩小到包含侧面输入.如果processContext被修改为不使用侧输入,只要它包含在内,它仍然会非常慢.如果我们不调用.withSideInput(),它会再次非常快.

为了澄清,我们在侧输入尺寸上测试了这个,从20mb到1.5gb.

非常感谢任何见解.

编辑 包括一些工作ID:

2016-01-20_14_31_12-1354600113427960103

2016-01-21_08_04_33-1642110636871153093(最新)

Luk*_*wik 5

请试用Dataflow SDK 1.5.0+,它们应该解决您的问题的已知性能问题.

运行批处理管道时,Dataflow SDK 1.5.0+中的侧输入使用新的分布式格式.请注意,如果视图无法完全缓存在内存中,则使用旧版本Dataflow SDK的流管道和管道仍然需要重新读取侧输入.

使用新格式,我们使用索引来提供基于块的查找和缓存策略.因此,当通过索引查看列表或按键查看映射时,将仅加载包含所述索引或键的块.高速缓存大小大于工作集大小将有助于提高性能,因为频繁访问的索引/键不需要重新读取它们所包含的块.