小编Dan*_*don的帖子

累加器何时真正可靠?

我想使用累加器来收集有关我在Spark作业上操作的数据的一些统计信息.理想情况下,我会在作业计算所需的转换时执行此操作,但由于Spark会在不同情况下重新计算任务,因此累加器不会反映真实的指标.以下是文档描述的方式:

对于仅在操作内执行的累加器更新,Spark保证每个任务对累加器的更新仅应用一次,即重新启动的任务不会更新该值.在转换中,用户应该知道,如果重新执行任务或作业阶段,则可以多次应用每个任务的更新.

这很令人困惑,因为大多数操作都不允许运行自定义代码(可以使用累加器),它们主要采用先前转换的结果(懒惰).文档还显示了这一点:

val acc = sc.accumulator(0)
data.map(x => acc += x; f(x))
// Here, acc is still 0 because no actions have cause the `map` to be computed.
Run Code Online (Sandbox Code Playgroud)

但是,如果我们data.count()在最后添加,这将保证是正确的(没有重复)或不是吗?显然acc,不使用"仅内部动作",因为地图是一种转变.所以不应该保证.

另一方面,关于相关Jira门票的讨论谈论"结果任务"而不是"行动".例如这里这里.这似乎表明结果确实可以保证是正确的,因为我们在acc之前和行动之前使用,因此应该作为单个阶段计算.

我猜这个"结果任务"的概念与所涉及的操作类型有关,是包含一个动作的最后一个,就像在这个例子中一样,它显示了几个操作如何分成几个阶段(洋红色,从这里拍摄的图像):

将多个操作划分为多个紫色阶段的工作

所以假设,count()在该链的末尾的一个动作将是同一个最后阶段的一部分,我将保证在最后一个地图上使用的累加器不会包含任何重复项?

澄清这个问题会很棒!谢谢.

apache-spark

47
推荐指数
2
解决办法
1万
查看次数

在Spark中,在所有工作者上拥有静态对象的正确方法是什么?

我一直在看Spark的文档,它提到了这个:

Spark的API在很大程度上依赖于在驱动程序中传递函数以在集群上运行.有两种建议的方法可以做到这一点:

匿名函数语法,可用于短代码.全局单例对象中的静态方法.例如,您可以定义对象MyFunctions,然后传递MyFunctions.func1,如下所示:

object MyFunctions {   def func1(s: String): String = { ... } }

myRdd.map(MyFunctions.func1) 
Run Code Online (Sandbox Code Playgroud)

请注意,虽然也可以将引用传递给类实例中的方法(而不是单例对象),但这需要发送包含该类的对象以及方法.例如,考虑:

class MyClass {   
  def func1(s: String): String = { ... }   
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) } 
} 
Run Code Online (Sandbox Code Playgroud)

在这里,如果我们创建一个新的MyClass并在其上调用doStuff,那里的map会引用该MyClass实例的func1方法,因此需要将整个对象发送到集群.它类似于写作 rdd.map(x => this.func1(x)).

现在我的疑问是,如果你对单例对象(它应该等同于静态)有属性会发生什么.相同的例子有一个小的改动:

object MyClass {   
  val value = 1   
  def func1(s: String): String = { s + value }   
} 

myRdd.map(MyClass.func1) 
Run Code Online (Sandbox Code Playgroud)

所以该函数仍然是静态引用的,但是Spark尝试序列化所有引用的变量会走多远?它会序列化value还是会在远程工作者中再次初始化?

另外,这一切都在我在单例对象中有一些重型模型的上下文中,我想找到将它们序列化到工作者的正确方法,同时保持从单独的单例引用它们的能力,而不是将它们传递给跨深度函数调用堆栈的函数参数.

有关Spark序列化事物的内容/方式/时间的任何深入信息将不胜感激.

scala apache-spark

17
推荐指数
1
解决办法
9923
查看次数

为什么Spark尝试发送GetMapOutputStatuses时报告"与MapOutputTracker通信时出错"?

我正在使用Spark 1.3对大量数据进行聚合.这项工作包括4个步骤:

  1. 读取一个大的(1TB)序列文件(对应1天的数据)
  2. 过滤掉大部分内容并获得大约1GB的随机写入
  3. keyBy客户
  4. aggregateByKey()到为该客户构建配置文件的自定义结构,对应于每个客户的HashMap [Long,Float].Long键是唯一的,永远不会超过50K不同的条目.

我用这个配置运行它:

--name geo-extract-$1-askTimeout \
--executor-cores 8 \
--num-executors 100 \
--executor-memory 40g \
--driver-memory 4g \
--driver-cores 8 \
--conf 'spark.storage.memoryFraction=0.25' \
--conf 'spark.shuffle.memoryFraction=0.35' \
--conf 'spark.kryoserializer.buffer.max.mb=1024' \
--conf 'spark.akka.frameSize=1024' \
--conf 'spark.akka.timeout=200' \
--conf 'spark.akka.askTimeout=111' \
--master yarn-cluster \
Run Code Online (Sandbox Code Playgroud)

并收到此错误:

    org.apache.spark.SparkException: Error communicating with MapOutputTracker
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:117)
        at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:164)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
        ...
    Caused by: org.apache.spark.SparkException: Error sending message [message = GetMapOutputStatuses(0)]
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113)
        ... 21 more
    Caused by: java.util.concurrent.TimeoutException: Futures …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark-1.3

16
推荐指数
1
解决办法
4606
查看次数

gwt firefox getAbsoluteLeft()

在尝试相对于GWT中的另一个元素设置弹出窗口时,我看到了一种非常奇怪的行为.看起来设置弹出位置(一个独立的浮动元素)会改变我从getAbsoluteRight()或getAbsoluteLeft()这样的调用获得的答案,这个元素在页面上是静态的,并且不会在视觉上移动.

我添加了一些print语句来检查发生了什么,所以这里是代码:

System.out.println(item.td);
int position = item.td.getAbsoluteRight()-offsetWidth;
System.out.println("left/right:" + item.td.getAbsoluteLeft() + "/" + item.td.getAbsoluteRight() + ". sent:" + (item.td.getAbsoluteRight() - offsetWidth) + "=" + position);
popup.setPopupPosition(position, item.td.getAbsoluteBottom());
System.out.println("left/right:" + item.td.getAbsoluteLeft() + "/" + item.td.getAbsoluteRight() + ". sent:" + (item.td.getAbsoluteRight() - offsetWidth) + "=" + position);
popup.addStyleName("bigger");
System.out.println("left/right:" + item.td.getAbsoluteLeft() + "/" + item.td.getAbsoluteRight() + ". sent:" + (item.td.getAbsoluteRight() - offsetWidth) + "=" + position);
System.out.println(item.td);
Run Code Online (Sandbox Code Playgroud)

以下是Chrome上的结果

Menu displayed, widths: 81/340=340
<td class="hover">Daniel?</td>    
left/right:1104/1185. sent:845=845
left/right:1121/1202. sent:862=845
left/right:1121/1202. sent:862=845
<td class="hover">Daniel?</td> …
Run Code Online (Sandbox Code Playgroud)

gwt css-position

11
推荐指数
1
解决办法
565
查看次数

'spark.driver.maxResultSize'的范围

我正在运行Spark作业来聚合数据.我有一个名为Profile的自定义数据结构,它基本上包含一个mutable.HashMap[Zone, Double].我想使用以下代码合并共享给定密钥(UUID)的所有配置文件:

def merge = (up1: Profile, up2: Profile) => { up1.addWeights(up2); up1}
val aggregated = dailyProfiles
  .aggregateByKey(new Profile(), 3200)(merge, merge).cache()
Run Code Online (Sandbox Code Playgroud)

奇怪的是,Spark失败并出现以下错误:

org.apache.spark.SparkException:作业因阶段失败而中止:116318任务的序列化结果总大小(1024.0 MB)大于spark.driver.maxResultSize(1024.0 MB)

显而易见的解决方案是增加"spark.driver.maxResultSize",但有两件事让我困惑.

  1. 我得到的1024.0大于1024.0太巧合了
  2. 我发现谷歌搜索此特定错误和配置参数的所有文档和帮助表明它会影响将值带回驱动程序的函数.(说take()或者collect()),但我没有把任何东西带到驱动程序,只是从HDFS读取,聚合,保存回HDFS.

有谁知道我为什么会收到这个错误?

scala apache-spark apache-spark-1.3

8
推荐指数
1
解决办法
2158
查看次数

在scala中省略多个行的花括号

我已经看到了一些scala代码示例,其中多行代码被用作没有花括号的代码块,例如:

x match {
  case a:Int =>
    val b = 1
    val c = b +3
    println("hello!")
    c
  case _ => 5
}
Run Code Online (Sandbox Code Playgroud)

与使用表单的隐式参数的一些非常长的函数相同:

a.map { implicit x =>
  // many, many complex lines of code
}
Run Code Online (Sandbox Code Playgroud)

而不是:

a.map { implicit x => {
  // many, many complex lines of code
}}
Run Code Online (Sandbox Code Playgroud)

我已经看过很多文档/ FAQ,说明多行代码应该总是用大括号包围,但是找不到这些异常的解释.我很想了解或有一个良好的直觉,所以对我来说不会感觉像魔术.

scala

7
推荐指数
1
解决办法
1408
查看次数

ggplot2:更改图例中的因子顺序

我有一个折线图,我想重新排序因子在图例中出现的方式.我试过了,scale_fill_discrete但它没有改变顺序.这是我的问题的模拟:

df <- data.frame(var1=c("F", "F", "F", "B", "B", "B"),
                 var2=c("levelB", "levelC", "levelA"),
                 value=c("2.487585", "2.535944", "3.444764", "2.917308", "2.954155","3.739049"))

p <- ggplot(data=df, aes(x=var1, y=value, 
  group=var2, colour=var2, shape = var2)) +
  geom_line(size = 0.8) +
  geom_point()+
  xlab("var1") + ylab("Value") +
  scale_x_discrete(limits=c("F","B")) + 
  theme(legend.title = element_text(size=12)) + 
  theme(legend.text = element_text(size=10)) +
  scale_fill_discrete(breaks=c("levelB","levelC","levelA")) +
  theme(title = element_text(size=12)) +
  blank + scale_color_manual(values=c("green2", "red", "black")) +
  theme(legend.key = element_blank())
p
Run Code Online (Sandbox Code Playgroud)

这创造了这个:

在此输入图像描述

我想一切都保持完全一致,除了传奇,在这里我想的顺序改变levelB,然后levelClevelA.我猜ggplot2是按字母顺序命令传说,我想覆盖它.重新排序我的数据框架不起作用,scale_fill_discrete也不会改变它.有任何想法吗?

谢谢!

r legend ggplot2

3
推荐指数
1
解决办法
2326
查看次数

方法与函数:意外的单位返回值

为什么下面的方法()在等效函数返回布尔值(如预期)时返回单位值(即)?

// aMethod(1) returns ()
def aMethod(a: Int) { true }

// aFunction(1) returns true
val aFunction = (a: Int) => true
Run Code Online (Sandbox Code Playgroud)

methods scala

2
推荐指数
2
解决办法
322
查看次数

放大而不缩放图形

我(使用java slick2D库特别,但可能不会是这个问题的一个因素),学习OpenGL的2D.我发现我可以改变推动一个新的变换矩阵,例如将世界坐标转换为屏幕(视图)坐标.

我想使用它来缩放我的视图,增加对象之间的距离,但是一旦完成,我想在屏幕坐标中绘制图像和形状,如HUD.基本上,我希望坐标改变,但图形保持不变,所以我可以在每个比例上渲染自定义图像.

这可以通过一些聪明的转换或一些OpenGL选项来完成,而不是手动计算所有坐标吗?

opengl transformation scale

0
推荐指数
1
解决办法
857
查看次数