Apache Beam 根据前一行的值更新值
我已将 CSV 文件中的值分组。在分组行中,我们发现一些缺失值,需要根据前一行的值进行更新。如果该行的第一列为空,那么我们需要将其更新为 0。
我能够对记录进行分组,但无法找出更新值的逻辑,我该如何实现这一点?
记录
| 客户ID | 日期 | 数量 |
|---|---|---|
| BS:89481 | 2012年1月1日 | 100 |
| BS:89482 | 2012年1月1日 | |
| BS:89483 | 2012年1月1日 | 300 |
| BS:89481 | 2012年1月2日 | 900 |
| BS:89482 | 2012年1月2日 | 200 |
| BS:89483 | 2012年1月2日 |
分组记录
| 客户ID | 日期 | 数量 |
|---|---|---|
| BS:89481 | 2012年1月1日 | 100 |
| BS:89481 | 2012年1月2日 | 900 |
| BS:89482 | 2012年1月1日 | |
| BS:89482 | 2012年1月2日 | 200 |
| BS:89483 | 2012年1月1日 | 300 |
| BS:89483 | 2012年1月2日 |
更新缺失值
| 客户ID | 日期 | 数量 |
|---|---|---|
| BS:89481 | 2012年1月1日 | 100 |
| BS:89481 | 2012年1月2日 | 900 |
| BS:89482 | 2012年1月1日 | 000 |
| BS:89482 | 2012年1月2日 | 200 |
| BS:89483 | 2012年1月1日 | 300 |
| BS:89483 | 2012年1月2日 | 300 |
到目前为止的代码:
public class GroupByTest {
public static void main(String[] args) …Run Code Online (Sandbox Code Playgroud) 窗格和窗户有什么区别?传入的元素被分组到窗口中。那么窗格包含什么呢?
我从 Beam 文档中获取了以下代码
.of(new DoFn<String, String>() {
public void processElement(@Element String word, PaneInfo paneInfo) {
}})
Run Code Online (Sandbox Code Playgroud)
每个元素都属于一个窗格吗?还是多个窗格?需要一个简单的类比来理解窗格和窗口
在 GCP Dataflow 上运行的 Apache Beam Python SDK 中,我需要DoFn.process很长时间。我的 DoFn 花了很长时间,原因并不那么重要 - 由于我无法控制的要求,我必须接受它们。但如果您必须知道的话,它对外部服务进行网络调用需要相当长的时间(几秒钟),并且它正在处理先前的多个元素GroupByKey- 导致DoFn.process调用需要几分钟的时间。
无论如何,我的问题是:通话的运行时间长度是否有时间限制DoFn.process?我这么问是因为我看到的日志如下所示:
WARNING 2023-01-03T13:12:12.679957Z ReportProgress() took long: 1m49.15726646s
WARNING 2023-01-03T13:12:14.474585Z ReportProgress() took long: 1m7.166061638s
WARNING 2023-01-03T13:12:14.864634Z ReportProgress() took long: 1m58.479671042s
WARNING 2023-01-03T13:12:16.967743Z ReportProgress() took long: 1m40.379289919s
2023-01-03 08:16:47.888 EST Error message from worker: SDK harness sdk-0-6 disconnected.
2023-01-03 08:21:25.826 EST Error message from worker: SDK harness sdk-0-2 disconnected.
2023-01-03 08:21:36.011 EST Error message from worker: SDK harness sdk-0-4 disconnected. …Run Code Online (Sandbox Code Playgroud) python timeout google-cloud-dataflow apache-beam apache-beam-internals