Tay*_*lor 9 apache google-bigquery google-cloud-dataflow apache-beam
这与此问题最相似.
我正在Dataflow 2.x中创建一个管道,它从Pubsub队列中获取流输入.进来的每条消息都需要通过来自Google BigQuery的非常大的数据集进行流式处理,并在写入数据库之前将所有相关值附加到它(基于密钥).
麻烦的是来自BigQuery的映射数据集非常大 - 任何将其用作边输入的尝试都失败,Dataflow运行器抛出错误"java.lang.IllegalArgumentException:ByteString将太长".我尝试了以下策略:
1)侧输入
2)键值对映射
3)直接在ParDo(DoFn)中调用BQ
似乎这个任务并不真正适合"令人尴尬的可并行化"模型,所以我在这里咆哮错误的树吗?
编辑:
即使在数据流中使用高内存机器并尝试将侧输入到地图视图中,我也会收到错误 java.lang.IllegalArgumentException: ByteString would be too long
这是我正在使用的代码的示例(伪):
Pipeline pipeline = Pipeline.create(options);
PCollectionView<Map<String, TableRow>> mapData = pipeline
.apply("ReadMapData", BigQueryIO.read().fromQuery("SELECT whatever FROM ...").usingStandardSql())
.apply("BQToKeyValPairs", ParDo.of(new BQToKeyValueDoFn()))
.apply(View.asMap());
PCollection<PubsubMessage> messages = pipeline.apply(PubsubIO.readMessages()
.fromSubscription(String.format("projects/%1$s/subscriptions/%2$s", projectId, pubsubSubscription)));
messages.apply(ParDo.of(new DoFn<PubsubMessage, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
JSONObject data = new JSONObject(new String(c.element().getPayload()));
String key = getKeyFromData(data);
TableRow sideInputData = c.sideInput(mapData).get(key);
if (sideInputData != null) {
LOG.info("holyWowItWOrked");
c.output(new TableRow());
} else {
LOG.info("noSideInputDataHere");
}
}
}).withSideInputs(mapData));
Run Code Online (Sandbox Code Playgroud)
管道抛出异常并在从内部记录任何内容之前失败ParDo
.
堆栈跟踪:
java.lang.IllegalArgumentException: ByteString would be too long: 644959474+1551393497
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.concat(ByteString.java:524)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:576)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.copyFrom(ByteString.java:559)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString$Output.toByteString(ByteString.java:1006)
com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:951)
com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1000)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:133)
com.google.cloud.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:771)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)
请查看本文https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part中称为“模式:流式大型查询表”的部分-2(这可能是唯一可行的解决方案,因为您的侧面输入无法容纳到内存中):
描述:
大(以GB为单位)的查找表必须准确,并且经常更改或不适合内存使用。
例:
您具有零售商的销售点信息,需要将产品名称与包含productID的数据记录相关联。外部数据库中存储着成千上万的项目,这些项目可以不断变化。另外,必须使用正确的值处理所有元素。
解:
使用“ 调用外部服务进行数据丰富 ”模式,而不是调用微服务,而不是直接调用经过读取优化的NoSQL数据库(例如Cloud Datastore或Cloud Bigtable)。
对于每个要查找的值,请使用KV实用程序类创建一个“键值”对。执行GroupByKey以创建具有相同密钥类型的批处理以对数据库进行调用。在DoFn中,调用该键的数据库,然后通过遍历可迭代对象将其应用于所有值。请遵循“调用外部服务以进行数据丰富”中所述的有关客户端实例化的最佳实践。
本文介绍了其他相关模式:https : //cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1:
归档时间: |
|
查看次数: |
3026 次 |
最近记录: |