Sam*_*L-L 5 java google-cloud-dataflow apache-beam
我正在努力获取基于事件时间的触发器来触发我的apache光束管道,但是似乎确实能够触发具有处理时间的窗口触发。
我的管道相当基本:
我收到了一批数据点,其中包括从pubsub读取的毫秒级时间戳,时间戳比最早批处理的数据点稍早。批量处理数据可以减少客户端的工作量和发布费用。
我提取第二级时间戳并将时间戳应用于各个数据点
我对数据进行窗口处理,并避免使用全局窗口。
我将数据按秒进行分组,以供日后按流数据进行分类。
最终,我最终在分类的几秒钟上使用了滑动窗口,以每秒有条件地将两条消息之一发送到pubsub。
我的问题似乎在步骤3中。
我试图在第3阶段使用最终将在第5阶段使用的相同的窗口化策略,以对分类的秒数进行滑动平均值计算。
我已经尝试过使用withTimestampCombiner(TimestampCombiner.EARLIEST)选项,但这似乎无法解决。
我已经读过有关事件时间使用的.withEarlyFirings方法,但这似乎可以模仿我现有的解决方法。理想情况下,我将能够依靠水印通过窗口的结尾并包含延迟触发。
// De-Batching The Pubsub Message
static public class UnpackDataPoints extends DoFn<String,String>{
@ProcessElement
public void processElement(@Element String c, OutputReceiver<String> out) {
JsonArray packedData = new JsonParser().parse(c).getAsJsonArray();
DateTimeFormatter dtf = DateTimeFormat.forPattern("EEE dd MMM YYYY HH:mm:ss:SSS zzz");
for (JsonElement acDataPoint: packedData){
String hereData = acDataPoint.toString();
DateTime date = dtf.parseDateTime(acDataPoint.getAsJsonObject().get("Timestamp").getAsString());
Instant eventTimeStamp = date.toInstant();
out.outputWithTimestamp(hereData,eventTimeStamp);
}
}
}
Run Code Online (Sandbox Code Playgroud)
// Extracting The Second
static public class ExtractTimeStamp extends DoFn<String,KV<String,String>> {
@ProcessElement
public void processElement(ProcessContext ctx ,@Element String c, OutputReceiver<KV<String,String>> out) {
JsonObject accDataObject = new JsonParser().parse(c).getAsJsonObject();
String milliString = accDataObject.get("Timestamp").getAsString();
String secondString = StringUtils.left(milliString,24);
accDataObject.addProperty("noMiliTimeStamp", secondString);
String updatedAccData = accDataObject.toString();
KV<String,String> outputKV = KV.of(secondString,updatedAccData);
out.output(outputKV);
}
}
Run Code Online (Sandbox Code Playgroud)
// The Pipeline & Windowing
Pipeline pipeline = Pipeline.create(options);
PCollection<String> dataPoints = pipeline
.apply("Read from Pubsub", PubsubIO.readStrings()
.fromTopic("projects/????/topics/???")
.withTimestampAttribute("messageTimestamp"))
.apply("Extract Individual Data Points",ParDo.of(new UnpackDataPoints()));
/// This is the event time window that doesn't fire for some reason
/*
PCollection<String> windowedDataPoints = dataPoints.apply(
Window.<String>into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1)))
// .triggering(AfterWatermark.pastEndOfWindow())
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(TWO_MINUTES))
//.triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(2)))
.discardingFiredPanes()
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withAllowedLateness(Duration.standardSeconds(1)));
*/
///// Temporary Work Around, this does fire but data is out of order
PCollection<String> windowedDataPoints = dataPoints.apply(
Window.<String>into(FixedWindows.of(Duration.standardMinutes(120)))
.triggering(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(5)))
.discardingFiredPanes()
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withAllowedLateness(Duration.standardSeconds(1)));
PCollection<KV<String, String>> TimeStamped = windowedDataPoints
.apply( "Pulling Out The Second For Aggregates", ParDo.of(new ExtractTimeStamp()));
PCollection<KV<String, Iterable<String>>> TimeStampedGrouped = TimeStamped.apply("Group By Key",GroupByKey.create());
PCollection<KV<String, Iterable<String>>> testing = TimeStampedGrouped.apply("testingIsh", ParDo.of(new LogKVIterable()));
Run Code Online (Sandbox Code Playgroud)
当我使用第一个被注释掉的窗口策略时,我的管道会无限期地运行,即接收数据并且LogKVIterable ParDo永远不会返回任何内容,而当我使用处理时间时,LogKVIterable会触发并记录到控制台。
这确实看起来像您添加到数据中的时间戳可能是错误/损坏的。我鼓励您验证以下内容:
元素中的时间戳已正确添加。在转换之前/之后添加一些日志记录,并广泛测试该代码。
管道中的数据新鲜度和系统滞后指标正在按您的预期进行。如果数据新鲜度未按预期变化,则强烈表明您的时间戳设置不正确。
| 归档时间: |
|
| 查看次数: |
99 次 |
| 最近记录: |