为什么Scala和像Spark和Scalding这样的框架都有reduce和foldLeft?那么reduce和之间的区别是fold什么?
如何在单个作业中使用Spark写入依赖于键的多个输出.
相关:通过键Scalding Hadoop写入多个输出,一个MapReduce作业
例如
sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)
Run Code Online (Sandbox Code Playgroud)
确保cat prefix/1是
a
b
Run Code Online (Sandbox Code Playgroud)
并cat prefix/2会
c
Run Code Online (Sandbox Code Playgroud)
编辑:我最近添加了一个新的答案,其中包括完整的导入,皮条客和压缩编解码器,请参阅/sf/answers/3228263111/,除了之前的答案,这可能会有所帮助.
当执行shuffle时,我的Spark工作失败并说"设备上没有剩余空间",但是当我运行df -h它时说我有剩余空间!为什么会发生这种情况,我该如何解决?
如何使用scala 2.10中的新反射模型从scala中的case类中提取字段值?例如,使用下面的内容并没有提取字段方法
def getMethods[T:TypeTag](t:T) = typeOf[T].members.collect {
case m:MethodSymbol => m
}
Run Code Online (Sandbox Code Playgroud)
我打算把它们抽进去
for {field <- fields} {
currentMirror.reflect(caseClass).reflectField(field).get
}
Run Code Online (Sandbox Code Playgroud) 如何更改Apache Spark Shell的执行程序内存(和其他配置)?
特别是当我启动它时,我想给spark-shell提供单元,比如-Dspark-cores-max = 12,这样我在spark shell中的作业就会使用那些配置设置.
在我的Spark工作的洗牌阶段,我得到"太多打开的文件".为什么我的工作开了这么多文件?我可以采取哪些措施来使我的工作取得成功.
所以感谢我试过的易于谷歌的博客:
import org.specs2.mutable.Specification
class SparkEngineSpecs extends Specification {
sequential
def setLogLevels(level: Level, loggers: Seq[String]): Map[String, Level] = loggers.map(loggerName => {
val logger = Logger.getLogger(loggerName)
val prevLevel = logger.getLevel
logger.setLevel(level)
loggerName -> prevLevel
}).toMap
setLogLevels(Level.WARN, Seq("spark", "org.eclipse.jetty", "akka"))
val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("Test Spark Engine"))
// ... my unit tests
Run Code Online (Sandbox Code Playgroud)
但不幸的是它不起作用,我仍然得到很多火花输出,例如:
14/12/02 12:01:56 INFO MemoryStore: Block broadcast_4 of size 4184 dropped from memory (free 583461216)
14/12/02 12:01:56 INFO ContextCleaner: Cleaned broadcast 4
14/12/02 12:01:56 INFO ContextCleaner: Cleaned shuffle 4
14/12/02 …Run Code Online (Sandbox Code Playgroud) 当减少可以使用的分区数量时coalesce,这很好,因为它不会导致混乱并且似乎立即工作(不需要额外的工作阶段).
我有时会反其道而行之,但会repartition引发一场洗牌.我认为,在几个月前,我居然通过了这方面的工作CoalescedRDD有balanceSlack = 1.0-所以会发生什么是它将把一个分区,这样生成的磁盘分区的位置,所有的同一节点(这么小的净IO)上.
这种功能在Hadoop中是自动的,只需调整分割大小即可.除非减少分区数量,否则它似乎不会在Spark中以这种方式工作.我认为解决方案可能是编写一个自定义分区器以及我们定义的自定义RDD getPreferredLocations......但我认为这样做是如此简单和常见,确实必须有一个直接的方法来做到这一点?
事情尝试:
.set("spark.default.parallelism", partitions)在我的SparkConf,并且在阅读镶木地板的情况下,我已经尝试过sqlContext.sql("set spark.sql.shuffle.partitions= ...,在1.0.0上导致错误并且不是我想要的,我希望分区号在所有类型的工作中改变,而不仅仅是洗牌.
要广播一个变量,使得一个变量在一个集群上的每个节点的内存中只发生一次,就能做到:val myVarBroadcasted = sc.broadcast(myVar)然后在RDD转换中检索它,如下所示:
myRdd.map(blar => {
val myVarRetrieved = myVarBroadcasted.value
// some code that uses it
}
.someAction
Run Code Online (Sandbox Code Playgroud)
但是现在假设我希望用新的广播变量执行更多操作 - 如果由于旧的广播变量而没有足够的堆空间怎么办?我想要一个像这样的功能
myVarBroadcasted.remove()
Run Code Online (Sandbox Code Playgroud)
现在我似乎找不到这样做的方法.
另外,一个非常相关的问题:广播变量在哪里?它们会进入总内存的缓存分数,还是只进入堆分数?
我想在Apache Spark中读取具有以下结构的文件.
628344092\t20070220\t200702\t2007\t2007.1370
Run Code Online (Sandbox Code Playgroud)
分隔符是\ t.如何在使用spark.read.csv()时实现这一点?
csv太大而无法使用pandas,因为读取此文件需要很长时间.有没有一种方法与之相似
pandas.read_csv(file, sep = '\t')
Run Code Online (Sandbox Code Playgroud)
非常感谢!
apache-spark ×8
scala ×6
case-class ×1
csv ×1
fold ×1
hadoop ×1
hdfs ×1
log4j ×1
output ×1
pyspark ×1
reduce ×1
reflection ×1
scala-2.10 ×1
scalding ×1