我们的一个Dataflow作业将其输出写入BigQuery.我对如何在幕后实现这一点的理解是,Dataflow实际上将结果(分片)以JSON格式写入GCS,然后启动BigQuery加载作业以导入该数据.
但是,我们注意到,在作业完成后,无论是成功还是失败,都不会删除某些JSON文件.错误消息中没有警告或建议不会删除文件.当我们注意到这一点时,我们看了一下我们的存储桶,它有几百个来自失败作业的大型JSON文件(主要是在开发期间).
我原以为Dataflow应该处理任何清理,即使作业失败,当它成功时,肯定会删除这些文件.在作业完成后留下这些文件会产生大量的存储成本!
这是一个错误吗?
作业的示例作业ID"成功"但在GCS中留下了数百个大文件:2015-05-27_18_21_21-8377993823053896089



我想使用Google Cloud Dataflow将来自主题的PubSub消息数据插入到BigQuery表中.一切都很好,但在BigQuery表中我可以看到像"߈ "这样难以理解的字符串.这是我的管道:
p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/project-name/topics/topic-name"))
.apply(ParDo.named("Transformation").of(new StringToRowConverter()))
.apply(BigQueryIO.Write.named("Write into BigQuery").to("project-name:dataset-name.table")
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED))
Run Code Online (Sandbox Code Playgroud)
我的简单StringToRowConverter函数是:
class StringToRowConverter extends DoFn<String, TableRow> {
private static final long serialVersionUID = 0;
@Override
public void processElement(ProcessContext c) {
for (String word : c.element().split(",")) {
if (!word.isEmpty()) {
System.out.println(word);
c.output(new TableRow().set("data", word));
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
这是我通过POST请求发送的消息:
POST https://pubsub.googleapis.com/v1/projects/project-name/topics/topic-name:publish
{
"messages": [
{
"attributes":{
"key": "tablet, smartphone, desktop",
"value": "eng"
},
"data": "34gf5ert"
}
]
}
Run Code Online (Sandbox Code Playgroud)
我错过了什么?谢谢!
我正在尝试执行数据流管道作业,该作业将从数据存储区一次对N个条目执行一个函数.在我的情况下,此函数将一批100个条目作为有效负载发送到某些REST服务.这意味着我想要查看来自一个数据存储区实体的所有条目,并一次将100个批处理条目发送到某些外部REST服务.
我目前的解决方案
上面描述的伪代码场景(忽略细节):
final int BATCH_SIZE = 100;
// 1. Read input from datastore
pipeline.apply(DatastoreIO.readFrom(datasetId, query))
// 2. create keys to be used in group by so we get iterator in next task
.apply(ParDo.of(new DoFn<DatastoreV1.Entity, KV<String, EntryPOJO>>() {
@Override
public void processElement(ProcessContext c) throws Exception {
String key = generateKey(c);
EntryPOJO entry = processEntity(c);
c.output(KV.of(key, entry));
}
}))
// 3. Group by key
.apply(GroupByKey.create())
// …Run Code Online (Sandbox Code Playgroud) dataflow google-cloud-datastore gcloud google-cloud-dataflow
我在Google Container Engine集群上运行了一个HTTP服务(在kubernetes服务之后).
我的目标是使用固定名称从同一GCP项目上运行的Dataflow作业访问该服务(与使用DNS从GKE内部访问服务的方式相同).任何的想法?
我们使用“系统延迟”来检查我们的数据流作业的运行状况。例如,如果看到系统延迟增加,我们将尝试查看如何降低该指标。关于该指标几乎没有疑问。
数据项等待处理的最长时间
以上是我们点击信息图标后在GCP控制台中看到的内容。在这种情况下,数据项是什么意思?流处理具有“窗口化”,事件时间与处理时间,水印等概念。什么时候考虑将某个项目等待处理?例如,仅仅是消息何时到达而不论其状态如何?
我们试图将这一指标保持在尽可能低的水平,但是对于将其保持在最低水平我们没有任何建议。例如,我们是否有一些建议,例如将系统延迟保持在20s到30s之间是最佳的。
系统延迟如何影响事件本身的延迟?
streaming dataflow google-cloud-platform google-cloud-dataflow
我已经通过升级到了最新的apache_beam [gcp]软件包pip install --upgrade apache_beam[gcp]。但是,我注意到Reshuffle()没有出现在[gcp]发行版中。这是否意味着我将无法Reshuffle()在任何数据流管道中使用?有没有办法解决?还是pip包可能不是最新的,如果Reshuffle()在github上的master中,那么它将在dataflow上可用?
根据对这个问题的回答,我正在尝试从BigQuery读取数据,然后将数据随机化,然后再将其写入GCP存储桶中的CSV。我已经注意到,我用来训练GCMLE模型的.csv分片并不是真正随机的。在tensorflow中,我可以将批次随机化,但这只会对队列中建立的每个文件中的行进行随机化,而我的问题是当前正在生成的文件以某种方式存在偏差。如果对在数据流中写入CSV之前有其他洗牌的方法有任何建议,将不胜感激。
python google-cloud-platform google-cloud-dataflow apache-beam
我在 google 数据流上使用 python beam,我的管道如下所示:
从文件中读取图像 url >> 下载图像 >> 处理图像
问题是我不能让下载图像按需要进行缩放,因为我的应用程序可能会被图像服务器阻止。
这是一种可以节流步骤的方法吗?每分钟输入或输出。
谢谢你。
任务:我将运行一个ETL作业,该作业将从GCS中提取TIFF图像,使用OpenCV + Tesseract等开源计算机视觉工具的组合将这些图像转换为文本,并最终将数据加载到BigQuery中
问题:我正在尝试使用Dataflow来执行ETL作业,因为我有数百万个图像(每个图像是一个单独的文件/ blob),我想扩展到数百台机器.但是,我遇到了关于下载图像的最佳方法的Dataflow(将在下面更详细地描述)的一些问题.
问题:最终我试图确定:
1)数据流是否是执行此操作的最佳解决方案?我考虑过的替代方案是在大型机器上运行多线程作业.我还应该考虑其他替代方案吗?
2)如果数据流是最佳解决方案,那么我应该如何专门处理下载数百万张图像(以便我可以通过转换运行它们)?
技术挑战:
以下帖子推荐的解决方案建议beam.io.gcp.gcsio.GcsIO().open(filepath, 'r')在DoFn中使用从GCS下载图像.
我试图沿着这条路走下去,beam.io.gcp.gcsio.GcsIO().open(filepath, 'r')但是,我无法打开图像.这个问题在这里描述:IO.BufferReader问题.
使用时,DirectRunner我可以使用此客户端api下载图像文件from google.cloud import storage,我可以打开并预处理图像没问题.但是,在使用数据流运行器时,我遇到了依赖性问题AttributeError: 'module' object has no attribute 'storage'.
话虽如此,如果Dataflow是最佳解决方案,那么下载和处理数百万张图像的最佳方法是什么?
我有一个Dataflow作业,它从pubsub读取数据,并根据时间和文件名将内容写入GCS,其中文件夹路径基于YYYY / MM / DD。这允许根据日期在文件夹中生成文件,并使用apache beam的FileIO和Dynamic Destinations。
大约两周前,我注意到未确认消息的异常堆积。重新启动df作业后,错误消失了,新文件正在GCS中写入。
几天后,写入再次停止,除了这次,出现了一些错误,声称处理被卡住了。经过一些值得信赖的SO研究之后,我发现这可能是由于2.90之前的Beam中的死锁问题引起的,因为它使用Conscrypt库作为默认的安全提供程序。因此,我从Beam 2.8升级到Beam 2.11。
再次起作用,直到没有起作用为止。我仔细查看了该错误,发现该错误与SimpleDateFormat对象有关,该对象不是线程安全的。因此,我切换为使用Java.time和DateTimeFormatter,这是线程安全的。它一直工作到没有。但是,这次,该错误稍有不同,并且没有指向我的代码中的任何内容:下面提供了该错误。
Processing stuck in step FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles for at least 05m00s without outputting or completing in state process
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:469)
at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:76)
at org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub.getStateData(MetricTrackingWindmillServerStub.java:202)
at org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:409)
at org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:311)
at org.apache.beam.runners.dataflow.worker.WindmillStateReader$BagPagingIterable$1.computeNext(WindmillStateReader.java:700)
at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.MultitransformedIterator.hasNext(MultitransformedIterator.java:47)
at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:701)
at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
Run Code Online (Sandbox Code Playgroud)
在作业部署后约5小时开始出现此错误,并且随着时间的流逝,错误发生率呈上升趋势。24小时内书写速度显着下降。我有60名工人,我怀疑每次出现错误都会导致一名工人失败,最终导致工作中断。
在我的作者中,我对某些关键字的行进行了解析(可能不是最好的方法),以确定其所属的文件夹。然后,我将使用确定的文件名将文件插入GCS。这是我为我的作家使用的代码:
分区功能提供如下:
Processing stuck in step FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles for at least 05m00s without outputting or completing …Run Code Online (Sandbox Code Playgroud)