让我帮助澄清一下深度洗牌以及Spark如何使用随机播放管理器.我报告了一些非常有用的资源:
https://trongkhoanguyenblog.wordpress.com/
https://0x0fff.com/spark-architecture-shuffle/
https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/4-shuffleDetails.md
读它们,我知道有不同的洗牌经理.我想关注其中两个:hash manager和sort manager(这是默认的管理器).
为了揭露我的问题,我想从一个非常普遍的转变开始:
val rdd = reduceByKey(_ + _)
Run Code Online (Sandbox Code Playgroud)
此转换导致映射端聚合,然后随机播放以将所有相同的密钥带入同一分区.
我的问题是:
Map-Side聚合是使用内部mapPartition转换实现的,因此使用组合器函数聚合所有相同的键,还是使用AppendOnlyMap或ExternalAppendOnlyMap?
如果AppendOnlyMap或者ExternalAppendOnlyMap地图用于聚合,它们是否也用于减少发生在 ResultTask?
这两种地图(AppendOnlyMap或ExternalAppendOnlyMap)的目的是什么?
是AppendOnlyMap或ExternalAppendOnlyMap所有洗牌经理,或只是从sortManager使用?
我读到之后AppendOnlyMap或ExternalAppendOnlyMap已经满了,都被泄漏到文件中,这个步骤到底是怎么回事?
使用Sort shuffle管理器,我们使用appendOnlyMap来聚合和组合分区记录,对吧?然后当执行内存填满时,我们开始排序映射,将其溢出到磁盘然后清理映射,我的问题是:溢出到磁盘和shuffle写入有什么区别?它们主要包括在本地文件系统上创建文件,但它们的处理方式不同,Shuffle写入记录,不会放入appendOnlyMap.
你能否深入解释当reduceByKey被执行时会发生什么,向我解释完成该任务所涉及的所有步骤?例如,地图边聚合,改组等所有步骤.
我越来越感到困惑spill to disk和shuffle write.使用默认的Sort shuffle管理器,我们使用appendOnlyMapfor聚合和组合分区记录,对吗?然后当执行内存填满时,我们开始排序地图,将其溢出到磁盘,然后清理地图以进行下一次泄漏(如果发生),我的问题是:
溢出到磁盘和shuffle写入有什么区别?它们主要包括在本地文件系统上创建文件以及记录.
承认是不同的,因此Spill记录被排序,因为它们通过地图传递,而不是随机写入记录,因为它们没有从地图传递.
谢谢.
乔治
Hy伙计们,
我正试图解决这个问题:
我有一个jsp页面,里面有一个用dispaytag库和其他类型的东西生成的表.
当我按下一个按钮时,我想生成一个ajax调用,只重新加载表而不是整个jsp页面,正确显示uptated表以及生成的标签pagebanner和pagelinks,增加和减少元素创建.
有没有解决这个问题的方案?
我正在尝试调整Spark应用程序,以减少总体执行时间,但我在Stage执行期间遇到了一个奇怪的行为.
基本上只有14/120任务需要大约20分钟完成,其他任务需要4或5分钟才能完成.
看一下Spark UI,分区似乎很好,我看到的唯一区别是14个任务的GC时间非常高.
我附上了情况的图像.
我在编写一个名为head的函数时遇到了问题,它基本上用另一个函数替换头元素,用于调用它的List:
List(1,2,3,4).head(4) // List(4,2,3,4)
Run Code Online (Sandbox Code Playgroud)
代码显然没用,我只是想和Scala一起玩.这是代码:
sealed trait List[+A]{
def tail():List[A]
def head[A](x:A):List[A]
}
object Nil extends List[Nothing]{
def tail() = throw new Exception("Nil couldn't has tail")
def head[A](x:A): List[A] = List(x)
}
case class Cons[+A](x :A, xs: List[A]) extends List[A]{
def tail():List[A] = xs
def head[A](a:A): List[A] = Cons(a,xs)
}
object List{
def apply[A](as:A*):List[A] = {
if (as.isEmpty) Nil
else Cons(as.head,apply(as.tail: _*))
}
}
Cons(1,Cons(2,Nil)) == List(1,2)
Cons(1,Cons(2,Cons(3,Cons(4,Nil)))).tail()
List(1,2,3,4,5,6,7).tail()
List(1,2,3,4).head(4)
Run Code Online (Sandbox Code Playgroud)
它没有编译,我有这个错误:
Error:(11, 39) type mismatch;
found : A$A318.this.List[A(in …Run Code Online (Sandbox Code Playgroud) 我将尝试使用称为URM的特定log4j记录器进行记录,该记录器将文件登录到scala类中,并且我还需要在随播对象中使用相同的记录器:
类
class MyClass{
...
val logger = Logger.getLogger("URMLogger")
logger.info("message log")
....
}
Run Code Online (Sandbox Code Playgroud)
伴随对象
object MyClass{
...
logger.info("message log")
...
}
Run Code Online (Sandbox Code Playgroud)
Log4j.properties
...
log4j.logger.URMLogger=DEBUG,URM
log4j.appender.URM=org.apache.log4j.FileAppender
log4j.appender.URM.File=target/URM.log
log4j.appender.URM.layout=org.apache.log4j.PatternLayout
log4j.appender.URM.layout.ConversionPattern=%d %p %t [%c] - %m%n
...
Run Code Online (Sandbox Code Playgroud)
如何将记录器初始化一次并在两个e类对象伴侣中使用它?
apache-spark ×3
rdd ×3
scala ×2
shuffle ×2
ajax ×1
displaytag ×1
javascript ×1
jquery ×1
log4j ×1
logging ×1
partitioning ×1
performance ×1
struts2 ×1