我正在运行带有2名工作人员的Spark流应用程序.应用程序具有联接和联合操作.
所有批次都已成功完成,但注意到随机溢出指标与输入数据大小或输出数据大小不一致(溢出内存超过20次).
请在下图中找到火花阶段的详细信息:
经过对此的研究,发现了
当没有足够的内存用于随机数据时,会发生随机溢出.
Shuffle spill (memory)
- 溢出时内存中数据的反序列化形式的大小
shuffle spill (disk)
- 溢出后磁盘上数据序列化形式的大小
由于反序列化数据比序列化数据占用更多空间.所以,Shuffle溢出(内存)更多.
注意到这个溢出内存大小非常大,输入数据很大.
我的疑问是:
这种溢出是否会对性能产生很大影响?
如何优化内存和磁盘的溢出?
是否有可以减少/控制这种巨大溢出的Spark Properties?
你好我想下载spark-core
,spark-streaming
,twitter4j
,和spark-streaming-twitter
下面的build.sbt文件:
name := "hello"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.4.1"
libraryDependencies ++= Seq(
"org.twitter4j" % "twitter4j-core" % "3.0.3",
"org.twitter4j" % "twitter4j-stream" % "3.0.3"
)
libraryDependencies += "org.apache.spark" % "spark-streaming-twitter_2.10" % "0.9.0-incubating"
Run Code Online (Sandbox Code Playgroud)
我只是在libraryDependencies
网上这个,所以我不确定使用哪个版本等.
有人可以向我解释我应该如何解决这个.sbt文件.我花了几个小时试图搞清楚,但没有一个建议工作.我scala
通过自制软件安装,我在版本上2.11.8
我的所有错误都是关于:
Modules were resolved with conflicting cross-version suffixes.
Run Code Online (Sandbox Code Playgroud) 我正在使用Spark Streaming处理两个Kafka队列之间的数据,但我似乎找不到从Spark写Kafka的好方法.我试过这个:
input.foreachRDD(rdd =>
rdd.foreachPartition(partition =>
partition.foreach {
case x: String => {
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
println(x)
val producer = new KafkaProducer[String, String](props)
val message = new ProducerRecord[String, String]("output", null, x)
producer.send(message)
}
}
)
)
Run Code Online (Sandbox Code Playgroud)
并且它按预期工作,但是为每个消息实例化一个新的KafkaProducer在真实环境中显然是不可行的,我正在尝试解决它.
我想为每个进程保留一个实例的引用,并在需要发送消息时访问它.如何从Spark Streaming写入Kafka?
scala apache-kafka apache-spark spark-streaming spark-streaming-kafka
我一直试图spark-shell
在YARN client
模式下运行,但是我遇到了很多ClosedChannelException
错误.我正在使用spark 2.0.0 build for Hadoop 2.6.
以下是例外情况:
$ spark-2.0.0-bin-hadoop2.6/bin/spark-shell --master yarn --deploy-mode client
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/09/13 14:12:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/09/13 14:12:38 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
16/09/13 14:12:55 ERROR cluster.YarnClientSchedulerBackend: Yarn application has already exited with state FINISHED!
16/09/13 14:12:55 ERROR …
Run Code Online (Sandbox Code Playgroud) 我刚刚创建了python列表range(1,100000)
.
使用SparkContext完成以下步骤:
a = sc.parallelize([i for i in range(1, 100000)])
b = sc.parallelize([i for i in range(1, 100000)])
c = a.zip(b)
>>> [(1, 1), (2, 2), -----]
sum = sc.accumulator(0)
c.foreach(lambda (x, y): life.add((y-x)))
Run Code Online (Sandbox Code Playgroud)
其中发出如下警告:
ARN TaskSetManager:阶段3包含一个非常大的任务(4644 KB).建议的最大任务大小为100 KB.
如何解决此警告?有没有办法处理大小?而且,它会影响大数据的时间复杂度吗?
我相信,我有一个相对常见的火花流用例:
我有一个对象流,我想根据一些参考数据进行过滤
最初,我认为使用广播变量实现这是一件非常简单的事情:
public void startSparkEngine {
Broadcast<ReferenceData> refdataBroadcast
= sparkContext.broadcast(getRefData());
final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> {
final ReferenceData refData = refdataBroadcast.getValue();
return obj.getField().equals(refData.getField());
}
filteredStream.foreachRDD(rdd -> {
rdd.foreach(obj -> {
// Final processing of filtered objects
});
return null;
});
}
Run Code Online (Sandbox Code Playgroud)
但是,尽管很少,我的参考数据会定期更改
我的印象是我可以在驱动程序上修改和重新广播我的变量,它会传播给每个工作者,但是Broadcast
对象不是也不Serializable
需要final
.
我有什么替代品?我能想到的三个解决方案是:
将参考数据查找移动到一个forEachPartition
或forEachRdd
左右,使其完全驻留在工作者上.但是,参考数据存在于REST API中,因此我还需要以某种方式存储计时器/计数器以停止对流中的每个元素访问远程数据库.
每次refdata更改时,使用新的广播变量重新启动Spark上下文.
将参考数据转换为RDD,然后join
以我现在流式传输的方式将流转换为流Pair<MyObject, RefData>
,尽管这会将参考数据与每个对象一起发送.
我有一个连续运行的Spark Streaming作业.我如何优雅地停止工作?我已经阅读了在作业监视中附加关闭钩子并将SIGTERM发送到作业的通常建议.
sys.ShutdownHookThread {
logger.info("Gracefully stopping Application...")
ssc.stop(stopSparkContext = true, stopGracefully = true)
logger.info("Application stopped gracefully")
}
Run Code Online (Sandbox Code Playgroud)
它似乎工作,但看起来不是最简单的方法来阻止这项工作.我在这里错过了什么吗?
从代码的角度来看,它可能有意义,但您如何在群集环境中使用它?如果我们启动一个火花流工作(我们在集群中的所有节点上分配作业),我们将不得不跟踪作业的PID和运行它的节点.最后,当我们必须停止进程时,我们需要跟踪作业运行的节点以及该进程的PID.我只是希望流媒体作业有一种更简单的工作控制方式.
我正在开发一个Spark应用程序,我习惯将Spring作为依赖注入框架.现在我遇到了问题,处理部分使用了Spring的@Autowired功能,但它被Spark序列化和反序列化.
所以下面的代码让我陷入困境:
Processor processor = ...; // This is a Spring constructed object
// and makes all the trouble
JavaRDD<Txn> rdd = ...; // some data for Spark
rdd.foreachPartition(processor);
Run Code Online (Sandbox Code Playgroud)
处理器看起来像这样:
public class Processor implements VoidFunction<Iterator<Txn>>, Serializeable {
private static final long serialVersionUID = 1L;
@Autowired // This will not work if the object is deserialized
private transient DatabaseConnection db;
@Override
public void call(Iterator<Txn> txns) {
... // do some fance stuff
db.store(txns);
}
}
Run Code Online (Sandbox Code Playgroud)
所以我的问题是:是否甚至可以将Spring与Spark结合使用?如果没有,那么做这样的事情最优雅的方式是什么?任何帮助表示赞赏!
根据我们的实验,我们发现当状态变为超过一百万个对象时,有状态的Spark Streaming内部处理成本会花费大量时间.因此延迟会受到影响,因为我们必须增加批处理间隔以避免不稳定的行为(处理时间>批处理间隔).
它与我们的应用程序的细节无关,因为它可以通过下面的代码重现.
什么是Spark内部处理/基础架构成本,它需要花费大量时间来处理用户状态?除了简单地增加批处理间隔之外,还有减少处理时间的选项吗?
我们计划广泛使用状态:在每个节点上至少100MB左右,以便将所有数据保存在内存中,并且每小时只丢弃一次.
增加批处理间隔会有所帮助,但我们希望保持批处理间隔最小.
原因可能不是国家占用的空间,而是大型对象图,因为当我们将列表更改为大型基元时,问题就消失了.
只是一个猜测:它可能与org.apache.spark.util.SizeEstimator
Spark内部使用有关,因为它会在不时进行分析时显示出来.
以下是在现代iCore7上重现上图的简单演示:
码:
package spark;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.SizeEstimator;
import scala.Tuple2;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class SlowSparkStreamingUpdateStateDemo {
// Very simple state model
static class State implements Serializable {
final List<String> data;
State(List<String> data) {
this.data = data;
}
}
public static void main(String[] args) {
SparkConf conf = …
Run Code Online (Sandbox Code Playgroud) apache-spark ×10
spark-streaming ×10
java ×4
scala ×3
apache-kafka ×1
broadcast ×1
hadoop ×1
hadoop-yarn ×1
jvm ×1
performance ×1
sbt ×1
spring ×1