Tags: java, performance, apache-spark, spark-streaming
Based on our experiments, we have found that stateful Spark Streaming spends a large amount of time on internal processing once the state grows beyond about a million objects. As a consequence latency suffers, because we have to increase the batch interval to avoid unstable behavior (processing time > batch interval).
It has nothing to do with the specifics of our application, since it can be reproduced with the code below.
What exactly are the Spark internal processing/infrastructure costs that take so much time to handle user state? Are there any options to reduce the processing time besides simply increasing the batch interval?
We plan to use state extensively: at least 100 MB or so on each node, keeping all the data in memory and dropping it only once an hour.
Increasing the batch interval helps, but we want to keep the batch interval minimal.
The reason is probably not the space occupied by the state but rather the large object graph, because when we changed the list to a large array of primitives, the problem went away (see the primitive-array sketch after the demo).
Just a guess: it might have something to do with org.apache.spark.util.SizeEstimator, which Spark uses internally, since it shows up in profiling from time to time.
Here is a simple demo that reproduces the behavior described above on a modern iCore7:
Code:
package spark;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.SizeEstimator;
import scala.Tuple2;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SlowSparkStreamingUpdateStateDemo {

    // Very simple state model
    static class State implements Serializable {
        final List<String> data;

        State(List<String> data) {
            this.data = data;
        }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                // Tried KryoSerializer, but it does not seem to help much
                //.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .setMaster("local[*]")
                .setAppName(SlowSparkStreamingUpdateStateDemo.class.getName());

        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(1));
        javaStreamingContext.checkpoint("checkpoint"); // a must (if you have stateful operation)

        List<Tuple2<String, State>> initialRddGeneratedData = prepareInitialRddData();
        System.out.println("Estimated size, bytes: " + SizeEstimator.estimate(initialRddGeneratedData));

        JavaPairRDD<String, State> initialRdd = javaStreamingContext.sparkContext().parallelizePairs(initialRddGeneratedData);

        JavaPairDStream<String, State> stream = javaStreamingContext
                .textFileStream(".") // fake: effectively, no input at all
                .mapToPair(input -> (Tuple2<String, State>) null) // fake to get JavaPairDStream
                .updateStateByKey(
                        (inputs, maybeState) -> maybeState, // simplest possible dummy function
                        new HashPartitioner(javaStreamingContext.sparkContext().defaultParallelism()),
                        initialRdd); // set generated state

        stream.foreachRDD(rdd -> { // simplest possible action (required by Spark)
            System.out.println("Is empty: " + rdd.isEmpty());
            return null;
        });

        javaStreamingContext.start();
        javaStreamingContext.awaitTermination();
    }

    private static List<Tuple2<String, State>> prepareInitialRddData() {
        // 'stateCount' tuples with value = list of size 'dataListSize' of strings of length 'elementDataSize'
        int stateCount = 1000;
        int dataListSize = 200;
        int elementDataSize = 10;
        List<Tuple2<String, State>> initialRddInput = new ArrayList<>(stateCount);
        for (int stateIdx = 0; stateIdx < stateCount; stateIdx++) {
            List<String> stateData = new ArrayList<>(dataListSize);
            for (int dataIdx = 0; dataIdx < dataListSize; dataIdx++) {
                stateData.add(RandomStringUtils.randomAlphanumeric(elementDataSize));
            }
            initialRddInput.add(new Tuple2<>("state" + stateIdx, new State(stateData)));
        }
        return initialRddInput;
    }
}
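For comparison, here is a minimal, hypothetical sketch of the primitive-array variant of the state mentioned above (the class name PrimitiveState and the conversion logic are illustrative, not from the original code). Keeping the per-key data in one flat char[] instead of a List of String objects collapses the object graph to a single array per key, which is the change that made the problem go away for us:

    // Hypothetical drop-in replacement for the nested State class in the demo above:
    // the per-key data is stored in one flat primitive array instead of a list of
    // String objects, so Spark has far fewer objects per key to traverse.
    static class PrimitiveState implements Serializable {
        final char[] data;

        PrimitiveState(List<String> strings) {
            StringBuilder sb = new StringBuilder();
            for (String s : strings) {
                sb.append(s);
            }
            this.data = sb.toString().toCharArray();
        }
    }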
State management was improved in Spark 1.6.
See SPARK-2629, "Improved state management for Spark Streaming", and its detailed design specification, "Improved state management in Spark Streaming".
One of the performance flaws is described there as follows:
Need for more optimized state management that does not scan every key: currently, updateStateByKey scans every key in every batch interval, even if there is no data for that key. While this semantics is useful for some workloads, most workloads require only "scan and update the state for which there is new data", and only a small percentage of all the state needs to be touched in every batch interval. The cogroup-based implementation of updateStateByKey is not designed for this; cogroup scans all the keys every time. In fact, this causes the batch processing times of updateStateByKey to increase with the number of keys in the state, even if the data rate stays fixed.
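For reference, below is a minimal, hypothetical sketch of the mapWithState API that Spark 1.6 introduced as the successor to updateStateByKey (the class and method names, the Integer value type and the word-count-style logic are illustrative, not taken from the question). The key difference is that the mapping function is invoked only for keys that receive new data in a batch (plus keys being timed out), so batch time no longer grows with the total number of keys in the state. Note that in the Spark 1.6 Java API the Optional below is Guava's com.google.common.base.Optional; in Spark 2.x it is org.apache.spark.api.java.Optional.

import com.google.common.base.Optional;            // Spark 1.6 Java API; org.apache.spark.api.java.Optional in 2.x
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

// Illustrative sketch only: names and types are made up for this example.
public class MapWithStateSketch {

    static JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> buildStateStream(
            JavaPairDStream<String, Integer> pairs, JavaPairRDD<String, Integer> initialRdd) {

        // Mapping function: invoked only for keys that have new data in the current batch
        // (and for keys being timed out), not for every key held in the state.
        Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
                (key, value, state) -> {
                    int sum = value.or(0) + (state.exists() ? state.get() : 0);
                    state.update(sum);              // write the new state back
                    return new Tuple2<>(key, sum);  // record emitted for this key
                };

        return pairs.mapWithState(
                StateSpec.function(mappingFunc)
                        .initialState(initialRdd)          // seed the state, as with updateStateByKey
                        .timeout(Durations.minutes(60)));  // optionally expire idle keys
    }
}

The .timeout(...) setting is one way to drop idle per-key state periodically (for example roughly hourly, as planned in the question) instead of clearing it manually.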
