根据我们的实验,我们发现当状态变为超过一百万个对象时,有状态的Spark Streaming内部处理成本会花费大量时间.因此延迟会受到影响,因为我们必须增加批处理间隔以避免不稳定的行为(处理时间>批处理间隔).
它与我们的应用程序的细节无关,因为它可以通过下面的代码重现.
什么是Spark内部处理/基础架构成本,它需要花费大量时间来处理用户状态?除了简单地增加批处理间隔之外,还有减少处理时间的选项吗?
我们计划广泛使用状态:在每个节点上至少100MB左右,以便将所有数据保存在内存中,并且每小时只丢弃一次.
增加批处理间隔会有所帮助,但我们希望保持批处理间隔最小.
原因可能不是国家占用的空间,而是大型对象图,因为当我们将列表更改为大型基元时,问题就消失了.
只是一个猜测:它可能与org.apache.spark.util.SizeEstimatorSpark内部使用有关,因为它会在不时进行分析时显示出来.
以下是在现代iCore7上重现上图的简单演示:
码:
package spark;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.SizeEstimator;
import scala.Tuple2;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class SlowSparkStreamingUpdateStateDemo {
// Very simple state model
static class State implements Serializable {
final List<String> data;
State(List<String> data) {
this.data = data;
}
}
public static void main(String[] args) {
SparkConf conf = …Run Code Online (Sandbox Code Playgroud) 在使用Spark Streaming处理顺序有限事件会话流时,选择无状态滑动窗口操作(例如reduceByKeyAndWindow)与选择保持状态(例如通过updateStateByKey或新mapStateByKey)会有什么考虑因素?
例如,请考虑以下情形:
可穿戴设备跟踪佩戴者进行的身体锻炼.设备会自动检测锻炼开始的时间,并发出消息; 在锻炼期间发出额外的信息(例如心率); 最后,在练习完成后发出消息.
期望的结果是每个运动会话的聚合记录流.即,应该将同一会话的所有事件聚合在一起(例如,以便每个会话可以保存在单个DB行中).请注意,每个会话的长度都是有限的,但来自多个设备的整个流是连续的.为方便起见,我们假设设备为每个锻炼课程生成一个GUID.
我可以看到使用Spark Streaming处理这个用例的两种方法:
使用不重叠的窗口,并保持状态.每个GUID保存一个状态,所有事件都与之匹配.当新事件到达时,状态被更新(例如,使用mapWithState),并且如果事件是"运动结束时",则将发出基于状态的聚合记录,并且移除密钥.
使用重叠的滑动窗口,并仅保留第一个会话.假设长度为2且间隔为1的滑动窗口(参见下图).还假设窗口长度为2 X(最大可能的运动时间).在每个窗口上,事件由GUID进行攻击,例如使用reduceByKeyAndWindow.然后,转储从窗口后半部分开始的所有会话,并释放剩余的会话.这使得每个事件只能使用一次,并确保属于同一会话的所有事件将聚合在一起.
方法#2的图表:
Run Code Online (Sandbox Code Playgroud)Only sessions starting in the areas marked with \\\ will be emitted. ----------- |window 1 | |\\\\| | ----------- ---------- |window 2 | |\\\\| | ----------- ---------- |window 3 | |\\\\| | -----------
我看到的利弊:
方法#1的计算成本较低,但需要保存和管理状态(例如,如果并发会话数增加,则状态可能比内存大).但是,如果最大并发会话数有限,则可能不是问题.
方法#2的成本是两倍(每个事件处理两次),并且具有更高的延迟(2倍最大运动时间),但更简单且易于管理,因为没有保留任何状态.
处理这个用例的最佳方法是 - 这些方法中的任何一种都是"正确的",还是有更好的方法?
应该考虑哪些其他优点/缺点?
rdd1.join(rdd2)如果rdd1并rdd2拥有相同的分区,会导致洗牌吗?
我使用Spark 1.3.0和python api.在转换庞大的数据帧时,我会缓存许多DF以加快执行速度;
df1.cache()
df2.cache()
Run Code Online (Sandbox Code Playgroud)
一旦某些数据帧的使用结束并且不再需要,我怎样才能从内存中删除DF(或取消缓存它?)?
例如,df1在用于df2少量转换的情况下使用整个代码,之后,它永远不需要.我想强行放下df2以释放更多的内存空间.
我正在尝试使用Twitter作为源执行Spark Streaming示例,如下所示:
public static void main (String.. args) {
SparkConf conf = new SparkConf().setAppName("Spark_Streaming_Twitter").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(2));
JavaSQLContext sqlCtx = new JavaSQLContext(sc);
String[] filters = new String[] {"soccer"};
JavaReceiverInputDStream<Status> receiverStream = TwitterUtils.createStream(jssc,filters);
jssc.start();
jssc.awaitTermination();
}
Run Code Online (Sandbox Code Playgroud)
但我得到以下例外
Exception in thread "main" java.lang.AssertionError: assertion failed: No output streams registered, so nothing to execute
at scala.Predef$.assert(Predef.scala:179)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:158)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:416)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:437)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:501)
at org.learning.spark.TwitterStreamSpark.main(TwitterStreamSpark.java:53)
Run Code Online (Sandbox Code Playgroud)
有任何建议如何解决这个问题?
Scala中有什么东西,
condition ? first_expression : second_expression;
Run Code Online (Sandbox Code Playgroud)
我可以在scala中使用map函数吗?我希望能够写出这样的东西:
val statuses = tweets.map(status => status.isTruncate? //do nothing | status.getText())
Run Code Online (Sandbox Code Playgroud)
如果无法使用内联函数,如何在其中写入条件map?
我有以下作为启动火花流工作的命令行.
spark-submit --class com.biz.test \
--packages \
org.apache.spark:spark-streaming-kafka_2.10:1.3.0 \
org.apache.hbase:hbase-common:1.0.0 \
org.apache.hbase:hbase-client:1.0.0 \
org.apache.hbase:hbase-server:1.0.0 \
org.json4s:json4s-jackson:3.2.11 \
./test-spark_2.10-1.0.8.jar \
>spark_log 2>&1 &
Run Code Online (Sandbox Code Playgroud)
作业无法启动以下错误:
Exception in thread "main" java.lang.IllegalArgumentException: Given path is malformed: org.apache.hbase:hbase-common:1.0.0
at org.apache.spark.util.Utils$.resolveURI(Utils.scala:1665)
at org.apache.spark.deploy.SparkSubmitArguments.parse$1(SparkSubmitArguments.scala:432)
at org.apache.spark.deploy.SparkSubmitArguments.parseOpts(SparkSubmitArguments.scala:288)
at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:87)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:105)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Run Code Online (Sandbox Code Playgroud)
我已经尝试删除格式并返回单行,但这不能解决问题.我也试过了很多变化:不同的版本,添加_2.10到artifactId的末尾,等等.
根据文档(spark-submit --help):
坐标的格式应为groupId:artifactId:version.
所以我所拥有的应该是有效的,并且应该参考这个包.
如果它有帮助,我正在运行Cloudera 5.4.4.
我究竟做错了什么?如何正确引用hbase包?
我有几个Apache Spark应用程序/脚本的用例,通常是以下形式:
一般ETL用例 - 更具体地说是将包含许多事件(想想事件源)的Cassandra列族转换为各种聚合列族.
流式使用案例 - 事件到达系统时的实时分析.
对于(1),我需要定期启动Spark应用程序.
对于(2),只需在启动时启动长时间运行的Spark Streaming进程并让它继续运行.
(注意 - 我使用Spark Standalone作为集群管理器,所以没有纱线或介子)
我正在尝试找出Spark应用程序的最常见/最佳实践部署策略.
到目前为止,我可以看到的选项是:
将我的程序部署为jar,并使用spark-submit运行各种任务 - 这似乎是spark文档中推荐的方式.关于这个策略的一些想法:
创建一个单独的webapp作为驱动程序.
Spark作业服务器(https://github.com/ooyala/spark-jobserver)
我想了解一个简单但强大的部署策略的普遍共识 - 我还没有能够通过拖网来确定一个.
非常感谢!
我在用spark 1.5.2.我需要使用kafka作为流媒体源来运行spark streaming工作.我需要从kafka中的多个主题中读取并以不同方式处理每个主题.
那里,火花作业中"spark.yarn.executor.memoryOverhead"的值是应该分配给App的纱线还是仅仅是最大值?
apache-spark spark-streaming apache-spark-sql apache-spark-mllib
apache-spark ×10
spark-streaming ×10
java ×2
apache-kafka ×1
hbase ×1
map-function ×1
performance ×1
rdd ×1
scala ×1