小编Bal*_*alu的帖子

将无限集合写入 GCS

我看过很多关于同一主题的问题。但是,我仍然遇到写入 GCS 的问题。我正在从 pubsub 阅读该主题并尝试将其推送到 GCS。我已经提到了这个链接。但是,在最新的束包中找不到 IOChannelUtils。

PCollection<String> details = pipeline
            .apply(PubsubIO.readStrings().fromTopic("/topics/<project>/sampleTopic"));

PCollection<KV<String, String>> keyedStream = details.apply(WithKeys.of(new SerializableFunction<String, String>() {
        public String apply(String s) {
            return "constant";
        }
    }));

    PCollection<KV<String, Iterable<String>>> keyedWindows = keyedStream.apply(Window.<KV<String, String>>into(FixedWindows.of(ONE_MIN)).withAllowedLateness(ONE_DAY)
            .triggering(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(10))
                    .withLateFirings(AfterFirst.of(AfterPane.elementCountAtLeast(10),
                            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_SECONDS))))
            .discardingFiredPanes()).apply(GroupByKey.create());

    PCollection<Iterable<String>> windows = keyedWindows.apply(Values.create());
Run Code Online (Sandbox Code Playgroud)

这是我从堆栈溢出中的许多其他类似主题中获取的。现在,我明白了,TextIO 确实支持 withWindowedWrites 和 withNumShards 的无界 PCollection 写入选项。

参考:使用 DoFn 使用 Cloud Dataflow 从 PubSub 写入 Google Cloud Storage

但是,我不明白,我该怎么做。

我正在尝试按如下方式写入 GCS。

FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
            StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, "");

    details.apply(TextIO.write().to("gs://<bucket>/topicfile").withWindowedWrites()
            .withFilenamePolicy(policy).withNumShards(4));
Run Code Online (Sandbox Code Playgroud)

我没有足够的点数来为 Stack Overflow …

google-cloud-dataflow apache-beam apache-beam-io

4
推荐指数
1
解决办法
4000
查看次数

Apache Beam:将具有对象列表的对象转换为多个 TableRow 以写入 BigQuery

我正在使用光束管道来处理 json 并将其写入 bigquery。JSON 是这样的。

{
"message": [{
    "name": "abc",
    "itemId": "2123",
    "itemName": "test"

}, {
    "name": "vfg",
    "itemId": "56457",
    "itemName": "Chicken"
}],
"publishDate": "2017-10-26T04:54:16.207Z"
Run Code Online (Sandbox Code Playgroud)

}

我使用 Jackson 将其解析为以下结构。

class Feed{
List<Message> messages; 
TimeStamp  publishDate;

}
Run Code Online (Sandbox Code Playgroud)
public class Message implements Serializable{

/**
 * 
 */
private static final long serialVersionUID = 1L;
private String key;
private String value;

private Map<String, String> eventItemMap = new HashMap<>();
this property translate the list of map as a single map with all the key-value pair …
Run Code Online (Sandbox Code Playgroud)

google-bigquery google-cloud-dataflow apache-beam apache-beam-io

3
推荐指数
1
解决办法
4288
查看次数