小编Tay*_*lor的帖子

数据流大侧输入中的Apache Beam

这与此问题最相似.

我正在Dataflow 2.x中创建一个管道,它从Pubsub队列中获取流输入.进来的每条消息都需要通过来自Google BigQuery的非常大的数据集进行流式处理,并在写入数据库之前将所有相关值附加到它(基于密钥).

麻烦的是来自BigQuery的映射数据集非常大 - 任何将其用作边输入的尝试都失败,Dataflow运行器抛出错误"java.lang.IllegalArgumentException:ByteString将太长".我尝试了以下策略:

1)侧输入

  • 如上所述,映射数据(显然)太大而无法做到这一点.如果我在这里错了或有解决办法,请告诉我,因为这将是最简单的解决方案.

2)键值对映射

  • 在这种策略中,我读了管道的第一部分的BigQuery数据和PubSub的信息数据,然后运行每个通过更改PCollections到键值对每个值帕尔多变换.然后,我运行Merge.Flatten转换和GroupByKey转换,将相关的映射数据附加到每条消息.
  • 这里的问题是流数据需要将窗口与其他数据合并,因此我必须将窗口应用于大的有界BigQuery数据.它还要求两个数据集上的窗口策略相同.但是对于有界数据没有窗口策略是有意义的,并且我做的少量窗口尝试只是在一个窗口中发送所有BQ数据然后再也不发送它.它需要与每个传入的pubsub消息连接.

3)直接在ParDo(DoFn)中调用BQ

  • 这似乎是一个好主意 - 让每个工作人员声明一个静态的地图数据实例.如果它不存在,那么直接调用BigQuery来获取它.不幸的是,每次都会抛出BigQuery的内部错误(就像在整个消息中只是说"内部错误").向Google提交支持票后,他们告诉我,基本上,"你不能这样做".

似乎这个任务并不真正适合"令人尴尬的可并行化"模型,所以我在这里咆哮错误的树吗?

编辑:

即使在数据流中使用高内存机器并尝试将侧输入到地图视图中,我也会收到错误 java.lang.IllegalArgumentException: ByteString would be too long

这是我正在使用的代码的示例(伪):

    Pipeline pipeline = Pipeline.create(options);

    PCollectionView<Map<String, TableRow>> mapData = pipeline
            .apply("ReadMapData", BigQueryIO.read().fromQuery("SELECT whatever FROM ...").usingStandardSql())
            .apply("BQToKeyValPairs", ParDo.of(new BQToKeyValueDoFn())) 
            .apply(View.asMap());

    PCollection<PubsubMessage> messages = pipeline.apply(PubsubIO.readMessages()
            .fromSubscription(String.format("projects/%1$s/subscriptions/%2$s", projectId, pubsubSubscription)));

    messages.apply(ParDo.of(new DoFn<PubsubMessage, TableRow>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            JSONObject data = new JSONObject(new String(c.element().getPayload()));
            String key = getKeyFromData(data);
            TableRow sideInputData = c.sideInput(mapData).get(key);
            if (sideInputData != …
Run Code Online (Sandbox Code Playgroud)

apache google-bigquery google-cloud-dataflow apache-beam

9
推荐指数
1
解决办法
3026
查看次数