标签: google-cloud-dataflow

从Dataflow写入BigQuery - 作业完成后不会删除JSON文件

我们的一个Dataflow作业将其输出写入BigQuery.我对如何在幕后实现这一点的理解是,Dataflow实际上将结果(分片)以JSON格式写入GCS,然后启动BigQuery加载作业以导入该数据.

但是,我们注意到,在作业完成后,无论是成功还是失败,都不会删除某些JSON文件.错误消息中没有警告或建议不会删除文件.当我们注意到这一点时,我们看了一下我们的存储桶,它有几百个来自失败作业的大型JSON文件(主要是在开发期间).

我原以为Dataflow应该处理任何清理,即使作业失败,当它成功时,肯定会删除这些文件.在作业完成后留下这些文件会产生大量的存储成本!

这是一个错误吗?

作业的示例作业ID"成功"但在GCS中留下了数百个大文件:2015-05-27_18_21_21-8377993823053896089

在此输入图像描述

在此输入图像描述

在此输入图像描述

google-cloud-dataflow

6
推荐指数
2
解决办法
705
查看次数

通过Google Cloud Dataflow将PubSub消息插入BigQuery

我想使用Google Cloud Dataflow将来自主题的PubSub消息数据插入到BigQuery表中.一切都很好,但在BigQuery表中我可以看到像"߈ "这样难以理解的字符串.这是我的管道:

p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/project-name/topics/topic-name"))
.apply(ParDo.named("Transformation").of(new StringToRowConverter()))
.apply(BigQueryIO.Write.named("Write into BigQuery").to("project-name:dataset-name.table")
     .withSchema(schema)
     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED))
Run Code Online (Sandbox Code Playgroud)

我的简单StringToRowConverter函数是:

class StringToRowConverter extends DoFn<String, TableRow> {
private static final long serialVersionUID = 0;

@Override
public void processElement(ProcessContext c) {
    for (String word : c.element().split(",")) {
      if (!word.isEmpty()) {
          System.out.println(word);
        c.output(new TableRow().set("data", word));
      }
    }
}
}
Run Code Online (Sandbox Code Playgroud)

这是我通过POST请求发送的消息:

POST https://pubsub.googleapis.com/v1/projects/project-name/topics/topic-name:publish
{
 "messages": [
  {
   "attributes":{
"key": "tablet, smartphone, desktop",
"value": "eng"
   },
   "data": "34gf5ert"
  }
 ]
}
Run Code Online (Sandbox Code Playgroud)

我错过了什么?谢谢!

google-bigquery google-cloud-pubsub google-cloud-dataflow

6
推荐指数
1
解决办法
2477
查看次数

谷歌数据流管道中的数据存储输入是否可以一次处理一批N个条目?

我正在尝试执行数据流管道作业,该作业将从数据存储区一次N个条目执行一个函数.在我的情况下,此函数将一批100个条目作为有效负载发送到某些REST服务.这意味着我想要查看来自一个数据存储区实体的所有条目,并一次100个批处理条目发送到某些外部REST服务.

我目前的解决方案

  1. 从数据存储读取输入
  2. 创建与管道选项中指定的工作者一样多的键(1 worker = 1键).
  3. 按键分组,以便我们将迭代器作为输出(步骤4中的迭代器输入)
  4. 以编程方式批处理临时列表中的用户,并将它们作为批处理发送到REST端点.

上面描述的伪代码场景(忽略细节):

final int BATCH_SIZE = 100;

// 1. Read input from datastore
pipeline.apply(DatastoreIO.readFrom(datasetId, query))

    // 2. create keys to be used in group by so we get iterator in next task
    .apply(ParDo.of(new DoFn<DatastoreV1.Entity, KV<String, EntryPOJO>>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            String key = generateKey(c);
            EntryPOJO entry = processEntity(c);
            c.output(KV.of(key, entry));
        }
    }))

    // 3. Group by key
    .apply(GroupByKey.create())

    // …
Run Code Online (Sandbox Code Playgroud)

dataflow google-cloud-datastore gcloud google-cloud-dataflow

6
推荐指数
1
解决办法
1092
查看次数

从Google Dataflow访问在GKE中运行的HTTP服务

我在Google Container Engine集群上运行了一个HTTP服务(在kubernetes服务之后).

我的目标是使用固定名称从同一GCP项目上运行的Dataflow作业访问该服务(与使用DNS从GKE内部访问服务的方式相同).任何的想法?


  • 我在stackoverflow上阅读的大多数解决方案都依赖于在试图访问服务的机器上安装kube-proxy.据我所知,无法在Dataflow创建的每个工作实例上可靠地设置该服务.
  • 一种选择是创建外部平衡器并在公共DNS中创建A记录.虽然它有效,但我宁愿在我的公共DNS记录中没有指向该服务的条目.

google-kubernetes-engine google-cloud-dataflow

6
推荐指数
1
解决办法
1208
查看次数

如何使环境变量作为python SDK中的环境变量到达Dataflow工作者

我用python sdk编写自定义接收器。我尝试将数据存储到AWS S3。要连接S3,需要一些凭据,秘密密钥,但是出于安全原因,在代码中进行设置不是很好。我想使环境变量作为环境变量到达Dataflow工作人员。我该怎么做?

google-cloud-platform google-cloud-dataflow apache-beam

6
推荐指数
1
解决办法
1657
查看次数

Dataflow何时从PubSubIO确认批处理项目的消息?

关于这个主题存在一个问题,答案是"一旦消息在Dataflow管道中的某个地方持久存在,就会得到确认."

从概念上讲,这是有道理的,但我不确定Dataflow如何能够在消息被持久化之前在管道中进行反序列化和转换后跟踪消息.

在我们的示例中,PubSub消息包含一批项目.收到并反序列化消息后,我们分解了批处理以进行处理.最终,批处理中的项目可能会被丢弃或提交到数据存储区,具体取决于其时间戳.

在这种情况下,确认如何工作?

google-cloud-dataflow

6
推荐指数
1
解决办法
1231
查看次数

Google Dataflow上Apache Beam示例的权限错误

我无法将Apache Beam示例从本地计算机提交到我们的云平台.

使用gcloud auth list我可以看到正确的帐户当前是活动的.我可以使用gsutil和Web客户端进行文件系统交互.我可以使用cloud shell通过python REPL运行管道.

但是当我尝试运行python wordcount示例时,我收到以下错误:

IOError: Could not upload to GCS path gs://my_bucket/tmp: access denied.
Please verify that credentials are valid and that you have write access 
to the specified path.
Run Code Online (Sandbox Code Playgroud)

关于凭证,我有什么遗漏吗?

python google-cloud-platform google-cloud-dataflow apache-beam

6
推荐指数
2
解决办法
2516
查看次数

apache_beam.transforms.util.Reshuffle()不适用于GCP数据流

我已经通过升级到了最新的apache_beam [gcp]软件包pip install --upgrade apache_beam[gcp]。但是,我注意到Reshuffle()没有出现在[gcp]发行版中。这是否意味着我将无法Reshuffle()在任何数据流管道中使用?有没有办法解决?还是pip包可能不是最新的,如果Reshuffle()在github上的master中,那么它将在dataflow上可用?

根据对这个问题的回答,我正在尝试从BigQuery读取数据,然后将数据随机化,然后再将其写入GCP存储桶中的CSV。我已经注意到,我用来训练GCMLE模型的.csv分片并不是真正随机的。在tensorflow中,我可以将批次随机化,但这只会对队列中建立的每个文件中的行进行随机化,而我的问题是当前正在生成的文件以某种方式存在偏差。如果对在数据流中写入CSV之前有其他洗牌的方法有任何建议,将不胜感激。

python google-cloud-platform google-cloud-dataflow apache-beam

6
推荐指数
1
解决办法
522
查看次数

SlidingWindows用于Apache Beam上的慢速数据(大间隔)

我正在使用Chicago Traffic Tracker数据集,每15分钟发布一次新数据.当新数据可用时,它表示距离"实时"10-15分钟的记录(例如,查找_last_updt).

例如,在00:20,我得到数据时间戳00:10; 在00:35,我从00:20开始; 在00:50,我从00:40开始.因此,我可以获得新数据"固定"(每15分钟)的时间间隔,尽管时间戳上的间隔略有变化.

我试图在Dataflow(Apache Beam)上使用这些数据,为此我正在使用Sliding Windows.我的想法是收集和处理4个连续的数据点(4 x 15min = 60min),并且一旦新的数据点可用,理想情况下更新我的和/平均值的计算.为此,我开始使用代码:

PCollection<TrafficData> trafficData = input        
    .apply("MapIntoSlidingWindows", Window.<TrafficData>into(
        SlidingWindows.of(Duration.standardMinutes(60)) // (4x15)
            .every(Duration.standardMinutes(15))) .     // interval to get new data
        .triggering(AfterWatermark
                        .pastEndOfWindow()
                        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))
        .withAllowedLateness(Duration.ZERO)
        .accumulatingFiredPanes());
Run Code Online (Sandbox Code Playgroud)

不幸的是,看起来当我从输入中收到一个新的数据点时,我没有得到一个新的(更新的)结果GroupByKey.

我的SlidingWindows有问题吗?还是我错过了别的什么?

java sliding-window google-cloud-dataflow apache-beam

6
推荐指数
1
解决办法
503
查看次数

数据流中动态目标的问题

我有一个Dataflow作业,它从pubsub读取数据,并根据时间和文件名将内容写入GCS,其中文件夹路径基于YYYY / MM / DD。这允许根据日期在文件夹中生成文件,并使用apache beam的FileIODynamic Destinations

大约两周前,我注意到未确认消息的异常堆积。重新启动df作业后,错误消失了,新文件正在GCS中写入。

几天后,写入再次停止,除了这次,出现了一些错误,声称处理被卡住了。经过一些值得信赖的SO研究之后,我发现这可能由于2.90之前的Beam中的死锁问题引起的,因为它使用Conscrypt库作为默认的安全提供程序。因此,我从Beam 2.8升级到Beam 2.11。

再次起作用,直到没有起作用为止。我仔细查看了该错误,发现该错误与SimpleDateFormat对象有关,该对象不是线程安全的。因此,我切换为使用Java.time和DateTimeFormatter,这是线程安全的。它一直工作到没有。但是,这次,该错误稍有不同,并且没有指向我的代码中的任何内容:下面提供了该错误。

Processing stuck in step FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles for at least 05m00s without outputting or completing in state process
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:469)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:76)
  at org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub.getStateData(MetricTrackingWindmillServerStub.java:202)
  at org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:409)
  at org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:311)
  at org.apache.beam.runners.dataflow.worker.WindmillStateReader$BagPagingIterable$1.computeNext(WindmillStateReader.java:700)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.MultitransformedIterator.hasNext(MultitransformedIterator.java:47)
  at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:701)
  at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
Run Code Online (Sandbox Code Playgroud)

在作业部署后约5小时开始出现此错误,并且随着时间的流逝,错误发生率呈上升趋势。24小时内书写速度显着下降。我有60名工人,我怀疑每次出现错误都会导致一名工人失败,最终导致工作中断。

在我的作者中,我对某些关键字的行进行了解析(可能不是最好的方法),以确定其所属的文件夹。然后,我将使用确定的文件名将文件插入GCS。这是我为我的作家使用的代码:

分区功能提供如下:

Processing stuck in step FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles for at least 05m00s without outputting or completing …
Run Code Online (Sandbox Code Playgroud)

java google-cloud-storage google-cloud-dataflow apache-beam

6
推荐指数
1
解决办法
422
查看次数