这与此问题最相似.
我正在Dataflow 2.x中创建一个管道,它从Pubsub队列中获取流输入.进来的每条消息都需要通过来自Google BigQuery的非常大的数据集进行流式处理,并在写入数据库之前将所有相关值附加到它(基于密钥).
麻烦的是来自BigQuery的映射数据集非常大 - 任何将其用作边输入的尝试都失败,Dataflow运行器抛出错误"java.lang.IllegalArgumentException:ByteString将太长".我尝试了以下策略:
1)侧输入
2)键值对映射
3)直接在ParDo(DoFn)中调用BQ
似乎这个任务并不真正适合"令人尴尬的可并行化"模型,所以我在这里咆哮错误的树吗?
编辑:
即使在数据流中使用高内存机器并尝试将侧输入到地图视图中,我也会收到错误 java.lang.IllegalArgumentException: ByteString would be too long
这是我正在使用的代码的示例(伪):
Pipeline pipeline = Pipeline.create(options);
PCollectionView<Map<String, TableRow>> mapData = pipeline
.apply("ReadMapData", BigQueryIO.read().fromQuery("SELECT whatever FROM ...").usingStandardSql())
.apply("BQToKeyValPairs", ParDo.of(new BQToKeyValueDoFn()))
.apply(View.asMap());
PCollection<PubsubMessage> messages = pipeline.apply(PubsubIO.readMessages()
.fromSubscription(String.format("projects/%1$s/subscriptions/%2$s", projectId, pubsubSubscription)));
messages.apply(ParDo.of(new DoFn<PubsubMessage, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
JSONObject data = new JSONObject(new String(c.element().getPayload()));
String key = getKeyFromData(data);
TableRow sideInputData = c.sideInput(mapData).get(key);
if (sideInputData != …Run Code Online (Sandbox Code Playgroud)