使用ParDo的processElement消耗REST API

Cai*_*ias 1 java firebase google-cloud-dataflow

在破解快速CSV格式到Firebase上传的过程中,我只是这样做,而不是编写自定义接收器。这是代码的过度简化:

public static void main(String[] args) throws Exception {

    Options options = PipelineOptionsFactory.as(Options.class);
    Pipeline p = Pipeline.create(options);

    PCollection<String> CsvData = p.apply(TextIO.Read.from("/my_file.csv"));
    CsvData.apply(ParDo.named("Firebase").of(new DoFn<String, Void>() {
          @Override
          public void processElement(ProcessContext c) {
              Firebase fb = new Firebase("https://MYAPP.firebaseio.com/");
              fb.child("someId").setValue(c.element.getValue());
          }
        });

}
Run Code Online (Sandbox Code Playgroud)

有用。这是在Cloud Dataflow上使用REST API的地方吗?

jkf*_*kff 5

是的,这应该可行,前提是您可以接受以下警告:在发生故障的情况下,捆绑包可以重复复制或重试多次,即,您的processElement调用可以在同一元素上多次调用(可能同时调用)。

即使Dataflow会对结果进行重复数据删除(即,仅通过发出的一个成功调用的项c.output()最终会出现在结果中PCollection),但对副作用进行重复数据删除(例如进行外部API调用)仍是代码的责任。

定制接收器API仅强调了这些问题,并提供了一种处理它们的“模式”(通过为捆绑包提供唯一的ID,并提供钩子来提交成功的结果-例如,基于文件的接收器将使每个捆绑包都写入一个唯一命名的临时文件,并且提交钩子会将成功完成的包写入的文件重命名到最终位置)-但是,如果您的用例对它们不敏感,那么可以很好地使用simple ParDo

另外,请注意,Dataflow还没有用于流传输的自定义接收器API,因此,如果这是流传输管道,那么a ParDo绝对是正确的选择。

在您的ParDo中,您可能希望实现对Firebase的调用的批处理,以避免每次调用的开销。您可以使用来做到这一点DoFn.finishBundle()(例如,将批处理更新保存在缓冲区中,追加到缓冲区中,并在缓冲区processElement过大时将其刷新,最后一次写入中finishBundle)。请参阅此答案中类似模式的示例。