标签: spark-streaming

如何更新RDD?

我们正在开发Spark框架,其中我们将历史数据移动到RDD集合中.

基本上,RDD是我们进行操作的不可变的只读数据集.基于此,我们已将历史数据移至RDD,并在此类RDD上进行过滤/映射等计算.

现在有一个用例,RDD中的数据子集得到更新,我们必须重新计算这些值.

HistoricalData采用RDD的形式.我根据请求范围创建另一个RDD,并在ScopeCollection中保存该RDD的引用

到目前为止,我已经能够想到以下方法 -

方法1:广播变化:

  1. 对于每个更改请求,我的服务器获取特定于范围的RDD并生成作业
  2. 在工作中,在该RDD上应用地图阶段 -

    2.a. 对于RDD中的每个节点,在广播上进行查找并创建一个现在更新的新值,从而创建一个新的RDD
    2.b. 现在我在step2.a上再次对这个新的RDD进行所有计算.像乘法,减少等
    2.c. 我将此RDDs引用保存在我的ScopeCollection中

方法2:为更新创建RDD

  1. 对于每个更改请求,我的服务器获取特定于范围的RDD并生成作业
  2. 在每个RDD上,使用具有更改的新RDD进行连接
  3. 现在我在步骤2中再次对这个新的RDD进行所有计算,如乘法,减少等

方法3:

我曾想过创建流RDD,我不断更新相同的RDD并进行重新计算.但据我所知,它可以从Flume或Kafka获取流.而在我的情况下,值是基于用户交互在应用程序本身中生成的.因此,我无法在上下文中看到流RDD的任何集成点.

关于哪种方法更好或任何其他适合此方案的方法的任何建议.

TIA!

apache-spark spark-streaming rdd

19
推荐指数
1
解决办法
5260
查看次数

Spark Kafka Direct DStream - 如果设置了num-executors,则在yarn-cluster模式下有多少个执行程序和RDD分区?


我正在尝试使用Spark Kafka Direct Stream方法.它通过创建与kafka主题分区一样多的RDD分区来简化并行性,如本文档中所述.根据我的理解,spark将为每个RDD分区创建一个执行程序来进行计算.

因此,当我以纱线群集模式提交应用程序,并将选项num-executors指定为与分区数量不同的值时,将会有多少执行程序?

例如,有一个带有2个分区的kafka主题,我将num-executors指定为4:

export YARN_CONF_DIR=$HADOOP_HOME/client_conf

./bin/spark-submit \
--class playground.MainClass \
--master yarn-cluster \
--num-executors 4 \
../spark_applications/uber-spark-streaming-0.0.1-SNAPSHOT.jar \
127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095 topic_1
Run Code Online (Sandbox Code Playgroud)

我试一试,发现执行器的数量是4,每个执行器都会从kafka读取和处理数据.为什么?kafka主题中只有2个分区,4个执行程序如何从kafka主题中读取,该主题只有2个分区?

以下是spark应用程序和日志的详细信息.

我的spark应用程序,它从每个执行程序中的kafka 打印收到的消息(以flatMap方法):

    ...
    String brokers = args[0];
    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(args[1].split(",")));
    kafkaParams.put("metadata.broker.list", brokers);

    JavaPairInputDStream<String, String> messages =
        KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
            kafkaParams, topicsSet);

    JavaPairDStream<String, Integer> wordCounts =
        messages.flatMap(new FlatMapFunction<Tuple2<String, String>, String>()
        {
            public Iterable<String> call(Tuple2<String, String> tuple) throws Exception
            {
                System.out.println(String.format("[received from kafka] tuple_1 …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-spark spark-streaming

19
推荐指数
1
解决办法
3830
查看次数

火花流检查点恢复非常非常缓慢

  • 目标:从Kinesis读取并通过火花流将数据以Parquet格式存储到S3.
  • 情况:应用程序最初运行正常,运行1小时的批次,平均处理时间少于30分钟.出于某种原因,我们可以说应用程序崩溃了,我们尝试从检查点重新启动.处理现在需要永远,而不是前进.我们尝试以1分钟的批处理间隔测试相同的东西,处理运行良好,批次完成需要1.2分钟.当我们从检查点恢复时,每批需要大约15分钟.
  • 注意:我们使用s3作为检查点使用1个执行器,每个执行器有19g内存和3个内核

附上截图:

首次运行 - 检查点恢复之前 在检查点之前 - 流媒体页面

在检查点之前 - 工作页面

在检查点之前 - 乔布斯Page2

试图从检查点恢复: 检查点之后 -  Streaming Page 检查点后 - 工作页面

Config.scala

object Config {

  val sparkConf = new SparkConf


  val sc = new SparkContext(sparkConf)

  val sqlContext = new HiveContext(sc)


  val eventsS3Path = sc.hadoopConfiguration.get("eventsS3Path")
  val useIAMInstanceRole = sc.hadoopConfiguration.getBoolean("useIAMInstanceRole",true)

  val checkpointDirectory =  sc.hadoopConfiguration.get("checkpointDirectory")

//  sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

  DateTimeZone.setDefault(DateTimeZone.forID("America/Los_Angeles"))

  val numStreams = 2

  def getSparkContext(): SparkContext = {
    this.sc
  }

  def getSqlContext(): HiveContext = {
    this.sqlContext
  }





}
Run Code Online (Sandbox Code Playgroud)

S3Basin.scala

object S3Basin {
  def main(args: Array[String]): Unit = {
    Kinesis.startStreaming(s3basinFunction _)
  }

  def s3basinFunction(streams : DStream[Array[Byte]]): Unit ={ …
Run Code Online (Sandbox Code Playgroud)

amazon-s3 apache-spark amazon-kinesis spark-streaming checkpointing

18
推荐指数
1
解决办法
1500
查看次数

运行TwitterPopularTags时java.lang.NoClassDefFoundError:org/apache/spark/streaming/twitter/TwitterUtils $

我是Spark streaming和Scala的初学者.对于项目要求,我试图在github中运行存在的TwitterPopularTags示例.由于SBT组装不适合我,我不熟悉SBT,我正在尝试使用Maven进行构建.经过很多初步的打嗝,我能够创建jar文件.但在尝试执行它时,我收到以下错误.有人可以帮我解决这个问题吗?

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/twitter/TwitterUtils$
    at TwitterPopularTags$.main(TwitterPopularTags.scala:43)
    at TwitterPopularTags.main(TwitterPopularTags.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:331)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.twitter.TwitterUtils$
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 9 more
Run Code Online (Sandbox Code Playgroud)

我添加了以下依赖项Spark-streaming_2.10:1.1.0 Spark-core_2.10:1.1.0 Spark-streaming-twitter_2.10:1.1.0

我甚至尝试过1.2.0 for Spark-streaming-twitter,但这也给了我同样的错误.

我在这里先向您的帮助表示感谢.

问候,vpv

scala noclassdeffounderror maven apache-spark spark-streaming

17
推荐指数
3
解决办法
3万
查看次数

使用Kafka直接流来消除Yarn上的堆内存泄漏

我正在使用java 1.8.0_45和Kafka直接流在Yarn(Apache发行版2.6.0)上运行spark streaming 1.4.0.我也使用scala 2.11支持spark.

我看到的问题是驱动程序和执行程序容器都在逐渐增加物理内存使用量,直到纱线容器杀死它为止.我在驱动程序中配置了高达192M堆和384堆堆空间,但它最终耗尽了它

对于常规GC循环,堆内存似乎很好.在任何此类运行中都没有遇到OutOffMemory

事实上,我仍然没有在卡夫卡队列上产生任何流量.这是我正在使用的代码

object SimpleSparkStreaming extends App {

val conf = new SparkConf()
val ssc = new StreamingContext(conf,Seconds(conf.getLong("spark.batch.window.size",1L)));
ssc.checkpoint("checkpoint")
val topics = Set(conf.get("spark.kafka.topic.name")); 
    val kafkaParams = Map[String, String]("metadata.broker.list" -> conf.get("spark.kafka.broker.list"))
            val kafkaStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, topics)
            kafkaStream.foreachRDD(rdd => {
                rdd.foreach(x => {
                    println(x._2)
                })

            })
    kafkaStream.print()
            ssc.start() 

            ssc.awaitTermination()

}
Run Code Online (Sandbox Code Playgroud)

我在CentOS 7上运行它.用于spark提交的命令如下

./bin/spark-submit --class com.rasa.cloud.prototype.spark.SimpleSparkStreaming \
--conf spark.yarn.executor.memoryOverhead=256 \
--conf spark.yarn.driver.memoryOverhead=384 \
--conf spark.kafka.topic.name=test \
--conf spark.kafka.broker.list=172.31.45.218:9092 \
--conf spark.batch.window.size=1 \
--conf spark.app.name="Simple Spark Kafka application" …
Run Code Online (Sandbox Code Playgroud)

apache-kafka hadoop-yarn apache-spark spark-streaming apache-spark-1.4

17
推荐指数
1
解决办法
1256
查看次数

Spark DataFrame:在orderBy维护该命令之后是否groupBy?

我有一个Spark 2.0数据帧,example具有以下结构:

id, hour, count
id1, 0, 12
id1, 1, 55
..
id1, 23, 44
id2, 0, 12
id2, 1, 89
..
id2, 23, 34
etc.
Run Code Online (Sandbox Code Playgroud)

它包含每个id的24个条目(一天中每小时一个),并使用orderBy函数按id,小时排序.

我创建了一个聚合器groupConcat:

  def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable {
    override def zero: String = ""

    override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat)

    override def merge(b1: String, b2: String) = b1 + b2

    override def finish(b: String) = b.substring(1)

    override …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark spark-streaming apache-spark-sql spark-dataframe

16
推荐指数
3
解决办法
2万
查看次数

如何从迭代器创建Spark RDD?

为了说清楚,我不是从数组/列表中寻找RDD

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7); // sample
JavaRDD<Integer> rdd = new JavaSparkContext().parallelize(list);
Run Code Online (Sandbox Code Playgroud)

如何在没有在内存中完全缓冲的情况下从java迭代器创建spark RDD ?

Iterator<Integer> iterator = Arrays.asList(1, 2, 3, 4).iterator(); //sample iterator for illustration
JavaRDD<Integer> rdd = new JavaSparkContext().what("?", iterator); //the Question
Run Code Online (Sandbox Code Playgroud)

附加问题:

是否要求源可重新读取(或能够多次读取)以提供RDD的弹性?换句话说,由于迭代器基本上只读一次,是否有可能从迭代器创建弹性分布式数据集(RDD)?

apache-spark spark-streaming

15
推荐指数
1
解决办法
7167
查看次数

如何在同一个Spark项目中同时使用Scala和Python?

是否可以将Spark RDD传递给Python?

因为我需要一个python库来对我的数据进行一些计算,但我的主要Spark项目是基于Scala的.有没有办法混合它们或让python访问相同的火花上下文?

python scala apache-spark spark-streaming pyspark

15
推荐指数
2
解决办法
2万
查看次数

Spark Streaming:foreachRDD更新我的mongo RDD

我想在每次进入里面时创建一个新的 mongodb RDD foreachRDD.但是我有序列化问题:

 mydstream  
   .foreachRDD(rdd => {
      val mongoClient = MongoClient("localhost", 27017)
      val db = mongoClient(mongoDatabase)
      val coll = db(mongoCollection)
      // ssc is my StreamingContext
      val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) })
Run Code Online (Sandbox Code Playgroud)

这会给我一个错误:

object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@31133b6e)
Run Code Online (Sandbox Code Playgroud)

任何的想法?

mongodb apache-spark spark-streaming

15
推荐指数
1
解决办法
945
查看次数

Spark Streaming mapWithState似乎定期重建完整状态

我正在开发一个Scala(2.11)/ Spark(1.6.1)流式项目,mapWithState()用于跟踪以前批次中看到的数据.

状态分布在多个节点上的20个分区中,使用StateSpec.function(trackStateFunc _).numPartitions(20).在这种状态下,我们只有几个键(~100)映射到Sets最多约160,000个条目,这些条目在整个应用程序中增长.整个状态最多3GB,可以由群集中的每个节点处理.在每个批次中,一些数据被添加到一个状态,但直到过程结束时才被删除,即约15分钟.

在遵循应用程序UI时,与其他批次相比,每10个批次的处理时间非常长.看图像:

尖峰显示更高的处理时间.

黄色字段代表高处理时间.

在此输入图像描述

更详细的作业视图显示,在这些批次中发生在某一点,恰好是"跳过"所有20个分区.或者这就是UI所说的.

在此输入图像描述

我的理解skipped是每个状态分区是一个可能的任务,没有被执行,因为它不需要重新计算.但是,我不明白为什么skips每个工作的数量变化以及为什么最后的工作需要如此多的处理.无论状态大小如何,都会出现更高的处理时间,它只会影响持续时间.

这是mapWithState()功能中的错误还是这个预期的行为?底层数据结构是否需要某种重新洗牌,Set状态是否需要复制数据?或者它更可能是我的应用程序中的缺陷?

scala apache-spark spark-streaming

15
推荐指数
1
解决办法
3140
查看次数