我们的一个Dataflow作业将其输出写入BigQuery.我对如何在幕后实现这一点的理解是,Dataflow实际上将结果(分片)以JSON格式写入GCS,然后启动BigQuery加载作业以导入该数据.
但是,我们注意到,在作业完成后,无论是成功还是失败,都不会删除某些JSON文件.错误消息中没有警告或建议不会删除文件.当我们注意到这一点时,我们看了一下我们的存储桶,它有几百个来自失败作业的大型JSON文件(主要是在开发期间).
我原以为Dataflow应该处理任何清理,即使作业失败,当它成功时,肯定会删除这些文件.在作业完成后留下这些文件会产生大量的存储成本!
这是一个错误吗?
作业的示例作业ID"成功"但在GCS中留下了数百个大文件:2015-05-27_18_21_21-8377993823053896089



我想使用Google Cloud Dataflow将来自主题的PubSub消息数据插入到BigQuery表中.一切都很好,但在BigQuery表中我可以看到像"߈ "这样难以理解的字符串.这是我的管道:
p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/project-name/topics/topic-name"))
.apply(ParDo.named("Transformation").of(new StringToRowConverter()))
.apply(BigQueryIO.Write.named("Write into BigQuery").to("project-name:dataset-name.table")
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED))
Run Code Online (Sandbox Code Playgroud)
我的简单StringToRowConverter函数是:
class StringToRowConverter extends DoFn<String, TableRow> {
private static final long serialVersionUID = 0;
@Override
public void processElement(ProcessContext c) {
for (String word : c.element().split(",")) {
if (!word.isEmpty()) {
System.out.println(word);
c.output(new TableRow().set("data", word));
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
这是我通过POST请求发送的消息:
POST https://pubsub.googleapis.com/v1/projects/project-name/topics/topic-name:publish
{
"messages": [
{
"attributes":{
"key": "tablet, smartphone, desktop",
"value": "eng"
},
"data": "34gf5ert"
}
]
}
Run Code Online (Sandbox Code Playgroud)
我错过了什么?谢谢!
我正在尝试执行数据流管道作业,该作业将从数据存储区一次对N个条目执行一个函数.在我的情况下,此函数将一批100个条目作为有效负载发送到某些REST服务.这意味着我想要查看来自一个数据存储区实体的所有条目,并一次将100个批处理条目发送到某些外部REST服务.
我目前的解决方案
上面描述的伪代码场景(忽略细节):
final int BATCH_SIZE = 100;
// 1. Read input from datastore
pipeline.apply(DatastoreIO.readFrom(datasetId, query))
// 2. create keys to be used in group by so we get iterator in next task
.apply(ParDo.of(new DoFn<DatastoreV1.Entity, KV<String, EntryPOJO>>() {
@Override
public void processElement(ProcessContext c) throws Exception {
String key = generateKey(c);
EntryPOJO entry = processEntity(c);
c.output(KV.of(key, entry));
}
}))
// 3. Group by key
.apply(GroupByKey.create())
// …Run Code Online (Sandbox Code Playgroud) dataflow google-cloud-datastore gcloud google-cloud-dataflow
我在Google Container Engine集群上运行了一个HTTP服务(在kubernetes服务之后).
我的目标是使用固定名称从同一GCP项目上运行的Dataflow作业访问该服务(与使用DNS从GKE内部访问服务的方式相同).任何的想法?
我用python sdk编写自定义接收器。我尝试将数据存储到AWS S3。要连接S3,需要一些凭据,秘密密钥,但是出于安全原因,在代码中进行设置不是很好。我想使环境变量作为环境变量到达Dataflow工作人员。我该怎么做?
我无法将Apache Beam示例从本地计算机提交到我们的云平台.
使用gcloud auth list我可以看到正确的帐户当前是活动的.我可以使用gsutil和Web客户端进行文件系统交互.我可以使用cloud shell通过python REPL运行管道.
但是当我尝试运行python wordcount示例时,我收到以下错误:
IOError: Could not upload to GCS path gs://my_bucket/tmp: access denied.
Please verify that credentials are valid and that you have write access
to the specified path.
Run Code Online (Sandbox Code Playgroud)
关于凭证,我有什么遗漏吗?
python google-cloud-platform google-cloud-dataflow apache-beam
我已经通过升级到了最新的apache_beam [gcp]软件包pip install --upgrade apache_beam[gcp]。但是,我注意到Reshuffle()没有出现在[gcp]发行版中。这是否意味着我将无法Reshuffle()在任何数据流管道中使用?有没有办法解决?还是pip包可能不是最新的,如果Reshuffle()在github上的master中,那么它将在dataflow上可用?
根据对这个问题的回答,我正在尝试从BigQuery读取数据,然后将数据随机化,然后再将其写入GCP存储桶中的CSV。我已经注意到,我用来训练GCMLE模型的.csv分片并不是真正随机的。在tensorflow中,我可以将批次随机化,但这只会对队列中建立的每个文件中的行进行随机化,而我的问题是当前正在生成的文件以某种方式存在偏差。如果对在数据流中写入CSV之前有其他洗牌的方法有任何建议,将不胜感激。
python google-cloud-platform google-cloud-dataflow apache-beam
我正在使用Chicago Traffic Tracker数据集,每15分钟发布一次新数据.当新数据可用时,它表示距离"实时"10-15分钟的记录(例如,查找_last_updt).
例如,在00:20,我得到数据时间戳00:10; 在00:35,我从00:20开始; 在00:50,我从00:40开始.因此,我可以获得新数据"固定"(每15分钟)的时间间隔,尽管时间戳上的间隔略有变化.
我试图在Dataflow(Apache Beam)上使用这些数据,为此我正在使用Sliding Windows.我的想法是收集和处理4个连续的数据点(4 x 15min = 60min),并且一旦新的数据点可用,理想情况下更新我的和/平均值的计算.为此,我开始使用代码:
PCollection<TrafficData> trafficData = input
.apply("MapIntoSlidingWindows", Window.<TrafficData>into(
SlidingWindows.of(Duration.standardMinutes(60)) // (4x15)
.every(Duration.standardMinutes(15))) . // interval to get new data
.triggering(AfterWatermark
.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes());
Run Code Online (Sandbox Code Playgroud)
不幸的是,看起来当我从输入中收到一个新的数据点时,我没有得到一个新的(更新的)结果GroupByKey.
我的SlidingWindows有问题吗?还是我错过了别的什么?
我有一个Dataflow作业,它从pubsub读取数据,并根据时间和文件名将内容写入GCS,其中文件夹路径基于YYYY / MM / DD。这允许根据日期在文件夹中生成文件,并使用apache beam的FileIO和Dynamic Destinations。
大约两周前,我注意到未确认消息的异常堆积。重新启动df作业后,错误消失了,新文件正在GCS中写入。
几天后,写入再次停止,除了这次,出现了一些错误,声称处理被卡住了。经过一些值得信赖的SO研究之后,我发现这可能是由于2.90之前的Beam中的死锁问题引起的,因为它使用Conscrypt库作为默认的安全提供程序。因此,我从Beam 2.8升级到Beam 2.11。
再次起作用,直到没有起作用为止。我仔细查看了该错误,发现该错误与SimpleDateFormat对象有关,该对象不是线程安全的。因此,我切换为使用Java.time和DateTimeFormatter,这是线程安全的。它一直工作到没有。但是,这次,该错误稍有不同,并且没有指向我的代码中的任何内容:下面提供了该错误。
Processing stuck in step FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles for at least 05m00s without outputting or completing in state process
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:469)
at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:76)
at org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub.getStateData(MetricTrackingWindmillServerStub.java:202)
at org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:409)
at org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:311)
at org.apache.beam.runners.dataflow.worker.WindmillStateReader$BagPagingIterable$1.computeNext(WindmillStateReader.java:700)
at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.MultitransformedIterator.hasNext(MultitransformedIterator.java:47)
at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:701)
at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
Run Code Online (Sandbox Code Playgroud)
在作业部署后约5小时开始出现此错误,并且随着时间的流逝,错误发生率呈上升趋势。24小时内书写速度显着下降。我有60名工人,我怀疑每次出现错误都会导致一名工人失败,最终导致工作中断。
在我的作者中,我对某些关键字的行进行了解析(可能不是最好的方法),以确定其所属的文件夹。然后,我将使用确定的文件名将文件插入GCS。这是我为我的作家使用的代码:
分区功能提供如下:
Processing stuck in step FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles for at least 05m00s without outputting or completing …Run Code Online (Sandbox Code Playgroud)