Tags: java, google-cloud-storage, google-cloud-dataflow, apache-beam
I have a Dataflow job that reads data from Pub/Sub and writes the contents to GCS based on time and filename, where the folder path is based on YYYY/MM/DD. This produces files in folders according to date, using Apache Beam's FileIO and Dynamic Destinations.

About two weeks ago, I noticed an unusual buildup of unacknowledged messages. After restarting the Dataflow job, the errors disappeared and new files were being written to GCS.

A few days later, writing stopped again, except this time there were errors claiming that processing was stuck. After some trusty SO research, I found that this was likely caused by a deadlock issue in versions of Beam prior to 2.9.0, because it used the Conscrypt library as the default security provider. So I upgraded from Beam 2.8 to Beam 2.11.

It worked again, until it didn't. I looked more closely at the error and noticed that it involved a SimpleDateFormat object, which is not thread-safe. So I switched to java.time and DateTimeFormatter, which is thread-safe; a minimal sketch of that change follows. It worked until it didn't. This time, though, the error was slightly different and didn't point to anything in my code; the stack trace follows the sketch.
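For context, this is the shape of that change. It is a minimal sketch, not my exact code; the pattern string and names are placeholders:

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class PartitionDates {
    // SimpleDateFormat keeps mutable state internally, so sharing one
    // instance across DoFn threads can silently corrupt results:
    // private static final SimpleDateFormat FMT = new SimpleDateFormat("yyyy/MM/dd/HH");

    // DateTimeFormatter is immutable and thread-safe, so a single shared
    // instance is safe to reuse from any number of worker threads.
    private static final DateTimeFormatter FMT =
        DateTimeFormatter.ofPattern("yyyy/MM/dd/HH");

    public static String folderFor(LocalDateTime ts) {
        return ts.format(FMT);
    }
}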
Processing stuck in step FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles for at least 05m00s without outputting or completing in state process
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:469)
at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:76)
at org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub.getStateData(MetricTrackingWindmillServerStub.java:202)
at org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:409)
at org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:311)
at org.apache.beam.runners.dataflow.worker.WindmillStateReader$BagPagingIterable$1.computeNext(WindmillStateReader.java:700)
at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.MultitransformedIterator.hasNext(MultitransformedIterator.java:47)
at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:701)
at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
This error started appearing roughly 5 hours after the job was deployed, and its rate trended upward over time. Writing slowed significantly within 24 hours. I have 60 workers, and I suspect that each occurrence of the error takes down a worker, eventually killing the job.
In my writer, I parse lines for certain keywords (admittedly, probably not the best way) in order to determine which folder each one belongs in. I then proceed to insert the file into GCS with the determined filename. The partition function I use for my writer is provided below.
The code that actually deploys and runs the pipeline can be found further below.
@SuppressWarnings("serial")
public static class datePartition implements SerializableFunction<String, String> {

    private final String filename;

    public datePartition(String filename) {
        this.filename = filename;
    }

    @Override
    public String apply(String input) {
        String folder_name = "NaN";
        String date_dtf = "NaN";
        String date_literal = "NaN";
        try {
            // Prefer the "foldername" field; fall back to "folderid".
            Matcher foldernames = Pattern.compile("\"foldername\":\"(.*?)\"").matcher(input);
            if (foldernames.find()) {
                folder_name = foldernames.group(1);
            } else {
                Matcher folderid = Pattern.compile("\"folderid\":\"(.*?)\"").matcher(input);
                if (folderid.find()) {
                    folder_name = folderid.group(1);
                }
            }

            // The timestamp is either epoch millis or an ISO-style literal.
            Matcher date_long = Pattern.compile("\"timestamp\":\"(.*?)\"").matcher(input);
            if (date_long.find()) {
                date_literal = date_long.group(1);
                if (Utilities.isNumeric(date_literal)) {
                    LocalDateTime date = LocalDateTime.ofInstant(
                        Instant.ofEpochMilli(Long.valueOf(date_literal)), ZoneId.systemDefault());
                    date_dtf = date.format(dtf); // dtf: shared DateTimeFormatter field
                } else {
                    // e.g. "2019-05-15T04:12:33" -> "2019/05/15/04"
                    date_dtf = date_literal.split(":")[0].replace("-", "/").replace("T", "/");
                }
            }
            return folder_name + "/" + date_dtf + "h/" + filename;
        } catch (Exception e) {
            LOG.error("ERROR with either foldername or date");
            LOG.error("Line : " + input);
            LOG.error("folder : " + folder_name);
            LOG.error("Date : " + date_dtf);
            return folder_name + "/" + date_dtf + "h/" + filename;
        }
    }
}
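To make the mapping concrete, here is a hypothetical worked example of the non-numeric branch (the dtf formatter is not shown in the snippet; for the numeric branch I assume a pattern like "yyyy/MM/dd/HH"):

// A line with an ISO-style timestamp:
String line = "{\"foldername\":\"alerts\",\"timestamp\":\"2019-05-15T04:12:33\"}";
new datePartition("events.txt").apply(line);
// split(":")[0] -> "2019-05-15T04", then "-" and "T" become "/"
// Result: "alerts/2019/05/15/04h/events.txt"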
Since posting this question, I've optimized the Dataflow job to sidestep the bottleneck and increase parallelization. As rsantiago explained, "processing stuck" isn't an error, but simply Dataflow's way of communicating that a step is taking significantly longer than the others; essentially a bottleneck that can't be cleared with the given resources. The changes I made seem to have addressed the problem. The new code is as follows:
public void streamData() {
    try {
        Pipeline pipeline = Pipeline.create(options);

        pipeline
            .apply("Read PubSub Events",
                PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
            .apply(options.getWindowDuration() + " Window",
                Window.<PubsubMessage>into(FixedWindows.of(parseDuration(options.getWindowDuration())))
                    .triggering(AfterWatermark.pastEndOfWindow())
                    .discardingFiredPanes()
                    .withAllowedLateness(parseDuration("24h")))
            .apply(FileIO.<String, PubsubMessage>writeDynamic()
                // Partition on message metadata only; the payload is not parsed here.
                .by(new datePartition(options.getOutputFilenamePrefix()))
                .via(Contextful.fn(
                        (SerializableFunction<PubsubMessage, String>) inputMsg ->
                            new String(inputMsg.getPayload(), StandardCharsets.UTF_8)),
                    TextIO.sink())
                .withDestinationCoder(StringUtf8Coder.of())
                .to(options.getOutputDirectory())
                .withNaming(type -> new CrowdStrikeFileNaming(type))
                .withNumShards(options.getNumShards())
                .withTempDirectory(options.getTempLocation()));

        pipeline.run();
    } catch (Exception e) {
        LOG.error("Unable to deploy pipeline");
        LOG.error(e.toString(), e);
    }
}
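The parseDuration helper isn't shown above. A minimal sketch of what it might look like, assuming durations are passed as strings like "5m" or "24h" and that Beam's FixedWindows wants a Joda Duration (the parsing rules here are my assumption, not the original helper):

import org.joda.time.Duration;

// Hypothetical helper: converts "30s" / "5m" / "24h" into a Joda Duration.
private static Duration parseDuration(String value) {
    long amount = Long.parseLong(value.substring(0, value.length() - 1));
    char unit = value.charAt(value.length() - 1);
    switch (unit) {
        case 's': return Duration.standardSeconds(amount);
        case 'm': return Duration.standardMinutes(amount);
        case 'h': return Duration.standardHours(amount);
        default: throw new IllegalArgumentException("Unsupported duration: " + value);
    }
}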
The biggest changes involved removing the extractMsg() function and changing partitioning to use only metadata. Both of these steps forced deserialization/reserialization of the messages and hurt performance badly.
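For illustration, partitioning on metadata means reading Pub/Sub message attributes instead of regex-parsing the payload. A minimal sketch, assuming the relevant values arrive as attributes named "foldername" and "timestamp" (those attribute names are my assumption):

import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.SerializableFunction;

// Hypothetical attribute-based partitioner: the payload bytes are never
// touched, so no extra deserialization happens on this path.
public static class MetadataPartition implements SerializableFunction<PubsubMessage, String> {
    private final String filename;

    public MetadataPartition(String filename) {
        this.filename = filename;
    }

    @Override
    public String apply(PubsubMessage msg) {
        String folder = msg.getAttribute("foldername"); // attribute name assumed
        String date = msg.getAttribute("timestamp");    // attribute name assumed
        return folder + "/" + date + "/" + filename;
    }
}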
Additionally, since my dataset is unbounded, I had to set a non-zero number of shards. I wanted to simplify my file-naming policy, so I set it to 1 without knowing how much it would hurt performance. Since then, I've found a good balance of workers/shards/machine type for my job (mostly by guess-and-check, unfortunately).
Although a bottleneck could still occur with a large enough data load, the pipeline has been performing well despite heavy load (3-5 TB per day). The changes also significantly improved autoscaling, though I'm not sure why. The Dataflow job now reacts to spikes and lulls much more quickly.