sco*_*ata 3 google-cloud-dataflow
我正在使用数据流生成大量数据.
我测试了我的管道的两个版本:一个带有侧输入(不同大小),另一个没有.
当我在没有侧输入的情况下运行管道时,我的工作将在大约7分钟内完成.当我用侧面输入来完成我的工作时,我的工作永远不会完成.
这是我的DoFn的样子:
public class MyDoFn extends DoFn<String, String> {
final PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pCollectionView;
final List<CSVRecord> stuff;
private Aggregator<Integer, Integer> dofnCounter =
createAggregator("DoFn Counter", new Sum.SumIntegerFn());
public MyDoFn(PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pcv, List<CSVRecord> m) {
this.pCollectionView = pcv;
this.stuff = m;
}
@Override
public void processElement(ProcessContext processContext) throws Exception {
Map<String, Iterable<TreeMap<Long, Float>>> pdata = processContext.sideInput(pCollectionView);
processContext.output(AnotherClass.generateData(stuff, pdata));
dofnCounter.addValue(1);
}
}
Run Code Online (Sandbox Code Playgroud)
这就是我的管道:
final Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
PCollection<KV<String, TreeMap<Long, Float>>> data;
data = p.apply(TextIO.Read.from("gs://where_the_files_are/*").named("Reading Data"))
.apply(ParDo.named("Parsing data").of(new DoFn<String, KV<String, TreeMap<Long, Float>>>() {
@Override
public void processElement(ProcessContext processContext) throws Exception {
// Parse some data
processContext.output(KV.of(key, value));
}
}));
final PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pcv =
data.apply(GroupByKey.<String, TreeMap<Long, Float>>create())
.apply(View.<String, Iterable<TreeMap<Long, Float>>>asMap());
DoFn<String, String> dofn = new MyDoFn(pcv, localList);
p.apply(TextIO.Read.from("gs://some_text.txt").named("Sizing"))
.apply(ParDo.named("Generating the Data").withSideInputs(pvc).of(dofn))
.apply(TextIO.Write.named("Write_out").to(outputFile));
p.run();
Run Code Online (Sandbox Code Playgroud)
我们花了大约两天的时间尝试各种方法来实现这一目标.我们将其缩小到包含侧面输入.如果processContext被修改为不使用侧输入,只要它包含在内,它仍然会非常慢.如果我们不调用.withSideInput(),它会再次非常快.
为了澄清,我们在侧输入尺寸上测试了这个,从20mb到1.5gb.
非常感谢任何见解.
编辑 包括一些工作ID:
2016-01-20_14_31_12-1354600113427960103
2016-01-21_08_04_33-1642110636871153093(最新)
| 归档时间: |
|
| 查看次数: |
1432 次 |
| 最近记录: |