标签: apache-spark

map和flatMap有什么区别,每个用例都是一个很好的用例?

有人可以向我解释一下map和flatMap之间的区别以及每个的好用例是什么?

什么"压扁结果"是什么意思?到底有什么好处呢?

apache-spark

237
推荐指数
10
解决办法
15万
查看次数

Spark中的DataFrame,Dataset和RDD之间的区别

我只是想知道Apache Spark中的RDDDataFrame (Spark 2.0.0 DataFrame只是一个类型别名Dataset[Row])有什么区别?

你能把一个转换成另一个吗?

apache-spark rdd apache-spark-sql apache-spark-dataset

228
推荐指数
10
解决办法
10万
查看次数

任务不可序列化:java.io.NotSerializableException仅在类而不是对象上调用闭包外的函数时

在闭包之外调用函数时会出现奇怪的行为:

  • 当函数在一个对象中时,一切正常
  • 当函数在类中时获取:

任务不可序列化:java.io.NotSerializableException:testing

问题是我需要在类中的代码而不是对象.知道为什么会这样吗?Scala对象是否已序列化(默认?)?

这是一个有效的代码示例:

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}
Run Code Online (Sandbox Code Playgroud)

这是一个非工作的例子:

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable wont help
class testing {  
  val list = List(1,2,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  {
    //again calling the fucntion someFunc 
    val after = rddList.map(someFunc(_))
    //this will crash (spark lazy)
    after.collect().map(println(_))
  }

  def someFunc(a:Int) …
Run Code Online (Sandbox Code Playgroud)

serialization scala typesafe apache-spark

211
推荐指数
6
解决办法
15万
查看次数

Spark java.lang.OutOfMemoryError:Java堆空间

我的集群:1个主服务器,11个从服务器,每个节点有6 GB内存.

我的设置:

spark.executor.memory=4g, Dspark.akka.frameSize=512
Run Code Online (Sandbox Code Playgroud)

这是问题所在:

首先,我从HDFS到RDD读取了一些数据(2.19 GB):

val imageBundleRDD = sc.newAPIHadoopFile(...)
Run Code Online (Sandbox Code Playgroud)

其次,在这个RDD上做点什么:

val res = imageBundleRDD.map(data => {
                               val desPoints = threeDReconstruction(data._2, bg)
                                 (data._1, desPoints)
                             })
Run Code Online (Sandbox Code Playgroud)

最后,输出到HDFS:

res.saveAsNewAPIHadoopFile(...)
Run Code Online (Sandbox Code Playgroud)

当我运行我的程序时,它显示:

.....
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL)
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL)
14/01/15 21:42:28 INFO …
Run Code Online (Sandbox Code Playgroud)

out-of-memory apache-spark

208
推荐指数
9
解决办法
21万
查看次数

Spark - repartition()vs coalesce()

根据Learning Spark的说法

请记住,重新分区数据是一项相当昂贵的操作.Spark还有一个优化版本的repartition(),称为coalesce(),它允许避免数据移动,但前提是你减少了RDD分区的数量.

我得到的一个区别是,使用repartition()可以增加/减少分区数量,但是使用coalesce()时,只能减少分区数量.

如果分区分布在多台机器上并运行coalesce(),它如何避免数据移动?

distributed-computing apache-spark rdd

208
推荐指数
13
解决办法
15万
查看次数

Spark Standalone集群中的工作者,执行者和核心是什么?

我阅读了Cluster Mode Overview,但仍然无法理解Spark Standalone集群中的不同进程和并行性.

工作者是否是JVM进程?我跑了bin\start-slave.sh,发现它产生了工人,实际上是一个JVM.

根据上面的链接,执行程序是为运行任务的工作节点上的应用程序启动的进程.Executor也是一个JVM.

这些是我的问题:

  1. 执行者是每个应用程序.那么工人的角色是什么?它是否与执行人协调并将结果传达给司机?或者司机是否与执行人直接对话?如果是这样,那么工人的目的是什么呢?

  2. 如何控制应用程序的执行程序数量?3.可以在执行程序内并行执行任务吗?如果是这样,如何配置执行程序的线程数?

  3. 工作者,执行者和执行者核心(--total-executor-cores)之间的关系是什么?

  4. 每个节点拥有更多工人意味着什么?

更新

让我们举个例子来更好地理解.

示例1: 具有5个工作节点的独立群集(每个节点具有8个核心)当我使用默认设置启动应用程序时.

示例2 与示例1相同的集群配置,但我运行具有以下设置的应用程序--executor-cores 10 --total-executor-cores 10.

示例3 与示例1相同的集群配置,但我运行具有以下设置的应用程序--executor-cores 10 --total-executor-cores 50.

示例4 与示例1相同的集群配置,但我运行具有以下设置的应用程序--executor-cores 50 --total-executor-cores 50.

示例5 与示例1相同的集群配置,但我运行具有以下设置的应用程序--executor-cores 50 --total-executor-cores 10.

在每个例子中,有多少执行者?每个执行程序有多少个线程?多少个核心?如何根据申请决定执行人数.它总是与工人数量相同吗?

distributed-computing apache-spark

198
推荐指数
2
解决办法
7万
查看次数

缓存和持久有什么区别?

RDD持久性方面,spark cache()persist()spark 之间有什么区别?

distributed-computing apache-spark rdd

197
推荐指数
6
解决办法
9万
查看次数

Apache Spark:核心数与执行者数量

我试图了解在YARN上运行Spark作业时内核数量和执行程序数量之间的关系.

测试环境如下:

  • 数据节点数:3
  • 数据节点机器规格:
    • CPU:Core i7-4790(核心数:4,线程数:8)
    • 内存:32GB(8GB x 4)
    • 硬盘:8TB(2TB x 4)
  • 网络:1Gb

  • Spark版本:1.0.0

  • Hadoop版本:2.4.0(Hortonworks HDP 2.1)

  • Spark作业流程:sc.textFile - > filter - > map - > filter - > mapToPair - > reduceByKey - > map - > saveAsTextFile

  • 输入数据

    • 类型:单个文本文件
    • 尺寸:165GB
    • 行数:454,568,833
  • 产量

    • 第二次过滤后的行数:310,640,717
    • 结果文件的行数:99,848,268
    • 结果文件的大小:41GB

该作业使用以下配置运行:

  1. --master yarn-client --executor-memory 19G --executor-cores 7 --num-executors 3 (每个数据节点的执行程序,使用尽可能多的核心)

  2. --master yarn-client --executor-memory 19G --executor-cores 4 --num-executors 3 (核心数量减少)

  3. --master yarn-client --executor-memory 4G --executor-cores 2 --num-executors 12 (少核心,更多执行者)

经过的时间:

  1. 50分15秒

  2. 55分48秒

  3. 31分23秒

令我惊讶的是,(3)更快. …

hadoop hadoop-yarn apache-spark

178
推荐指数
5
解决办法
8万
查看次数

如何将多个文本文件读入单个RDD?

我想从hdfs位置读取一堆文本文件,并使用spark在迭代中对其执行映射.

JavaRDD<String> records = ctx.textFile(args[1], 1); 能够一次只读取一个文件.

我想读取多个文件并将它们作为单个RDD处理.怎么样?

apache-spark

171
推荐指数
4
解决办法
14万
查看次数

如何停止在火花控制台上显示INFO消息?

我想停止火花壳上的各种消息.

我试图编辑该log4j.properties文件以阻止这些消息.

这是内容 log4j.properties

# Define the root logger with appender file
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
Run Code Online (Sandbox Code Playgroud)

但是消息仍在控制台上显示.

以下是一些示例消息

15/01/05 15:11:45 INFO SparkEnv: Registering BlockManagerMaster
15/01/05 15:11:45 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150105151145-b1ba
15/01/05 15:11:45 INFO MemoryStore: MemoryStore started with capacity 0.0 B.
15/01/05 15:11:45 INFO ConnectionManager: Bound socket to port 44728 with id = ConnectionManagerId(192.168.100.85,44728)
15/01/05 …
Run Code Online (Sandbox Code Playgroud)

log4j apache-spark spark-submit

168
推荐指数
12
解决办法
16万
查看次数