如此处所示, Dataflow 管道由固定的 DAG 表示。我想知道是否有可能实现一个管道,在该管道中处理继续进行,直到基于到目前为止计算的数据满足动态评估的条件。
这是一些伪代码来说明我想要实现的内容:
PCollection pco = null
while(true):
pco = pco.apply(someTransform())
if (conditionSatisfied(pco)):
break
pco.Write()
Run Code Online (Sandbox Code Playgroud) 我正在使用数据流生成大量数据.
我测试了我的管道的两个版本:一个带有侧输入(不同大小),另一个没有.
当我在没有侧输入的情况下运行管道时,我的工作将在大约7分钟内完成.当我用侧面输入来完成我的工作时,我的工作永远不会完成.
这是我的DoFn的样子:
public class MyDoFn extends DoFn<String, String> {
final PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pCollectionView;
final List<CSVRecord> stuff;
private Aggregator<Integer, Integer> dofnCounter =
createAggregator("DoFn Counter", new Sum.SumIntegerFn());
public MyDoFn(PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pcv, List<CSVRecord> m) {
this.pCollectionView = pcv;
this.stuff = m;
}
@Override
public void processElement(ProcessContext processContext) throws Exception {
Map<String, Iterable<TreeMap<Long, Float>>> pdata = processContext.sideInput(pCollectionView);
processContext.output(AnotherClass.generateData(stuff, pdata));
dofnCounter.addValue(1);
}
}
Run Code Online (Sandbox Code Playgroud)
这就是我的管道:
final Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
PCollection<KV<String, TreeMap<Long, Float>>> data;
data = …Run Code Online (Sandbox Code Playgroud)