小编Mel*_*man的帖子

如何使用事务性DatastoreIO

我正在使用来自流式数据流管道的DatastoreIO,并在使用相同的密钥编写实体时收到错误.

2016-12-10T22:51:04.385Z: Error:   (af00222cfd901860): Exception: com.google.datastore.v1.client.DatastoreException: A non-transactional commit may not contain multiple mutations affecting the same entity., code=INVALID_ARGUMENT
Run Code Online (Sandbox Code Playgroud)

如果我在密钥中使用随机数,那么事情可行,但我需要更新相同的密钥,那么有没有使用DataStoreIO执行此操作的事务方法?

static class CreateEntityFn extends DoFn<KV<String, Tile>, Entity> {
  private static final long serialVersionUID = 0;

  private final String namespace;
  private final String kind;

  CreateEntityFn(String namespace, String kind) {
    this.namespace = namespace;
    this.kind = kind;
  }

  public Entity makeEntity(String key, Tile tile) {
    Entity.Builder entityBuilder = Entity.newBuilder();
    Key.Builder keyBuilder = makeKey(kind, key );
    if (namespace != null) {
      keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
    }
    entityBuilder.setKey(keyBuilder.build()); …
Run Code Online (Sandbox Code Playgroud)

google-cloud-datastore google-cloud-dataflow

8
推荐指数
1
解决办法
1521
查看次数

如何在数据流中传递 HashMap 作为侧面输入

我正在尝试将 HashMap 作为数据流管道中的侧面输入传递。除了少数传递 String、Int 或 Long 的示例之外,我找不到任何示例。我的代码:

tagList = pipeline.apply(TextIO.Read.named("tagListTextRead").from("gs://mybucket/tag-list.json"));

PCollection<Map<String,TagObject>> tagMap = tagList
            .apply(ParDo.named("allTagsToTagMap").of(new Tags.BuildTagListMapFn()));


PCollectionView<Map<String, TagObject>> tagMapView =
            allTags.apply(View.<String, TagObject>asMap());
Run Code Online (Sandbox Code Playgroud)

第三条语句出现语法错误。

The method apply(PTransform<? super PCollection<Map<String,TagObject>>,OutputT>) in the type 
     PCollection<Map<String,TagObject>> is not applicable for the arguments 
     (View.AsMap<String,TagObject>)
Run Code Online (Sandbox Code Playgroud)

有人可以告诉我如何将 HashMap 作为数据流管道中的侧面输入传递。

google-cloud-dataflow

5
推荐指数
1
解决办法
2913
查看次数

通过读取gcs存储桶可以在每个窗口更新Dataflow sideInput吗?

我目前正在通过从gcs存储桶中读取过滤信息并将其作为侧输入传递到我的管道的不同阶段来创建PCollectionView,以便过滤输出.如果gcs存储桶中的文件发生更改,我希望当前运行的管道使用此新的过滤器信息.如果我的过滤器发生变化,有没有办法在每个新的数据窗口更新这个PCollectionView?我以为我可以在startBundle中做到这一点,但我无法弄清楚它是如何或是否可能.如果有可能,你能举个例子吗?

PCollectionView<Map<String, TagObject>> 
    tagMapView =
        pipeline.apply(TextIO.Read.named("TagListTextRead")
                                  .from("gs://tag-list-bucket/tag-list.json"))
                .apply(ParDo.named("TagsToTagMap").of(new Tags.BuildTagListMapFn()))
                .apply("MakeTagMapView", View.asSingleton());
PCollection<String> 
    windowedData =
        pipeline.apply(PubsubIO.Read.topic("myTopic"))
                .apply(Window.<String>into(
                              SlidingWindows.of(Duration.standardMinutes(15))
                                            .every(Duration.standardSeconds(31))));
PCollection<MY_DATA> 
    lineData = windowedData
        .apply(ParDo.named("ExtractJsonObject")
            .withSideInputs(tagMapView)
            .of(new ExtractJsonObjectFn()));
Run Code Online (Sandbox Code Playgroud)

google-cloud-dataflow

1
推荐指数
1
解决办法
865
查看次数