使用Google数据存储区以流模式检查项目

Kas*_*ady 1 google-cloud-datastore google-cloud-platform google-cloud-dataflow

我想将项目插入Google Datastore(如果它们不存在).我写了一个数据流流媒体作业.

工作

static class RawToObjectConverter extends DoFn<String, Entity> {
    @Override
    public void processElement(ProcessContext c) {      

        Query<Entity> query = Query.entityQueryBuilder().kind("Post").filter(PropertyFilter.eq("postid", rq.postid))
                .build();
        QueryResults<Entity> posts = datastore.run(query);

        if (posts == null || !posts.hasNext()) {
            Entity post = Entity.builder(datastore.newKeyFactory().newKey("Post"))                 
                     .set("postid", "1")
                    .set("title", "p1")
                    .build();
            c.output(post);
        }           
    }
}
Run Code Online (Sandbox Code Playgroud)

问题

lines.apply(ParDo.of(new RawToObjectConverter()))
    .apply(DatastoreIO.v1().write().withProjectId(projectid));
Run Code Online (Sandbox Code Playgroud)

apply(PTransform<? super PCollection<Entity>,OutputT>)类型中的方法PCollection<Entity>不适用于参数(DatastoreV1.Write)

我也应该使用com.google.cloud.datastore.Datastorecom.google.datastore.v1.Entity

Vik*_*lli 5

在使用toPb方法之前,您需要在使用DatastoreIO.v1().write()之前将com.google.cloud.datastore.Entity转换为RawToObjectConverter中的com.google.datastore.v1.Entity

c.output(post.toPb());
Run Code Online (Sandbox Code Playgroud)