小编And*_*sov的帖子

Spark Streaming:为什么内部处理成本如此之高以处理几MB的用户状态?

根据我们的实验,我们发现当状态变为超过一百万个对象时,有状态的Spark Streaming内部处理成本会花费大量时间.因此延迟会受到影响,因为我们必须增加批处理间隔以避免不稳定的行为(处理时间>批处理间隔).

它与我们的应用程序的细节无关,因为它可以通过下面的代码重现.

什么是Spark内部处理/基础架构成本,它需要花费大量时间来处理用户状态?除了简单地增加批处理间隔之外,还有减少处理时间的选项吗?

我们计划广泛使用状态:在每个节点上至少100MB左右,以便将所有数据保存在内存中,并且每小时只丢弃一次.

增加批处理间隔会有所帮助,但我们希望保持批处理间隔最小.

原因可能不是国家占用的空间,而是大型对象图,因为当我们将列表更改为大型基元时,问题就消失了.

只是一个猜测:它可能与org.apache.spark.util.SizeEstimatorSpark内部使用有关,因为它会在不时进行分析时显示出来.

在此输入图像描述

以下是在现代iCore7上重现上图的简单演示:

  • 不到15 MB的州
  • 根本没有流输入
  • 最快(虚拟)'updateStateByKey'功能
  • 批间隔1秒
  • 检查点(必须由Spark提供)到本地磁盘
  • 在本地和YARN进行了测试

码:

package spark;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.SizeEstimator;
import scala.Tuple2;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SlowSparkStreamingUpdateStateDemo {

    // Very simple state model
    static class State implements Serializable {
        final List<String> data;
        State(List<String> data) {
            this.data = data;
        }
    }

    public static void main(String[] args) {
        SparkConf conf = …
Run Code Online (Sandbox Code Playgroud)

java performance apache-spark spark-streaming

24
推荐指数
1
解决办法
879
查看次数

标签 统计

apache-spark ×1

java ×1

performance ×1

spark-streaming ×1