标签: spark-streaming

Spark Streaming:为什么内部处理成本如此之高以处理几MB的用户状态?

根据我们的实验,我们发现当状态变为超过一百万个对象时,有状态的Spark Streaming内部处理成本会花费大量时间.因此延迟会受到影响,因为我们必须增加批处理间隔以避免不稳定的行为(处理时间>批处理间隔).

它与我们的应用程序的细节无关,因为它可以通过下面的代码重现.

什么是Spark内部处理/基础架构成本,它需要花费大量时间来处理用户状态?除了简单地增加批处理间隔之外,还有减少处理时间的选项吗?

我们计划广泛使用状态:在每个节点上至少100MB左右,以便将所有数据保存在内存中,并且每小时只丢弃一次.

增加批处理间隔会有所帮助,但我们希望保持批处理间隔最小.

原因可能不是国家占用的空间,而是大型对象图,因为当我们将列表更改为大型基元时,问题就消失了.

只是一个猜测:它可能与org.apache.spark.util.SizeEstimatorSpark内部使用有关,因为它会在不时进行分析时显示出来.

在此输入图像描述

以下是在现代iCore7上重现上图的简单演示:

  • 不到15 MB的州
  • 根本没有流输入
  • 最快(虚拟)'updateStateByKey'功能
  • 批间隔1秒
  • 检查点(必须由Spark提供)到本地磁盘
  • 在本地和YARN进行了测试

码:

package spark;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.SizeEstimator;
import scala.Tuple2;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SlowSparkStreamingUpdateStateDemo {

    // Very simple state model
    static class State implements Serializable {
        final List<String> data;
        State(List<String> data) {
            this.data = data;
        }
    }

    public static void main(String[] args) {
        SparkConf conf = …
Run Code Online (Sandbox Code Playgroud)

java performance apache-spark spark-streaming

24
推荐指数
1
解决办法
879
查看次数

Spark Streaming:无状态的整体窗口与保持状态

在使用Spark Streaming处理顺序有限事件会话流时,选择无状态滑动窗口操作(例如reduceByKeyAndWindow)与选择保持状态(例如通过updateStateByKey或新mapStateByKey)会有什么考虑因素?

例如,请考虑以下情形:

可穿戴设备跟踪佩戴者进行的身体锻炼.设备会自动检测锻炼开始的时间,并发出消息; 在锻炼期间发出额外的信息(例如心率); 最后,在练习完成后发出消息.

期望的结果是每个运动会话的聚合记录流.即,应该将同一会话的所有事件聚合在一起(例如,以便每个会话可以保存在单个DB行中).请注意,每个会话的长度都是有限的,但来自多个设备的整个流是连续的.为方便起见,我们假设设备为每个锻炼课程生成一个GUID.

我可以看到使用Spark Streaming处理这个用例的两种方法:

  1. 使用不重叠的窗口,并保持状态.每个GUID保存一个状态,所有事件都与之匹配.当新事件到达时,状态被更新(例如,使用mapWithState),并且如果事件是"运动结束时",则将发出基于状态的聚合记录,并且移除密钥.

  2. 使用重叠的滑动窗口,并仅保留第一个会话.假设长度为2且间隔为1的滑动窗口(参见下图).还假设窗口长度为2 X(最大可能的运动时间).在每个窗口上,事件由GUID进行攻击,例如使用reduceByKeyAndWindow.然后,转储从窗口后半部分开始的所有会话,并释放剩余的会话.这使得每个事件只能使用一次,并确保属于同一会话的所有事件将聚合在一起.

方法#2的图表:

Only sessions starting in the areas marked with \\\ will be emitted. 
-----------
|window 1 |
|\\\\|    |
-----------
     ----------
     |window 2 |
     |\\\\|    |  
     -----------
          ----------
          |window 3 |
          |\\\\|    |
          -----------
Run Code Online (Sandbox Code Playgroud)

我看到的利弊:

方法#1的计算成本较低,但需要保存和管理状态(例如,如果并发会话数增加,则状态可能比内存大).但是,如果最大并发会话数有限,则可能不是问题.

方法#2的成本是两倍(每个事件处理两次),并且具有更高的延迟(2倍最大运动时间),但更简单且易于管理,因为没有保留任何状态.

处理这个用例的最佳方法是 - 这些方法中的任何一种都是"正确的",还是有更好的方法?

应该考虑哪些其他优点/缺点?

apache-spark spark-streaming

23
推荐指数
1
解决办法
2800
查看次数

联合分区RDD的连接是否会导致Apache Spark的混乱?

rdd1.join(rdd2)如果rdd1rdd2拥有相同的分区,会导致洗牌吗?

apache-spark spark-streaming rdd

22
推荐指数
1
解决办法
1万
查看次数

从缓存中删除spark数据帧

我使用Spark 1.3.0和python api.在转换庞大的数据帧时,我会缓存许多DF以加快执行速度;

df1.cache()
df2.cache()
Run Code Online (Sandbox Code Playgroud)

一旦某些数据帧的使用结束并且不再需要,我怎样才能从内存中删除DF(或取消缓存它?)?

例如,df1在用于df2少量转换的情况下使用整个代码,之后,它永远不需要.我想强行放下df2以释放更多的内存空间.

apache-spark spark-streaming apache-spark-sql

22
推荐指数
2
解决办法
4万
查看次数

为什么启动StreamingContext失败并出现"IllegalArgumentException:要求失败:没有注册输出操作,所以无需执行"?

我正在尝试使用Twitter作为源执行Spark Streaming示例,如下所示:

public static void main (String.. args) {

    SparkConf conf = new SparkConf().setAppName("Spark_Streaming_Twitter").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);       
        JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(2));      
        JavaSQLContext sqlCtx = new JavaSQLContext(sc);     


        String[] filters = new String[] {"soccer"};

        JavaReceiverInputDStream<Status> receiverStream = TwitterUtils.createStream(jssc,filters);



         jssc.start();
         jssc.awaitTermination();

}
Run Code Online (Sandbox Code Playgroud)

但我得到以下例外

Exception in thread "main" java.lang.AssertionError: assertion failed: No output streams registered, so nothing to execute
    at scala.Predef$.assert(Predef.scala:179)
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:158)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:416)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:437)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:501)
    at org.learning.spark.TwitterStreamSpark.main(TwitterStreamSpark.java:53)
Run Code Online (Sandbox Code Playgroud)

有任何建议如何解决这个问题?

java apache-spark spark-streaming

21
推荐指数
1
解决办法
2万
查看次数

地图功能条件

Scala中有什么东西,

condition ? first_expression : second_expression;
Run Code Online (Sandbox Code Playgroud)

我可以在scala中使用map函数吗?我希望能够写出这样的东西:

val statuses = tweets.map(status => status.isTruncate? //do nothing | status.getText())
Run Code Online (Sandbox Code Playgroud)

如果无法使用内联函数,如何在其中写入条件map

scala apache-spark spark-streaming map-function

21
推荐指数
2
解决办法
3万
查看次数

如何使用--packages为spark-submit指定多个依赖项?

我有以下作为启动火花流工作的命令行.

    spark-submit --class com.biz.test \
            --packages \
                org.apache.spark:spark-streaming-kafka_2.10:1.3.0 \
                org.apache.hbase:hbase-common:1.0.0 \
                org.apache.hbase:hbase-client:1.0.0 \
                org.apache.hbase:hbase-server:1.0.0 \
                org.json4s:json4s-jackson:3.2.11 \
            ./test-spark_2.10-1.0.8.jar \
            >spark_log 2>&1 &
Run Code Online (Sandbox Code Playgroud)

作业无法启动以下错误:

Exception in thread "main" java.lang.IllegalArgumentException: Given path is malformed: org.apache.hbase:hbase-common:1.0.0
    at org.apache.spark.util.Utils$.resolveURI(Utils.scala:1665)
    at org.apache.spark.deploy.SparkSubmitArguments.parse$1(SparkSubmitArguments.scala:432)
    at org.apache.spark.deploy.SparkSubmitArguments.parseOpts(SparkSubmitArguments.scala:288)
    at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:87)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:105)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Run Code Online (Sandbox Code Playgroud)

我已经尝试删除格式并返回单行,但这不能解决问题.我也试过了很多变化:不同的版本,添加_2.10到artifactId的末尾,等等.

根据文档(spark-submit --help):

坐标的格式应为groupId:artifactId:version.

所以我所拥有的应该是有效的,并且应该参考这个包.

如果它有帮助,我正在运行Cloudera 5.4.4.

我究竟做错了什么?如何正确引用hbase包?

hbase apache-spark spark-streaming

21
推荐指数
2
解决办法
1万
查看次数

Apache Spark应用程序部署最佳实践

我有几个Apache Spark应用程序/脚本的用例,通常是以下形式:

一般ETL用例 - 更具体地说是将包含许多事件(想想事件源)的Cassandra列族转换为各种聚合列族.

流式使用案例 - 事件到达系统时的实时分析.

对于(1),我需要定期启动Spark应用程序.

对于(2),只需在启动时启动长时间运行的Spark Streaming进程并让它继续运行.

(注意 - 我使用Spark Standalone作为集群管理器,所以没有纱线或介子)

我正在尝试找出Spark应用程序的最常见/最佳实践部署策略.

到目前为止,我可以看到的选项是:

  1. 将我的程序部署为jar,并使用spark-submit运行各种任务 - 这似乎是spark文档中推荐的方式.关于这个策略的一些想法:

    • 你如何开始/停止任务 - 只使用简单的bash脚本?
    • 如何管理调度? - 只需使用cron?
    • 任何弹性?(例如,如果驱动程序服务器死了,谁会安排作业运行?)
  2. 创建一个单独的webapp作为驱动程序.

    • 以编程方式创建一个spark上下文以与spark集群通信
    • 允许用户通过http界面启动任务
    • 使用Quartz(例如)来管理调度
    • 可以使用集群与zookeeper选举来恢复弹性
  3. Spark作业服务器(https://github.com/ooyala/spark-jobserver)

    • 我不认为(2)对我来说有很多好处,因为我(还)没有很多团队和项目与Spark交谈,并且仍然需要一些应用程序来与作业服务器交谈
    • 就我所见,没有内置调度

我想了解一个简单但强大的部署策略的普遍共识 - 我还没有能够通过拖网来确定一个.

非常感谢!

apache-spark spark-streaming

20
推荐指数
1
解决办法
2486
查看次数

Spark:并行处理多个kafka主题

我在用spark 1.5.2.我需要使用kafka作为流媒体源来运行spark streaming工作.我需要从kafka中的多个主题中读取并以不同方式处理每个主题.

  1. 在同一份工作中做这件事是个好主意吗?如果是这样,我应该为每个主题创建一个包含多个分区或不同流的单个流吗?
  2. 我正在使用kafka直接蒸汽.据我所知,spark为每个分区启动了长时间运行的接收器.我有一个相对较小的集群,6个节点,每个节点有4个核心.如果我在每个主题中都有很多主题和分区,那么效率是否会受到影响,因为大多数执行者都忙于长时间运行的接收器?如果我的理解是错误的,请纠正我

apache-kafka apache-spark spark-streaming

20
推荐指数
2
解决办法
1万
查看次数

"spark.yarn.executor.memoryOverhead"设置的值?

那里,火花作业中"spark.yarn.executor.memoryOverhead"的值是应该分配给App的纱线还是仅仅是最大值?

apache-spark spark-streaming apache-spark-sql apache-spark-mllib

19
推荐指数
1
解决办法
3万
查看次数