标签: google-cloud-dataflow

从Dataflow写入BigQuery - 作业完成后不会删除JSON文件

我们的一个Dataflow作业将其输出写入BigQuery.我对如何在幕后实现这一点的理解是,Dataflow实际上将结果(分片)以JSON格式写入GCS,然后启动BigQuery加载作业以导入该数据.

但是,我们注意到,在作业完成后,无论是成功还是失败,都不会删除某些JSON文件.错误消息中没有警告或建议不会删除文件.当我们注意到这一点时,我们看了一下我们的存储桶,它有几百个来自失败作业的大型JSON文件(主要是在开发期间).

我原以为Dataflow应该处理任何清理,即使作业失败,当它成功时,肯定会删除这些文件.在作业完成后留下这些文件会产生大量的存储成本!

这是一个错误吗?

作业的示例作业ID"成功"但在GCS中留下了数百个大文件:2015-05-27_18_21_21-8377993823053896089

在此输入图像描述

在此输入图像描述

在此输入图像描述

google-cloud-dataflow

6
推荐指数
2
解决办法
705
查看次数

通过Google Cloud Dataflow将PubSub消息插入BigQuery

我想使用Google Cloud Dataflow将来自主题的PubSub消息数据插入到BigQuery表中.一切都很好,但在BigQuery表中我可以看到像"߈ "这样难以理解的字符串.这是我的管道:

p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/project-name/topics/topic-name"))
.apply(ParDo.named("Transformation").of(new StringToRowConverter()))
.apply(BigQueryIO.Write.named("Write into BigQuery").to("project-name:dataset-name.table")
     .withSchema(schema)
     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED))
Run Code Online (Sandbox Code Playgroud)

我的简单StringToRowConverter函数是:

class StringToRowConverter extends DoFn<String, TableRow> {
private static final long serialVersionUID = 0;

@Override
public void processElement(ProcessContext c) {
    for (String word : c.element().split(",")) {
      if (!word.isEmpty()) {
          System.out.println(word);
        c.output(new TableRow().set("data", word));
      }
    }
}
}
Run Code Online (Sandbox Code Playgroud)

这是我通过POST请求发送的消息:

POST https://pubsub.googleapis.com/v1/projects/project-name/topics/topic-name:publish
{
 "messages": [
  {
   "attributes":{
"key": "tablet, smartphone, desktop",
"value": "eng"
   },
   "data": "34gf5ert"
  }
 ]
}
Run Code Online (Sandbox Code Playgroud)

我错过了什么?谢谢!

google-bigquery google-cloud-pubsub google-cloud-dataflow

6
推荐指数
1
解决办法
2477
查看次数

谷歌数据流管道中的数据存储输入是否可以一次处理一批N个条目?

我正在尝试执行数据流管道作业,该作业将从数据存储区一次N个条目执行一个函数.在我的情况下,此函数将一批100个条目作为有效负载发送到某些REST服务.这意味着我想要查看来自一个数据存储区实体的所有条目,并一次100个批处理条目发送到某些外部REST服务.

我目前的解决方案

  1. 从数据存储读取输入
  2. 创建与管道选项中指定的工作者一样多的键(1 worker = 1键).
  3. 按键分组,以便我们将迭代器作为输出(步骤4中的迭代器输入)
  4. 以编程方式批处理临时列表中的用户,并将它们作为批处理发送到REST端点.

上面描述的伪代码场景(忽略细节):

final int BATCH_SIZE = 100;

// 1. Read input from datastore
pipeline.apply(DatastoreIO.readFrom(datasetId, query))

    // 2. create keys to be used in group by so we get iterator in next task
    .apply(ParDo.of(new DoFn<DatastoreV1.Entity, KV<String, EntryPOJO>>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            String key = generateKey(c);
            EntryPOJO entry = processEntity(c);
            c.output(KV.of(key, entry));
        }
    }))

    // 3. Group by key
    .apply(GroupByKey.create())

    // …
Run Code Online (Sandbox Code Playgroud)

dataflow google-cloud-datastore gcloud google-cloud-dataflow

6
推荐指数
1
解决办法
1092
查看次数

从Google Dataflow访问在GKE中运行的HTTP服务

我在Google Container Engine集群上运行了一个HTTP服务(在kubernetes服务之后).

我的目标是使用固定名称从同一GCP项目上运行的Dataflow作业访问该服务(与使用DNS从GKE内部访问服务的方式相同).任何的想法?


  • 我在stackoverflow上阅读的大多数解决方案都依赖于在试图访问服务的机器上安装kube-proxy.据我所知,无法在Dataflow创建的每个工作实例上可靠地设置该服务.
  • 一种选择是创建外部平衡器并在公共DNS中创建A记录.虽然它有效,但我宁愿在我的公共DNS记录中没有指向该服务的条目.

google-kubernetes-engine google-cloud-dataflow

6
推荐指数
1
解决办法
1208
查看次数

Dataflow何时从PubSubIO确认批处理项目的消息?

关于这个主题存在一个问题,答案是"一旦消息在Dataflow管道中的某个地方持久存在,就会得到确认."

从概念上讲,这是有道理的,但我不确定Dataflow如何能够在消息被持久化之前在管道中进行反序列化和转换后跟踪消息.

在我们的示例中,PubSub消息包含一批项目.收到并反序列化消息后,我们分解了批处理以进行处理.最终,批处理中的项目可能会被丢弃或提交到数据存储区,具体取决于其时间戳.

在这种情况下,确认如何工作?

google-cloud-dataflow

6
推荐指数
1
解决办法
1231
查看次数

GCP数据流:从发布/订阅IO流式传输的系统延迟

我们使用“系统延迟”来检查我们的数据流作业的运行状况。例如,如果看到系统延迟增加,我们将尝试查看如何降低该指标。关于该指标几乎没有疑问。

  • 1)系统滞后到底意味着什么?

数据项等待处理的最长时间

以上是我们点击信息图标后在GCP控制台中看到的内容。在这种情况下,数据项是什么意思?流处理具有“窗口化”,事件时间与处理时间,水印等概念。什么时候考虑将某个项目等待处理?例如,仅仅是消息何时到达而不论其状态如何?

  • 2)此指标的最佳阈值是多少?

我们试图将这一指标保持在尽可能低的水平,但是对于将其保持在最低水平我们没有任何建议。例如,我们是否有一些建议,例如将系统延迟保持在20s到30s之间是最佳的。

  • 3)系统滞后如何影响汇

系统延迟如何影响事件本身的延迟?

streaming dataflow google-cloud-platform google-cloud-dataflow

6
推荐指数
1
解决办法
1743
查看次数

apache_beam.transforms.util.Reshuffle()不适用于GCP数据流

我已经通过升级到了最新的apache_beam [gcp]软件包pip install --upgrade apache_beam[gcp]。但是,我注意到Reshuffle()没有出现在[gcp]发行版中。这是否意味着我将无法Reshuffle()在任何数据流管道中使用?有没有办法解决?还是pip包可能不是最新的,如果Reshuffle()在github上的master中,那么它将在dataflow上可用?

根据对这个问题的回答,我正在尝试从BigQuery读取数据,然后将数据随机化,然后再将其写入GCP存储桶中的CSV。我已经注意到,我用来训练GCMLE模型的.csv分片并不是真正随机的。在tensorflow中,我可以将批次随机化,但这只会对队列中建立的每个文件中的行进行随机化,而我的问题是当前正在生成的文件以某种方式存在偏差。如果对在数据流中写入CSV之前有其他洗牌的方法有任何建议,将不胜感激。

python google-cloud-platform google-cloud-dataflow apache-beam

6
推荐指数
1
解决办法
522
查看次数

限制光束应用的一步

我在 google 数据流上使用 python beam,我的管道如下所示:

从文件中读取图像 url >> 下载图像 >> 处理图像

问题是我不能让下载图像按需要进行缩放,因为我的应用程序可能会被图像服务器阻止。

这是一种可以节流步骤的方法吗?每分钟输入或输出。

谢谢你。

python dataflow google-cloud-dataflow apache-beam

6
推荐指数
1
解决办法
1773
查看次数

使用Dataflow进行图像预处理

任务:我将运行一个ETL作业,该作业将从GCS中提取TIFF图像,使用OpenCV + Tesseract等开源计算机视觉工具的组合将这些图像转换为文本,并最终将数据加载到BigQuery中

问题:我正在尝试使用Dataflow来执行ETL作业,因为我有数百万个图像(每个图像是一个单独的文件/ blob),我想扩展到数百台机器.但是,我遇到了关于下载图像的最佳方法的Dataflow(将在下面更详细地描述)的一些问题.

问题:最终我试图确定:

1)数据流是否是执行此操作的最佳解决方案?我考虑过的替代方案是在大型机器上运行多线程作业.我还应该考虑其他替代方案吗?

2)如果数据流是最佳解决方案,那么我应该如何专门处理下载数百万张图像(以便我可以通过转换运行它们)?

技术挑战:

以下帖子推荐的解决方案建议beam.io.gcp.gcsio.GcsIO().open(filepath, 'r')在DoFn中使用从GCS下载图像.
我试图沿着这条路走下去,beam.io.gcp.gcsio.GcsIO().open(filepath, 'r')但是,我无法打开图像.这个问题在这里描述:IO.BufferReader问题.

使用时,DirectRunner我可以使用此客户端api下载图像文件from google.cloud import storage,我可以打开并预处理图像没问题.但是,在使用数据流运行器时,我遇到了依赖性问题AttributeError: 'module' object has no attribute 'storage'.

话虽如此,如果Dataflow是最佳解决方案,那么下载和处理数百万张图像的最佳方法是什么?

python image-processing google-cloud-dataflow apache-beam

6
推荐指数
1
解决办法
584
查看次数

数据流中动态目标的问题

我有一个Dataflow作业,它从pubsub读取数据,并根据时间和文件名将内容写入GCS,其中文件夹路径基于YYYY / MM / DD。这允许根据日期在文件夹中生成文件,并使用apache beam的FileIODynamic Destinations

大约两周前,我注意到未确认消息的异常堆积。重新启动df作业后,错误消失了,新文件正在GCS中写入。

几天后,写入再次停止,除了这次,出现了一些错误,声称处理被卡住了。经过一些值得信赖的SO研究之后,我发现这可能由于2.90之前的Beam中的死锁问题引起的,因为它使用Conscrypt库作为默认的安全提供程序。因此,我从Beam 2.8升级到Beam 2.11。

再次起作用,直到没有起作用为止。我仔细查看了该错误,发现该错误与SimpleDateFormat对象有关,该对象不是线程安全的。因此,我切换为使用Java.time和DateTimeFormatter,这是线程安全的。它一直工作到没有。但是,这次,该错误稍有不同,并且没有指向我的代码中的任何内容:下面提供了该错误。

Processing stuck in step FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles for at least 05m00s without outputting or completing in state process
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:469)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:76)
  at org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub.getStateData(MetricTrackingWindmillServerStub.java:202)
  at org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:409)
  at org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:311)
  at org.apache.beam.runners.dataflow.worker.WindmillStateReader$BagPagingIterable$1.computeNext(WindmillStateReader.java:700)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.MultitransformedIterator.hasNext(MultitransformedIterator.java:47)
  at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:701)
  at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
Run Code Online (Sandbox Code Playgroud)

在作业部署后约5小时开始出现此错误,并且随着时间的流逝,错误发生率呈上升趋势。24小时内书写速度显着下降。我有60名工人,我怀疑每次出现错误都会导致一名工人失败,最终导致工作中断。

在我的作者中,我对某些关键字的行进行了解析(可能不是最好的方法),以确定其所属的文件夹。然后,我将使用确定的文件名将文件插入GCS。这是我为我的作家使用的代码:

分区功能提供如下:

Processing stuck in step FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles for at least 05m00s without outputting or completing …
Run Code Online (Sandbox Code Playgroud)

java google-cloud-storage google-cloud-dataflow apache-beam

6
推荐指数
1
解决办法
422
查看次数