Nej*_*ver 6 dataflow google-cloud-datastore gcloud google-cloud-dataflow
我正在尝试执行数据流管道作业,该作业将从数据存储区一次对N个条目执行一个函数.在我的情况下,此函数将一批100个条目作为有效负载发送到某些REST服务.这意味着我想要查看来自一个数据存储区实体的所有条目,并一次将100个批处理条目发送到某些外部REST服务.
我目前的解决方案
上面描述的伪代码场景(忽略细节):
final int BATCH_SIZE = 100;
// 1. Read input from datastore
pipeline.apply(DatastoreIO.readFrom(datasetId, query))
// 2. create keys to be used in group by so we get iterator in next task
.apply(ParDo.of(new DoFn<DatastoreV1.Entity, KV<String, EntryPOJO>>() {
@Override
public void processElement(ProcessContext c) throws Exception {
String key = generateKey(c);
EntryPOJO entry = processEntity(c);
c.output(KV.of(key, entry));
}
}))
// 3. Group by key
.apply(GroupByKey.create())
// 4. Programatically batch users
.apply(ParDo.of(new DoFn<KV<String, Iterable<EntryPOJO>>() {
@Override
public void processElement(ProcessContext c) throws Exception {
List<EntryPOJO> batchedEntries = new ArrayList<>();
for (EntryPOJO entry : c.element().getValue()) {
if (batchedEntries.size() >= BATCH_SIZE) {
sendToRESTEndpoint(batchedEntries);
batchedEntries = new ArrayList<>();
}
batchedEntries.add(entry);
}
sendToRESTEndpoint(batchedEntries);
}
}));
Run Code Online (Sandbox Code Playgroud)
我当前解决方案的主要问题
GroupByKey阻止执行最后一个ParDo(块步骤4),直到所有条目都分配给一个键.
解决方案通常有效,但我想并行执行所有操作(从数据存储区加载后立即向REST端点发送一批100个条目),这是我当前解决方案无法实现的,因为GroupByKey直到获取数据库中的每个条目并将其插入到键值对中.因此,执行实际上分为两步:1.从数据存储中获取所有数据并为其分配密钥,2.将条目作为批处理
题
所以我想知道的是,是否有一些现有功能可以做到这一点.或者至少在没有GroupByKey步骤的情况下获得Iterable,以便批处理功能任务不需要等待数据被转储.
您可以在您的内容中批量处理这些元素DoFn.例如:
final int BATCH_SIZE = 100;
pipeline
// 1. Read input from datastore
.apply(DatastoreIO.readFrom(datasetId, query))
// 2. Programatically batch users
.apply(ParDo.of(new DoFn<DatastoreV1.Entity, Iterable<EntryPOJO>>() {
private final List<EntryPOJO> accumulator = new ArrayList<>(BATCH_SIZE);
@Override
public void processElement(ProcessContext c) throws Exception {
EntryPOJO entry = processEntity(c);
accumulator.add(c);
if (accumulator.size() >= BATCH_SIZE) {
c.output(accumulator);
accumulator = new ArrayList<>(BATCH_SIZE);
}
}
@Override
public void finishBundle(Context c) throws Exception {
if (accumulator.size() > 0) {
c.output(accumulator);
}
}
});
// 3. Consume those bundles
.apply(ParDo.of(new DoFn<Iterable<EntryPOJO>, Object>() {
@Override
public void processElement(ProcessContext c) throws Exception {
sendToRESTEndpoint(batchedEntries);
}
}));
Run Code Online (Sandbox Code Playgroud)
DoFn如果您不想单独的"批处理"步骤,也可以将步骤2和3合并为一个单独的步骤.