有人可以向我解释一下map和flatMap之间的区别以及每个的好用例是什么?
什么"压扁结果"是什么意思?到底有什么好处呢?
我只是想知道Apache Spark中的RDD和DataFrame (Spark 2.0.0 DataFrame只是一个类型别名Dataset[Row])有什么区别?
你能把一个转换成另一个吗?
在闭包之外调用函数时会出现奇怪的行为:
任务不可序列化:java.io.NotSerializableException:testing
问题是我需要在类中的代码而不是对象.知道为什么会这样吗?Scala对象是否已序列化(默认?)?
这是一个有效的代码示例:
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
Run Code Online (Sandbox Code Playgroud)
这是一个非工作的例子:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) …Run Code Online (Sandbox Code Playgroud) 我的集群:1个主服务器,11个从服务器,每个节点有6 GB内存.
我的设置:
spark.executor.memory=4g, Dspark.akka.frameSize=512
Run Code Online (Sandbox Code Playgroud)
这是问题所在:
首先,我从HDFS到RDD读取了一些数据(2.19 GB):
val imageBundleRDD = sc.newAPIHadoopFile(...)
Run Code Online (Sandbox Code Playgroud)
其次,在这个RDD上做点什么:
val res = imageBundleRDD.map(data => {
val desPoints = threeDReconstruction(data._2, bg)
(data._1, desPoints)
})
Run Code Online (Sandbox Code Playgroud)
最后,输出到HDFS:
res.saveAsNewAPIHadoopFile(...)
Run Code Online (Sandbox Code Playgroud)
当我运行我的程序时,它显示:
.....
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL)
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL)
14/01/15 21:42:28 INFO …Run Code Online (Sandbox Code Playgroud) 根据Learning Spark的说法
请记住,重新分区数据是一项相当昂贵的操作.Spark还有一个优化版本的repartition(),称为coalesce(),它允许避免数据移动,但前提是你减少了RDD分区的数量.
我得到的一个区别是,使用repartition()可以增加/减少分区数量,但是使用coalesce()时,只能减少分区数量.
如果分区分布在多台机器上并运行coalesce(),它如何避免数据移动?
我阅读了Cluster Mode Overview,但仍然无法理解Spark Standalone集群中的不同进程和并行性.
工作者是否是JVM进程?我跑了bin\start-slave.sh,发现它产生了工人,实际上是一个JVM.
根据上面的链接,执行程序是为运行任务的工作节点上的应用程序启动的进程.Executor也是一个JVM.
这些是我的问题:
执行者是每个应用程序.那么工人的角色是什么?它是否与执行人协调并将结果传达给司机?或者司机是否与执行人直接对话?如果是这样,那么工人的目的是什么呢?
如何控制应用程序的执行程序数量?3.可以在执行程序内并行执行任务吗?如果是这样,如何配置执行程序的线程数?
工作者,执行者和执行者核心(--total-executor-cores)之间的关系是什么?
每个节点拥有更多工人意味着什么?
更新
让我们举个例子来更好地理解.
示例1: 具有5个工作节点的独立群集(每个节点具有8个核心)当我使用默认设置启动应用程序时.
示例2 与示例1相同的集群配置,但我运行具有以下设置的应用程序--executor-cores 10 --total-executor-cores 10.
示例3 与示例1相同的集群配置,但我运行具有以下设置的应用程序--executor-cores 10 --total-executor-cores 50.
示例4 与示例1相同的集群配置,但我运行具有以下设置的应用程序--executor-cores 50 --total-executor-cores 50.
示例5 与示例1相同的集群配置,但我运行具有以下设置的应用程序--executor-cores 50 --total-executor-cores 10.
在每个例子中,有多少执行者?每个执行程序有多少个线程?多少个核心?如何根据申请决定执行人数.它总是与工人数量相同吗?
在RDD持久性方面,spark cache()和persist()spark 之间有什么区别?
我试图了解在YARN上运行Spark作业时内核数量和执行程序数量之间的关系.
测试环境如下:
网络:1Gb
Spark版本:1.0.0
Hadoop版本:2.4.0(Hortonworks HDP 2.1)
Spark作业流程:sc.textFile - > filter - > map - > filter - > mapToPair - > reduceByKey - > map - > saveAsTextFile
输入数据
产量
该作业使用以下配置运行:
--master yarn-client --executor-memory 19G --executor-cores 7 --num-executors 3 (每个数据节点的执行程序,使用尽可能多的核心)
--master yarn-client --executor-memory 19G --executor-cores 4 --num-executors 3 (核心数量减少)
--master yarn-client --executor-memory 4G --executor-cores 2 --num-executors 12 (少核心,更多执行者)
经过的时间:
50分15秒
55分48秒
31分23秒
令我惊讶的是,(3)更快. …
我想从hdfs位置读取一堆文本文件,并使用spark在迭代中对其执行映射.
JavaRDD<String> records = ctx.textFile(args[1], 1); 能够一次只读取一个文件.
我想读取多个文件并将它们作为单个RDD处理.怎么样?
我想停止火花壳上的各种消息.
我试图编辑该log4j.properties文件以阻止这些消息.
这是内容 log4j.properties
# Define the root logger with appender file
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
Run Code Online (Sandbox Code Playgroud)
但是消息仍在控制台上显示.
以下是一些示例消息
15/01/05 15:11:45 INFO SparkEnv: Registering BlockManagerMaster
15/01/05 15:11:45 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150105151145-b1ba
15/01/05 15:11:45 INFO MemoryStore: MemoryStore started with capacity 0.0 B.
15/01/05 15:11:45 INFO ConnectionManager: Bound socket to port 44728 with id = ConnectionManagerId(192.168.100.85,44728)
15/01/05 …Run Code Online (Sandbox Code Playgroud) apache-spark ×10
rdd ×3
hadoop ×1
hadoop-yarn ×1
log4j ×1
scala ×1
spark-submit ×1
typesafe ×1