小编sco*_*ata的帖子

Dataflow 中的迭代处理

如此处所示 Dataflow 管道由固定的 DAG 表示。我想知道是否有可能实现一个管道,在该管道中处理继续进行,直到基于到目前为止计算的数据满足动态评估的条件。

这是一些伪代码来说明我想要实现的内容:

    PCollection pco = null
    while(true):
        pco = pco.apply(someTransform())
        if (conditionSatisfied(pco)):
            break
    pco.Write()
Run Code Online (Sandbox Code Playgroud)

google-cloud-dataflow

3
推荐指数
1
解决办法
388
查看次数

SideInputs会破坏数据流性能

我正在使用数据流生成大量数据.

我测试了我的管道的两个版本:一个带有侧输入(不同大小),另一个没有.

当我在没有侧输入的情况下运行管道时,我的工作将在大约7分钟内完成.当我侧面输入来完成我的工作时,我的工作永远不会完成.

这是我的DoFn的样子:

public class MyDoFn extends DoFn<String, String> {

    final PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pCollectionView;
    final List<CSVRecord> stuff;

    private Aggregator<Integer, Integer> dofnCounter =
            createAggregator("DoFn Counter", new Sum.SumIntegerFn());

    public MyDoFn(PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pcv, List<CSVRecord> m) {
        this.pCollectionView = pcv;
        this.stuff = m;
    }

    @Override
    public void processElement(ProcessContext processContext) throws Exception {
        Map<String, Iterable<TreeMap<Long, Float>>> pdata = processContext.sideInput(pCollectionView);

        processContext.output(AnotherClass.generateData(stuff, pdata));

        dofnCounter.addValue(1);
    }
}
Run Code Online (Sandbox Code Playgroud)

这就是我的管道:

final Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

PCollection<KV<String, TreeMap<Long, Float>>> data;
data = …
Run Code Online (Sandbox Code Playgroud)

google-cloud-dataflow

3
推荐指数
1
解决办法
1432
查看次数

标签 统计

google-cloud-dataflow ×2