在数据流流世界中。
我说的时候的理解:
Window.into(FixedWindows.of(Duration.standardHours(1)))
.triggering(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(15))
Run Code Online (Sandbox Code Playgroud)
就是对于一个小时的固定窗口,触发器在看到第一个元素之后将等待或批处理这些元素。
但是当我说:
Window.into(FixedWindows.of(Duration.standardHours(1)))
.triggering(AfterProcessingTime.pastFirstElementInPane()
Run Code Online (Sandbox Code Playgroud)
它是从第一次看到第一个元素起就每次触发,还是隐式地对元素进行批处理?因为在每个元素上触发都会使系统过载。
我想加载流数据,然后添加一个键,然后按键计数。
当我尝试使用流方法(无界数据)加载和按键分组大数据时,Apache Beam Dataflow 管道出现内存错误。因为似乎数据是按分组累积的,并且不会在触发每个窗口时更早地触发数据。
如果我减小元素大小(元素数量不会改变),它会起作用!因为实际上 group-by step 等待所有数据被分组,然后触发所有新的窗口数据。
我对两者进行了测试:
梁版本 2.11.0 和 scio 版本 0.7.4
梁版本 2.6.0 和 scio 版本 0.6.1
如您所见,数据是按组累积的,不会发出。
val windowedData = data.applyKvTransform(
Window.into[myt](
Sessions.withGapDuration(Duration.millis(1)))
.triggering(
Repeatedly.forever(AfterFirst.of(
AfterPane.elementCountAtLeast(10),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1)))
).orFinally(AfterWatermark.pastEndOfWindow())
).withAllowedLateness(Duration.standardSeconds(100))
.discardingFiredPanes()
)
Run Code Online (Sandbox Code Playgroud)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException: Commit request for stage S2 and …Run Code Online (Sandbox Code Playgroud) 从:
https://github.com/spotify/scio/wiki/Scio-data-guideline
“比 groupByKey 更喜欢组合/聚合/减少转换。请记住,减少操作必须是关联的和可交换的。”
为什么特别喜欢聚合而不是 groupByKey?
我正在使用Spotify Scio来读取从Stackdriver导出到Google云端存储的日志.它们是JSON文件,其中每一行都是单个条目.查看工作日志,似乎文件被拆分为块,然后以任何顺序读取.在这种情况下,我已经将我的工作限制在1名工人身上.有没有办法强制按顺序读取和处理这些块?
作为一个例子(textFile基本上是一个TextIO.Read):
val sc = ScioContext(myOptions)
sc.textFile(myFile).map(line => logger.info(line))
Run Code Online (Sandbox Code Playgroud)
将根据工作日志生成类似于此的输出:
line 5
line 6
line 7
line 8
<Some other work>
line 1
line 2
line 3
line 4
<Some other work>
line 9
line 10
line 11
line 12
Run Code Online (Sandbox Code Playgroud)
我想知道的是,是否有办法迫使它按顺序读取1-12行.我发现gzipping文件并使用指定的CompressionType读取它是一种解决方法,但我想知道是否有任何方法可以执行此操作,不涉及压缩或更改原始文件.