小编Ben*_*ers的帖子

将 TupleTag 传递给 DoFn 方法

我正在尝试从 DoFn 方法获得两个输出,遵循Apache Beam 编程指南的示例

基本上在示例中,您传递一个 TupleTag,然后指定在哪里进行输出,这对我有用,问题是我在 ParDo 中调用了一个外部方法,但不知道如何传递这个 TupleTag,这是我的代码:

PCollectionTuple processedData = pubEv
  .apply("Processing", ParDo.of(new HandleEv())
      .withOutputTags(mainData, TupleTagList.of(failedData)));
Run Code Online (Sandbox Code Playgroud)

HandleEv 方法:

static class HandleEv extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
      c.output("test")
      c.output(failedData,"failed")
    }
}
Run Code Online (Sandbox Code Playgroud)

我得到的错误是cannot find symbol由于 failedData 不能从 HandleEv 访问,我试图在课程开始时声明 failedData 但也不起作用。

非常感谢

java google-cloud-dataflow apache-beam

2
推荐指数
1
解决办法
2608
查看次数

使用Google数据存储区以流模式检查项目

我想将项目插入Google Datastore(如果它们不存在).我写了一个数据流流媒体作业.

工作

static class RawToObjectConverter extends DoFn<String, Entity> {
    @Override
    public void processElement(ProcessContext c) {      

        Query<Entity> query = Query.entityQueryBuilder().kind("Post").filter(PropertyFilter.eq("postid", rq.postid))
                .build();
        QueryResults<Entity> posts = datastore.run(query);

        if (posts == null || !posts.hasNext()) {
            Entity post = Entity.builder(datastore.newKeyFactory().newKey("Post"))                 
                     .set("postid", "1")
                    .set("title", "p1")
                    .build();
            c.output(post);
        }           
    }
}
Run Code Online (Sandbox Code Playgroud)

问题

lines.apply(ParDo.of(new RawToObjectConverter()))
    .apply(DatastoreIO.v1().write().withProjectId(projectid));
Run Code Online (Sandbox Code Playgroud)

apply(PTransform<? super PCollection<Entity>,OutputT>)类型中的方法PCollection<Entity>不适用于参数(DatastoreV1.Write)

我也应该使用com.google.cloud.datastore.Datastorecom.google.datastore.v1.Entity

google-cloud-datastore google-cloud-platform google-cloud-dataflow

1
推荐指数
1
解决办法
165
查看次数

在非轻量级DoFn中访问侧输入

如果我的类扩展了DoFn,如何访问侧输入的元素?

例如:

假设我有一个ParDo变换,如:

PCollection<String> data = myData.apply("Get data",
    ParDo.of(new MyClass()).withSideInputs(myDataView));
Run Code Online (Sandbox Code Playgroud)

我有一节课: -

static class MyClass extends DoFn<String,String>
{
    //How to access side input here
}
Run Code Online (Sandbox Code Playgroud)

在这种情况下,c.sideInput()不起作用.

谢谢.

google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
774
查看次数