Mel*_*man 1 google-cloud-dataflow
我目前正在通过从gcs存储桶中读取过滤信息并将其作为侧输入传递到我的管道的不同阶段来创建PCollectionView,以便过滤输出.如果gcs存储桶中的文件发生更改,我希望当前运行的管道使用此新的过滤器信息.如果我的过滤器发生变化,有没有办法在每个新的数据窗口更新这个PCollectionView?我以为我可以在startBundle中做到这一点,但我无法弄清楚它是如何或是否可能.如果有可能,你能举个例子吗?
PCollectionView<Map<String, TagObject>>
tagMapView =
pipeline.apply(TextIO.Read.named("TagListTextRead")
.from("gs://tag-list-bucket/tag-list.json"))
.apply(ParDo.named("TagsToTagMap").of(new Tags.BuildTagListMapFn()))
.apply("MakeTagMapView", View.asSingleton());
PCollection<String>
windowedData =
pipeline.apply(PubsubIO.Read.topic("myTopic"))
.apply(Window.<String>into(
SlidingWindows.of(Duration.standardMinutes(15))
.every(Duration.standardSeconds(31))));
PCollection<MY_DATA>
lineData = windowedData
.apply(ParDo.named("ExtractJsonObject")
.withSideInputs(tagMapView)
.of(new ExtractJsonObjectFn()));
Run Code Online (Sandbox Code Playgroud)
你可能想要"最多使用一个1分钟左右的过滤器版本作为侧面输入"(因为理论上文件可能经常变化,不可预测,并且独立于管道 - 所以没有办法真正完全将文件的更改与管道的行为同步).
这是我能够提出的一个(授予的,相当笨拙的)解决方案.它依赖于侧输入也被窗口隐式地键入的事实.在这个解决方案中,我们将创建一个窗口为1分钟固定窗口的侧面输入,其中每个窗口将包含标记映射的单个值,从该过滤器文件中导出该窗口内的某个时刻.
PCollection<Long> ticks = p
// Produce 1 "tick" per second
.apply(CountingInput.unbounded().withRate(1, Duration.standardSeconds(1)))
// Window the ticks into 1-minute windows
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
// Use an arbitrary per-window combiner to reduce to 1 element per window
.apply(Count.globally());
// Produce a collection of tag maps, 1 per each 1-minute window
PCollectionView<TagMap> tagMapView = ticks
.apply(MapElements.via((Long ignored) -> {
... manually read the json file as a TagMap ...
}))
.apply(View.asSingleton());
Run Code Online (Sandbox Code Playgroud)
这种模式(加入缓慢变化的外部数据作为副输入)正在反复出现,我在这里提出的解决方案远非完美,我希望我们在编程模型中能够更好地支持这一点.我提交了一个BEAM JIRA问题来跟踪这个问题.
| 归档时间: |
|
| 查看次数: |
865 次 |
| 最近记录: |