Cai*_*ias 1 java firebase google-cloud-dataflow
在破解快速CSV格式到Firebase上传的过程中,我只是这样做,而不是编写自定义接收器。这是代码的过度简化:
public static void main(String[] args) throws Exception {
Options options = PipelineOptionsFactory.as(Options.class);
Pipeline p = Pipeline.create(options);
PCollection<String> CsvData = p.apply(TextIO.Read.from("/my_file.csv"));
CsvData.apply(ParDo.named("Firebase").of(new DoFn<String, Void>() {
@Override
public void processElement(ProcessContext c) {
Firebase fb = new Firebase("https://MYAPP.firebaseio.com/");
fb.child("someId").setValue(c.element.getValue());
}
});
}
Run Code Online (Sandbox Code Playgroud)
有用。这是在Cloud Dataflow上使用REST API的地方吗?
是的,这应该可行,前提是您可以接受以下警告:在发生故障的情况下,捆绑包可以重复复制或重试多次,即,您的processElement调用可以在同一元素上多次调用(可能同时调用)。
即使Dataflow会对结果进行重复数据删除(即,仅通过发出的一个成功调用的项c.output()最终会出现在结果中PCollection),但对副作用进行重复数据删除(例如进行外部API调用)仍是代码的责任。
定制接收器API仅强调了这些问题,并提供了一种处理它们的“模式”(通过为捆绑包提供唯一的ID,并提供钩子来提交成功的结果-例如,基于文件的接收器将使每个捆绑包都写入一个唯一命名的临时文件,并且提交钩子会将成功完成的包写入的文件重命名到最终位置)-但是,如果您的用例对它们不敏感,那么可以很好地使用simple ParDo。
另外,请注意,Dataflow还没有用于流传输的自定义接收器API,因此,如果这是流传输管道,那么a ParDo绝对是正确的选择。
在您的ParDo中,您可能希望实现对Firebase的调用的批处理,以避免每次调用的开销。您可以使用来做到这一点DoFn.finishBundle()(例如,将批处理更新保存在缓冲区中,追加到缓冲区中,并在缓冲区processElement过大时将其刷新,最后一次写入中finishBundle)。请参阅此答案中类似模式的示例。
| 归档时间: |
|
| 查看次数: |
573 次 |
| 最近记录: |