标签: spotify-scio

数据流如何触发AfterProcessingTime.pastFirstElementInPane()工作?

在数据流流世界中。

我说的时候的理解:

Window.into(FixedWindows.of(Duration.standardHours(1)))
  .triggering(AfterProcessingTime.pastFirstElementInPane()
      .plusDelayOf(Duration.standardMinutes(15))
Run Code Online (Sandbox Code Playgroud)

就是对于一个小时的固定窗口,触发器在看到第一个元素之后将等待或批处理这些元素。

但是当我说:

Window.into(FixedWindows.of(Duration.standardHours(1)))
  .triggering(AfterProcessingTime.pastFirstElementInPane()
Run Code Online (Sandbox Code Playgroud)

它是从第一次看到第一个元素起就每次触发,还是隐式地对元素进行批处理?因为在每个元素上触发都会使系统过载。

google-cloud-dataflow apache-beam spotify-scio

5
推荐指数
1
解决办法
978
查看次数

带窗口的 GroupByKey 后,Beam 管道不产生任何输出,并且出现内存错误

目的:

我想加载流数据,然后添加一个键,然后按键计数。

问题:

当我尝试使用流方法(无界数据)加载和按键分组大数据时,Apache Beam Dataflow 管道出现内存错误。因为似乎数据是按分组累积的,并且不会在触发每个窗口时更早地触发数据。

如果我减小元素大小(元素数量不会改变),它会起作用!因为实际上 group-by step 等待所有数据被分组,然后触发所有新的窗口数据。

我对两者进行了测试:

梁版本 2.11.0 和 scio 版本 0.7.4

梁版本 2.6.0 和 scio 版本 0.6.1

重新生成错误的方法:

  1. 阅读包含文件名的 Pubsub 消息
  2. 从 GCS 读取并加载相关文件作为逐行迭代器
  3. 逐行展平(因此它生成大约 10,000 个)元素
  4. 向元素添加时间戳(当前即时时间)
  5. 创建我的数据的键值(使用一些从 1 到 10 的随机整数键)
  6. Apply window with triggering(在行很小且没有内存问题的情况下会触发大约50次)
  7. 每个键计数(按键分组然后组合它们)
  8. 最后,我们应该有大约 50 * 10 个元素代表按窗口和键的计数(当行大小足够小时测试成功)

管道的可视化(步骤 4 到 7):

在此处输入图片说明

group-by-key 步骤的总结:

在此处输入图片说明

如您所见,数据是按组累积的,不会发出。

窗口代码在这里:

val windowedData = data.applyKvTransform(
  Window.into[myt](
    Sessions.withGapDuration(Duration.millis(1)))
    .triggering(
      Repeatedly.forever(AfterFirst.of(
        AfterPane.elementCountAtLeast(10),
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1)))

      ).orFinally(AfterWatermark.pastEndOfWindow())

    ).withAllowedLateness(Duration.standardSeconds(100))
    .discardingFiredPanes()

)
Run Code Online (Sandbox Code Playgroud)

错误:

org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException: Commit request for stage S2 and …
Run Code Online (Sandbox Code Playgroud)

google-cloud-dataflow apache-beam spotify-scio

3
推荐指数
1
解决办法
1840
查看次数

为什么在 Scio 中你更喜欢聚合而不是 groupByKey?

从:

https://github.com/spotify/scio/wiki/Scio-data-guideline

“比 groupByKey 更喜欢组合/聚合/减少转换。请记住,减少操作必须是关联的和可交换的。”

为什么特别喜欢聚合而不是 groupByKey?

scala dataflow apache-beam spotify-scio

1
推荐指数
1
解决办法
1261
查看次数

在Google Cloud Dataflow中按顺序读取文件

我正在使用Spotify Scio来读取从Stackdriver导出到Google云端存储的日志.它们是JSON文件,其中每一行都是单个条目.查看工作日志,似乎文件被拆分为块,然后以任何顺序读取.在这种情况下,我已经将我的工作限制在1名工人身上.有没有办法强制按顺序读取和处理这些块?

作为一个例子(textFile基本上是一个TextIO.Read):

val sc = ScioContext(myOptions)
sc.textFile(myFile).map(line => logger.info(line))
Run Code Online (Sandbox Code Playgroud)

将根据工作日志生成类似于此的输出:

line 5
line 6
line 7
line 8
<Some other work>
line 1
line 2
line 3
line 4
<Some other work>
line 9
line 10
line 11
line 12
Run Code Online (Sandbox Code Playgroud)

我想知道的是,是否有办法迫使它按顺序读取1-12行.我发现gzipping文件并使用指定的CompressionType读取它是一种解决方法,但我想知道是否有任何方法可以执行此操作,不涉及压缩或更改原始文件.

google-cloud-platform google-cloud-dataflow spotify-scio

0
推荐指数
1
解决办法
597
查看次数