我们正在开发Spark框架,其中我们将历史数据移动到RDD集合中.
基本上,RDD是我们进行操作的不可变的只读数据集.基于此,我们已将历史数据移至RDD,并在此类RDD上进行过滤/映射等计算.
现在有一个用例,RDD中的数据子集得到更新,我们必须重新计算这些值.
HistoricalData采用RDD的形式.我根据请求范围创建另一个RDD,并在ScopeCollection中保存该RDD的引用
到目前为止,我已经能够想到以下方法 -
方法1:广播变化:
方法2:为更新创建RDD
方法3:
我曾想过创建流RDD,我不断更新相同的RDD并进行重新计算.但据我所知,它可以从Flume或Kafka获取流.而在我的情况下,值是基于用户交互在应用程序本身中生成的.因此,我无法在上下文中看到流RDD的任何集成点.
关于哪种方法更好或任何其他适合此方案的方法的任何建议.
TIA!
我正在尝试使用Spark Kafka Direct Stream方法.它通过创建与kafka主题分区一样多的RDD分区来简化并行性,如本文档中所述.根据我的理解,spark将为每个RDD分区创建一个执行程序来进行计算.
因此,当我以纱线群集模式提交应用程序,并将选项num-executors指定为与分区数量不同的值时,将会有多少执行程序?
例如,有一个带有2个分区的kafka主题,我将num-executors指定为4:
export YARN_CONF_DIR=$HADOOP_HOME/client_conf
./bin/spark-submit \
--class playground.MainClass \
--master yarn-cluster \
--num-executors 4 \
../spark_applications/uber-spark-streaming-0.0.1-SNAPSHOT.jar \
127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095 topic_1
Run Code Online (Sandbox Code Playgroud)
我试一试,发现执行器的数量是4,每个执行器都会从kafka读取和处理数据.为什么?kafka主题中只有2个分区,4个执行程序如何从kafka主题中读取,该主题只有2个分区?
以下是spark应用程序和日志的详细信息.
我的spark应用程序,它从每个执行程序中的kafka 打印收到的消息(以flatMap方法):
...
String brokers = args[0];
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(args[1].split(",")));
kafkaParams.put("metadata.broker.list", brokers);
JavaPairInputDStream<String, String> messages =
KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
kafkaParams, topicsSet);
JavaPairDStream<String, Integer> wordCounts =
messages.flatMap(new FlatMapFunction<Tuple2<String, String>, String>()
{
public Iterable<String> call(Tuple2<String, String> tuple) throws Exception
{
System.out.println(String.format("[received from kafka] tuple_1 …Run Code Online (Sandbox Code Playgroud) 附上截图:
Config.scala
object Config {
val sparkConf = new SparkConf
val sc = new SparkContext(sparkConf)
val sqlContext = new HiveContext(sc)
val eventsS3Path = sc.hadoopConfiguration.get("eventsS3Path")
val useIAMInstanceRole = sc.hadoopConfiguration.getBoolean("useIAMInstanceRole",true)
val checkpointDirectory = sc.hadoopConfiguration.get("checkpointDirectory")
// sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
DateTimeZone.setDefault(DateTimeZone.forID("America/Los_Angeles"))
val numStreams = 2
def getSparkContext(): SparkContext = {
this.sc
}
def getSqlContext(): HiveContext = {
this.sqlContext
}
}
Run Code Online (Sandbox Code Playgroud)
S3Basin.scala
object S3Basin {
def main(args: Array[String]): Unit = {
Kinesis.startStreaming(s3basinFunction _)
}
def s3basinFunction(streams : DStream[Array[Byte]]): Unit ={ …Run Code Online (Sandbox Code Playgroud) amazon-s3 apache-spark amazon-kinesis spark-streaming checkpointing
我是Spark streaming和Scala的初学者.对于项目要求,我试图在github中运行存在的TwitterPopularTags示例.由于SBT组装不适合我,我不熟悉SBT,我正在尝试使用Maven进行构建.经过很多初步的打嗝,我能够创建jar文件.但在尝试执行它时,我收到以下错误.有人可以帮我解决这个问题吗?
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/twitter/TwitterUtils$
at TwitterPopularTags$.main(TwitterPopularTags.scala:43)
at TwitterPopularTags.main(TwitterPopularTags.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:331)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.twitter.TwitterUtils$
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 9 more
Run Code Online (Sandbox Code Playgroud)
我添加了以下依赖项Spark-streaming_2.10:1.1.0 Spark-core_2.10:1.1.0 Spark-streaming-twitter_2.10:1.1.0
我甚至尝试过1.2.0 for Spark-streaming-twitter,但这也给了我同样的错误.
我在这里先向您的帮助表示感谢.
问候,vpv
scala noclassdeffounderror maven apache-spark spark-streaming
我正在使用java 1.8.0_45和Kafka直接流在Yarn(Apache发行版2.6.0)上运行spark streaming 1.4.0.我也使用scala 2.11支持spark.
我看到的问题是驱动程序和执行程序容器都在逐渐增加物理内存使用量,直到纱线容器杀死它为止.我在驱动程序中配置了高达192M堆和384堆堆空间,但它最终耗尽了它
对于常规GC循环,堆内存似乎很好.在任何此类运行中都没有遇到OutOffMemory
事实上,我仍然没有在卡夫卡队列上产生任何流量.这是我正在使用的代码
object SimpleSparkStreaming extends App {
val conf = new SparkConf()
val ssc = new StreamingContext(conf,Seconds(conf.getLong("spark.batch.window.size",1L)));
ssc.checkpoint("checkpoint")
val topics = Set(conf.get("spark.kafka.topic.name"));
val kafkaParams = Map[String, String]("metadata.broker.list" -> conf.get("spark.kafka.broker.list"))
val kafkaStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, topics)
kafkaStream.foreachRDD(rdd => {
rdd.foreach(x => {
println(x._2)
})
})
kafkaStream.print()
ssc.start()
ssc.awaitTermination()
}
Run Code Online (Sandbox Code Playgroud)
我在CentOS 7上运行它.用于spark提交的命令如下
./bin/spark-submit --class com.rasa.cloud.prototype.spark.SimpleSparkStreaming \
--conf spark.yarn.executor.memoryOverhead=256 \
--conf spark.yarn.driver.memoryOverhead=384 \
--conf spark.kafka.topic.name=test \
--conf spark.kafka.broker.list=172.31.45.218:9092 \
--conf spark.batch.window.size=1 \
--conf spark.app.name="Simple Spark Kafka application" …Run Code Online (Sandbox Code Playgroud) apache-kafka hadoop-yarn apache-spark spark-streaming apache-spark-1.4
我有一个Spark 2.0数据帧,example具有以下结构:
id, hour, count
id1, 0, 12
id1, 1, 55
..
id1, 23, 44
id2, 0, 12
id2, 1, 89
..
id2, 23, 34
etc.
Run Code Online (Sandbox Code Playgroud)
它包含每个id的24个条目(一天中每小时一个),并使用orderBy函数按id,小时排序.
我创建了一个聚合器groupConcat:
def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable {
override def zero: String = ""
override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat)
override def merge(b1: String, b2: String) = b1 + b2
override def finish(b: String) = b.substring(1)
override …Run Code Online (Sandbox Code Playgroud) scala apache-spark spark-streaming apache-spark-sql spark-dataframe
为了说清楚,我不是从数组/列表中寻找RDD
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7); // sample
JavaRDD<Integer> rdd = new JavaSparkContext().parallelize(list);
Run Code Online (Sandbox Code Playgroud)
如何在没有在内存中完全缓冲的情况下从java迭代器创建spark RDD ?
Iterator<Integer> iterator = Arrays.asList(1, 2, 3, 4).iterator(); //sample iterator for illustration
JavaRDD<Integer> rdd = new JavaSparkContext().what("?", iterator); //the Question
Run Code Online (Sandbox Code Playgroud)
附加问题:
是否要求源可重新读取(或能够多次读取)以提供RDD的弹性?换句话说,由于迭代器基本上只读一次,是否有可能从迭代器创建弹性分布式数据集(RDD)?
是否可以将Spark RDD传递给Python?
因为我需要一个python库来对我的数据进行一些计算,但我的主要Spark项目是基于Scala的.有没有办法混合它们或让python访问相同的火花上下文?
我想在每次进入里面时创建一个新的 mongodb RDD foreachRDD.但是我有序列化问题:
mydstream
.foreachRDD(rdd => {
val mongoClient = MongoClient("localhost", 27017)
val db = mongoClient(mongoDatabase)
val coll = db(mongoCollection)
// ssc is my StreamingContext
val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) })
Run Code Online (Sandbox Code Playgroud)
这会给我一个错误:
object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@31133b6e)
Run Code Online (Sandbox Code Playgroud)
任何的想法?
我正在开发一个Scala(2.11)/ Spark(1.6.1)流式项目,mapWithState()用于跟踪以前批次中看到的数据.
状态分布在多个节点上的20个分区中,使用StateSpec.function(trackStateFunc _).numPartitions(20).在这种状态下,我们只有几个键(~100)映射到Sets最多约160,000个条目,这些条目在整个应用程序中增长.整个状态最多3GB,可以由群集中的每个节点处理.在每个批次中,一些数据被添加到一个状态,但直到过程结束时才被删除,即约15分钟.
在遵循应用程序UI时,与其他批次相比,每10个批次的处理时间非常长.看图像:
黄色字段代表高处理时间.
更详细的作业视图显示,在这些批次中发生在某一点,恰好是"跳过"所有20个分区.或者这就是UI所说的.
我的理解skipped是每个状态分区是一个可能的任务,没有被执行,因为它不需要重新计算.但是,我不明白为什么skips每个工作的数量变化以及为什么最后的工作需要如此多的处理.无论状态大小如何,都会出现更高的处理时间,它只会影响持续时间.
这是mapWithState()功能中的错误还是这个预期的行为?底层数据结构是否需要某种重新洗牌,Set状态是否需要复制数据?或者它更可能是我的应用程序中的缺陷?
apache-spark ×10
spark-streaming ×10
scala ×4
apache-kafka ×2
amazon-s3 ×1
hadoop-yarn ×1
maven ×1
mongodb ×1
pyspark ×1
python ×1
rdd ×1