根据我们的实验,我们发现当状态变为超过一百万个对象时,有状态的Spark Streaming内部处理成本会花费大量时间.因此延迟会受到影响,因为我们必须增加批处理间隔以避免不稳定的行为(处理时间>批处理间隔).
它与我们的应用程序的细节无关,因为它可以通过下面的代码重现.
什么是Spark内部处理/基础架构成本,它需要花费大量时间来处理用户状态?除了简单地增加批处理间隔之外,还有减少处理时间的选项吗?
我们计划广泛使用状态:在每个节点上至少100MB左右,以便将所有数据保存在内存中,并且每小时只丢弃一次.
增加批处理间隔会有所帮助,但我们希望保持批处理间隔最小.
原因可能不是国家占用的空间,而是大型对象图,因为当我们将列表更改为大型基元时,问题就消失了.
只是一个猜测:它可能与org.apache.spark.util.SizeEstimator
Spark内部使用有关,因为它会在不时进行分析时显示出来.
以下是在现代iCore7上重现上图的简单演示:
码:
package spark;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.SizeEstimator;
import scala.Tuple2;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class SlowSparkStreamingUpdateStateDemo {
// Very simple state model
static class State implements Serializable {
final List<String> data;
State(List<String> data) {
this.data = data;
}
}
public static void main(String[] args) {
SparkConf conf = …
Run Code Online (Sandbox Code Playgroud)