Posts by lil*_*ine

Reading from PubsubIO and writing to DatastoreIO

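Is it possible to create a pipeline that reads data from Pub/Sub and writes it to Datastore? In my code I specify PubsubIO as the input and apply windowing to get a bounded PCollection, but it seems that DatastoreIO.writeTo cannot be used when options.setStreaming is set to true, and streaming mode is required in order to use PubsubIO as an input. Is there a way around this, or is it simply not possible to read from Pub/Sub and write to Datastore?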
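To make the windowing step concrete: a fixed-window strategy such as `FixedWindows.of(Duration.standardSeconds(5))` assigns each element to the window that contains its timestamp. The sketch below is plain JDK code, not the Dataflow SDK; the class and method names are hypothetical, and it assumes a zero window offset. It shows that windowing gives each *window* a finite time range, while the PCollection as a whole stays unbounded, so applying a window does not by itself turn the Pub/Sub input into a bounded collection.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical sketch of fixed-window assignment (zero offset).
 * Plain JDK code; not the Dataflow SDK's FixedWindows implementation.
 */
public class FixedWindowSketch {

    /** Start of the window [start, start + size) that contains ts. */
    static Instant windowStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long ms = ts.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch
        return Instant.ofEpochMilli(ms - Math.floorMod(ms, sizeMs));
    }

    /** Exclusive end of the window that contains ts. */
    static Instant windowEnd(Instant ts, Duration size) {
        return windowStart(ts, size).plus(size);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(5);
        Instant ts = Instant.ofEpochMilli(12_345L);
        System.out.println(windowStart(ts, size).toEpochMilli()); // 10000
        System.out.println(windowEnd(ts, size).toEpochMilli());   // 15000
    }
}
```

Every new message simply lands in some 5-second window; the stream of windows never ends, which is why the pipeline still runs in streaming mode.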

Here is my code:

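import com.google.api.services.datastore.DatastoreV1;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.DatastoreIO;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane;
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.PCollection;
import java.nio.charset.StandardCharsets;
import org.apache.commons.codec.binary.Base64;
import org.joda.time.Duration;

// projectName, datasetid and CreateEntityFn are defined elsewhere in my code.
DataflowPipelineOptions options = PipelineOptionsFactory.create()
        .as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject(projectName);
options.setStagingLocation("gs://my-staging-bucket/staging");
options.setStreaming(true); // required for the unbounded PubsubIO source

Pipeline p = Pipeline.create(options);

PCollection<String> input = p.apply(
        PubsubIO.Read.topic("projects/" + projectName + "/topics/event-streaming"));

// 5-second fixed windows, firing as soon as a pane has at least one element
PCollection<String> inputWindow = input.apply(
        Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
                .triggering(AfterPane.elementCountAtLeast(1))
                .discardingFiredPanes()
                .withAllowedLateness(Duration.standardHours(1)));

// Decode the Base64-encoded message payload as UTF-8
PCollection<String> inputDecode = inputWindow.apply(ParDo.of(new DoFn<String, String>() {
    private static final long serialVersionUID = 1L;
    @Override
    public void processElement(ProcessContext c) {
        byte[] decoded = Base64.decodeBase64(c.element().getBytes(StandardCharsets.UTF_8));
        c.output(new String(decoded, StandardCharsets.UTF_8));
    }
}));

PCollection<DatastoreV1.Entity> inputEntity = inputDecode.apply(ParDo.of(new CreateEntityFn("stream", "events")));

inputEntity.apply(DatastoreIO.writeTo(datasetid));

p.run();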
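The decode step inside the anonymous `DoFn` can be exercised on its own, outside Dataflow. The following is a minimal sketch that assumes the JDK's `java.util.Base64` instead of Commons Codec and assumes the payload is UTF-8 text; `DecodeSketch` and `decodeMessage` are hypothetical names. One behavioral difference worth knowing: the JDK's basic decoder throws `IllegalArgumentException` on characters outside the Base64 alphabet, while Commons Codec's `decodeBase64` silently skips them.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/**
 * Hypothetical, testable version of the DoFn's decode step.
 * Uses java.util.Base64 (strict) rather than Commons Codec (lenient).
 */
public class DecodeSketch {

    static String decodeMessage(String msg) {
        // Throws IllegalArgumentException if msg is not valid Base64
        byte[] decoded = Base64.getDecoder().decode(msg);
        // Decode explicitly as UTF-8 instead of the platform default charset
        return new String(decoded, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String encoded = Base64.getEncoder()
                .encodeToString("{\"event\":\"click\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(decodeMessage(encoded)); // {"event":"click"}
    }
}
```

Keeping the logic in a static helper like this lets the `processElement` body shrink to a single call, and the helper can be unit-tested without a pipeline.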

Here is the exception I get:

Exception in thread "main" java.lang.UnsupportedOperationException: The Write transform is not supported by the …

google-cloud-datastore google-cloud-pubsub google-cloud-dataflow
