Jef*_*lin 10 google-cloud-dataflow
我刚刚开始使用Google Data Flow,我编写了一个简单的流程,从云存储中读取CSV文件.其中一个步骤涉及调用Web服务以丰富结果.有问题的Web服务在批量发送多个100个请求时表现更好.
在查看API时,我没有看到将PCollection的100个元素聚合到单个Par.do执行中的好方法.然后需要拆分结果以处理写入BigQuery表的流的最后一步.
不确定我是否需要使用窗口是我想要的.我看到的大多数窗口示例都更适合在给定时间段内进行计数.
您可以缓冲DoFn的本地成员变量中的元素,并在缓冲区足够大时以及在finishBundle中调用Web服务.例如:
class CallServiceFn extends DoFn<String, String> {
private List<String> elements = new ArrayList<>();
public void processElement(ProcessContext c) {
elements.add(c.element());
if (elements.size() >= MAX_CALL_SIZE) {
for (String result : callServiceWithData(elements)) {
c.output(result);
}
elements.clear();
}
}
public void finishBundle(Context c) {
for (String result : callServiceWithData(elements)) {
c.output(result);
}
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1004 次 |
| 最近记录: |