小编Nej*_*ver的帖子

谷歌数据流管道中的数据存储输入是否可以一次处理一批N个条目?

我正在尝试执行数据流管道作业,该作业将从数据存储区一次N个条目执行一个函数.在我的情况下,此函数将一批100个条目作为有效负载发送到某些REST服务.这意味着我想要查看来自一个数据存储区实体的所有条目,并一次100个批处理条目发送到某些外部REST服务.

我目前的解决方案

  1. 从数据存储读取输入
  2. 创建与管道选项中指定的工作者一样多的键(1 worker = 1键).
  3. 按键分组,以便我们将迭代器作为输出(步骤4中的迭代器输入)
  4. 以编程方式批处理临时列表中的用户,并将它们作为批处理发送到REST端点.

上面描述的伪代码场景(忽略细节):

final int BATCH_SIZE = 100;

// 1. Read input from datastore
pipeline.apply(DatastoreIO.readFrom(datasetId, query))

    // 2. create keys to be used in group by so we get iterator in next task
    .apply(ParDo.of(new DoFn<DatastoreV1.Entity, KV<String, EntryPOJO>>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            String key = generateKey(c);
            EntryPOJO entry = processEntity(c);
            c.output(KV.of(key, entry));
        }
    }))

    // 3. Group by key
    .apply(GroupByKey.create())

    // …
Run Code Online (Sandbox Code Playgroud)

dataflow google-cloud-datastore gcloud google-cloud-dataflow

6
推荐指数
1
解决办法
1092
查看次数