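Is it possible to create a pipeline that reads from Pub/Sub and writes to Datastore? In my code I specify PubsubIO as the input and apply windowing to get a bounded PCollection, but it seems that DatastoreIO.writeTo cannot be used when options.setStreaming is set to true, which is required in order to use PubsubIO as the input. Is there a workaround? Or is it simply not possible to read from Pub/Sub and write to Datastore?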
Here is my code:
DataflowPipelineOptions options = PipelineOptionsFactory.create()
    .as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject(projectName);
options.setStagingLocation("gs://my-staging-bucket/staging");
// Streaming mode is required when PubsubIO is the source.
options.setStreaming(true);
Pipeline p = Pipeline.create(options);

PCollection<String> input = p.apply(
    PubsubIO.Read.topic("projects/" + projectName + "/topics/event-streaming"));

// 5-second fixed windows, firing once at least one element has arrived.
PCollection<String> inputWindow = input.apply(
    Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
        .triggering(AfterPane.elementCountAtLeast(1))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardHours(1)));

// Base64-decode each message payload.
PCollection<String> inputDecode = inputWindow.apply(ParDo.of(new DoFn<String, String>() {
    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(ProcessContext c) {
        String msg = c.element();
        byte[] decoded = Base64.decodeBase64(msg.getBytes());
        String outmsg = new String(decoded);
        c.output(outmsg);
    }
}));

// Convert each decoded string into a Datastore entity.
PCollection<DatastoreV1.Entity> inputEntity =
    inputDecode.apply(ParDo.of(new CreateEntityFn("stream", "events")));
// This is the write that fails in streaming mode.
inputEntity.apply(DatastoreIO.writeTo(datasetid));
p.run();
This is the exception I get:
Exception in thread "main" java.lang.UnsupportedOperationException: The Write transform is not supported by the …
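For what it's worth, the decode step in the DoFn can be checked on its own, outside Dataflow, to rule it out as the source of the problem. A minimal sketch, assuming the JDK's java.util.Base64 stands in for Base64.decodeBase64 (which looks like Apache Commons Codec) and assuming the payloads are UTF-8 text; DecodeSketch and decodeMessage are hypothetical names used only for illustration:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper that mirrors the body of processElement in the pipeline.
class DecodeSketch {

    // Decodes a Base64-encoded message into a string.
    // Assumption: the payload is UTF-8 text.
    static String decodeMessage(String msg) {
        byte[] decoded = Base64.getDecoder().decode(msg.getBytes(StandardCharsets.UTF_8));
        return new String(decoded, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "aGVsbG8=" is the Base64 encoding of "hello".
        System.out.println(decodeMessage("aGVsbG8="));
    }
}
```

Unlike the call in the pipeline, the JDK decoder throws IllegalArgumentException on characters outside the Base64 alphabet, and the charset is named explicitly rather than taken from the platform default.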
google-cloud-datastore google-cloud-pubsub google-cloud-dataflow