标签: apache-beam-internals

Apache Beam 根据上一行的值更新当前行值

Apache Beam 根据前一行的值更新值

我已将 CSV 文件中的值分组。在分组行中,我们发现一些缺失值,需要根据前一行的值进行更新。如果该行的第一列为空,那么我们需要将其更新为 0。

我能够对记录进行分组,但无法找出更新值的逻辑,我该如何实现这一点?

记录

客户ID 日期 数量
BS:89481 2012年1月1日 100
BS:89482 2012年1月1日
BS:89483 2012年1月1日 300
BS:89481 2012年1月2日 900
BS:89482 2012年1月2日 200
BS:89483 2012年1月2日

分组记录

客户ID 日期 数量
BS:89481 2012年1月1日 100
BS:89481 2012年1月2日 900
BS:89482 2012年1月1日
BS:89482 2012年1月2日 200
BS:89483 2012年1月1日 300
BS:89483 2012年1月2日

更新缺失值

客户ID 日期 数量
BS:89481 2012年1月1日 100
BS:89481 2012年1月2日 900
BS:89482 2012年1月1日 000
BS:89482 2012年1月2日 200
BS:89483 2012年1月1日 300
BS:89483 2012年1月2日 300

到目前为止的代码:

public class GroupByTest {
    public static void main(String[] args) …
Run Code Online (Sandbox Code Playgroud)

java apache-beam apache-beam-io apache-beam-internals

2
推荐指数
1
解决办法
1193
查看次数

apache beam 窗格和窗口之间的区别

窗格和窗户有什么区别?传入的元素被分组到窗口中。那么窗格包含什么呢?

我从 Beam 文档中获取了以下代码

.of(new DoFn<String, String>() {
     public void processElement(@Element String word, PaneInfo paneInfo) {
  }})
Run Code Online (Sandbox Code Playgroud)

每个元素都属于一个窗格吗?还是多个窗格?需要一个简单的类比来理解窗格和窗口

google-cloud-dataflow apache-beam apache-beam-internals

1
推荐指数
1
解决办法
1015
查看次数

在 GCP Dataflow/Apache Beam Python SDK 中,DoFn.process 有时间限制吗?

在 GCP Dataflow 上运行的 Apache Beam Python SDK 中,我需要DoFn.process很长时间。我的 DoFn 花了很长时间,原因并不那么重要 - 由于我无法控制的要求,我必须接受它们。但如果您必须知道的话,它对外部服务进行网络调用需要相当长的时间(几秒钟),并且它正在处理先前的多个元素GroupByKey- 导致DoFn.process调用需要几分钟的时间。

无论如何,我的问题是:通话的运行时间长度是否有时间限制DoFn.process?我这么问是因为我看到的日志如下所示:

WARNING 2023-01-03T13:12:12.679957Z ReportProgress() took long: 1m49.15726646s
WARNING 2023-01-03T13:12:14.474585Z ReportProgress() took long: 1m7.166061638s
WARNING 2023-01-03T13:12:14.864634Z ReportProgress() took long: 1m58.479671042s
WARNING 2023-01-03T13:12:16.967743Z ReportProgress() took long: 1m40.379289919s
2023-01-03 08:16:47.888 EST Error message from worker: SDK harness sdk-0-6 disconnected.
2023-01-03 08:21:25.826 EST Error message from worker: SDK harness sdk-0-2 disconnected.
2023-01-03 08:21:36.011 EST Error message from worker: SDK harness sdk-0-4 disconnected. …
Run Code Online (Sandbox Code Playgroud)

python timeout google-cloud-dataflow apache-beam apache-beam-internals

0
推荐指数
1
解决办法
653
查看次数