我想使用累加器来收集有关我在Spark作业上操作的数据的一些统计信息.理想情况下,我会在作业计算所需的转换时执行此操作,但由于Spark会在不同情况下重新计算任务,因此累加器不会反映真实的指标.以下是文档描述的方式:
对于仅在操作内执行的累加器更新,Spark保证每个任务对累加器的更新仅应用一次,即重新启动的任务不会更新该值.在转换中,用户应该知道,如果重新执行任务或作业阶段,则可以多次应用每个任务的更新.
这很令人困惑,因为大多数操作都不允许运行自定义代码(可以使用累加器),它们主要采用先前转换的结果(懒惰).文档还显示了这一点:
val acc = sc.accumulator(0)
data.map(x => acc += x; f(x))
// Here, acc is still 0 because no actions have cause the `map` to be computed.
Run Code Online (Sandbox Code Playgroud)
但是,如果我们data.count()在最后添加,这将保证是正确的(没有重复)或不是吗?显然acc,不使用"仅内部动作",因为地图是一种转变.所以不应该保证.
另一方面,关于相关Jira门票的讨论谈论"结果任务"而不是"行动".例如这里和这里.这似乎表明结果确实可以保证是正确的,因为我们在acc之前和行动之前使用,因此应该作为单个阶段计算.
我猜这个"结果任务"的概念与所涉及的操作类型有关,是包含一个动作的最后一个,就像在这个例子中一样,它显示了几个操作如何分成几个阶段(洋红色,从这里拍摄的图像):

所以假设,count()在该链的末尾的一个动作将是同一个最后阶段的一部分,我将保证在最后一个地图上使用的累加器不会包含任何重复项?
澄清这个问题会很棒!谢谢.
我一直在看Spark的文档,它提到了这个:
Spark的API在很大程度上依赖于在驱动程序中传递函数以在集群上运行.有两种建议的方法可以做到这一点:
匿名函数语法,可用于短代码.全局单例对象中的静态方法.例如,您可以定义对象MyFunctions,然后传递MyFunctions.func1,如下所示:
object MyFunctions { def func1(s: String): String = { ... } }
myRdd.map(MyFunctions.func1)
Run Code Online (Sandbox Code Playgroud)
请注意,虽然也可以将引用传递给类实例中的方法(而不是单例对象),但这需要发送包含该类的对象以及方法.例如,考虑:
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
Run Code Online (Sandbox Code Playgroud)
在这里,如果我们创建一个新的MyClass并在其上调用doStuff,那里的map会引用该MyClass实例的func1方法,因此需要将整个对象发送到集群.它类似于写作
rdd.map(x => this.func1(x)).
现在我的疑问是,如果你对单例对象(它应该等同于静态)有属性会发生什么.相同的例子有一个小的改动:
object MyClass {
val value = 1
def func1(s: String): String = { s + value }
}
myRdd.map(MyClass.func1)
Run Code Online (Sandbox Code Playgroud)
所以该函数仍然是静态引用的,但是Spark尝试序列化所有引用的变量会走多远?它会序列化value还是会在远程工作者中再次初始化?
另外,这一切都在我在单例对象中有一些重型模型的上下文中,我想找到将它们序列化到工作者的正确方法,同时保持从单独的单例引用它们的能力,而不是将它们传递给跨深度函数调用堆栈的函数参数.
有关Spark序列化事物的内容/方式/时间的任何深入信息将不胜感激.
我正在使用Spark 1.3对大量数据进行聚合.这项工作包括4个步骤:
我用这个配置运行它:
--name geo-extract-$1-askTimeout \
--executor-cores 8 \
--num-executors 100 \
--executor-memory 40g \
--driver-memory 4g \
--driver-cores 8 \
--conf 'spark.storage.memoryFraction=0.25' \
--conf 'spark.shuffle.memoryFraction=0.35' \
--conf 'spark.kryoserializer.buffer.max.mb=1024' \
--conf 'spark.akka.frameSize=1024' \
--conf 'spark.akka.timeout=200' \
--conf 'spark.akka.askTimeout=111' \
--master yarn-cluster \
Run Code Online (Sandbox Code Playgroud)
并收到此错误:
org.apache.spark.SparkException: Error communicating with MapOutputTracker
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:117)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:164)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
...
Caused by: org.apache.spark.SparkException: Error sending message [message = GetMapOutputStatuses(0)]
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113)
... 21 more
Caused by: java.util.concurrent.TimeoutException: Futures …Run Code Online (Sandbox Code Playgroud) 在尝试相对于GWT中的另一个元素设置弹出窗口时,我看到了一种非常奇怪的行为.看起来设置弹出位置(一个独立的浮动元素)会改变我从getAbsoluteRight()或getAbsoluteLeft()这样的调用获得的答案,这个元素在页面上是静态的,并且不会在视觉上移动.
我添加了一些print语句来检查发生了什么,所以这里是代码:
System.out.println(item.td);
int position = item.td.getAbsoluteRight()-offsetWidth;
System.out.println("left/right:" + item.td.getAbsoluteLeft() + "/" + item.td.getAbsoluteRight() + ". sent:" + (item.td.getAbsoluteRight() - offsetWidth) + "=" + position);
popup.setPopupPosition(position, item.td.getAbsoluteBottom());
System.out.println("left/right:" + item.td.getAbsoluteLeft() + "/" + item.td.getAbsoluteRight() + ". sent:" + (item.td.getAbsoluteRight() - offsetWidth) + "=" + position);
popup.addStyleName("bigger");
System.out.println("left/right:" + item.td.getAbsoluteLeft() + "/" + item.td.getAbsoluteRight() + ". sent:" + (item.td.getAbsoluteRight() - offsetWidth) + "=" + position);
System.out.println(item.td);
Run Code Online (Sandbox Code Playgroud)
以下是Chrome上的结果
Menu displayed, widths: 81/340=340
<td class="hover">Daniel?</td>
left/right:1104/1185. sent:845=845
left/right:1121/1202. sent:862=845
left/right:1121/1202. sent:862=845
<td class="hover">Daniel?</td> …Run Code Online (Sandbox Code Playgroud) 我正在运行Spark作业来聚合数据.我有一个名为Profile的自定义数据结构,它基本上包含一个mutable.HashMap[Zone, Double].我想使用以下代码合并共享给定密钥(UUID)的所有配置文件:
def merge = (up1: Profile, up2: Profile) => { up1.addWeights(up2); up1}
val aggregated = dailyProfiles
.aggregateByKey(new Profile(), 3200)(merge, merge).cache()
Run Code Online (Sandbox Code Playgroud)
奇怪的是,Spark失败并出现以下错误:
org.apache.spark.SparkException:作业因阶段失败而中止:116318任务的序列化结果总大小(1024.0 MB)大于spark.driver.maxResultSize(1024.0 MB)
显而易见的解决方案是增加"spark.driver.maxResultSize",但有两件事让我困惑.
take()或者collect()),但我没有把任何东西带到驱动程序,只是从HDFS读取,聚合,保存回HDFS.有谁知道我为什么会收到这个错误?
我已经看到了一些scala代码示例,其中多行代码被用作没有花括号的代码块,例如:
x match {
case a:Int =>
val b = 1
val c = b +3
println("hello!")
c
case _ => 5
}
Run Code Online (Sandbox Code Playgroud)
与使用表单的隐式参数的一些非常长的函数相同:
a.map { implicit x =>
// many, many complex lines of code
}
Run Code Online (Sandbox Code Playgroud)
而不是:
a.map { implicit x => {
// many, many complex lines of code
}}
Run Code Online (Sandbox Code Playgroud)
我已经看过很多文档/ FAQ,说明多行代码应该总是用大括号包围,但是找不到这些异常的解释.我很想了解或有一个良好的直觉,所以对我来说不会感觉像魔术.
我有一个折线图,我想重新排序因子在图例中出现的方式.我试过了,scale_fill_discrete但它没有改变顺序.这是我的问题的模拟:
df <- data.frame(var1=c("F", "F", "F", "B", "B", "B"),
var2=c("levelB", "levelC", "levelA"),
value=c("2.487585", "2.535944", "3.444764", "2.917308", "2.954155","3.739049"))
p <- ggplot(data=df, aes(x=var1, y=value,
group=var2, colour=var2, shape = var2)) +
geom_line(size = 0.8) +
geom_point()+
xlab("var1") + ylab("Value") +
scale_x_discrete(limits=c("F","B")) +
theme(legend.title = element_text(size=12)) +
theme(legend.text = element_text(size=10)) +
scale_fill_discrete(breaks=c("levelB","levelC","levelA")) +
theme(title = element_text(size=12)) +
blank + scale_color_manual(values=c("green2", "red", "black")) +
theme(legend.key = element_blank())
p
Run Code Online (Sandbox Code Playgroud)
这创造了这个:
我想一切都保持完全一致,除了传奇,在这里我想的顺序改变levelB,然后levelC再levelA.我猜ggplot2是按字母顺序命令传说,我想覆盖它.重新排序我的数据框架不起作用,scale_fill_discrete也不会改变它.有任何想法吗?
谢谢!
为什么下面的方法()在等效函数返回布尔值(如预期)时返回单位值(即)?
// aMethod(1) returns ()
def aMethod(a: Int) { true }
// aFunction(1) returns true
val aFunction = (a: Int) => true
Run Code Online (Sandbox Code Playgroud) 我(使用java slick2D库特别,但可能不会是这个问题的一个因素),学习OpenGL的2D.我发现我可以改变推动一个新的变换矩阵,例如将世界坐标转换为屏幕(视图)坐标.
我想使用它来缩放我的视图,增加对象之间的距离,但是一旦完成,我想在屏幕坐标中绘制图像和形状,如HUD.基本上,我希望坐标改变,但图形保持不变,所以我可以在每个比例上渲染自定义图像.
这可以通过一些聪明的转换或一些OpenGL选项来完成,而不是手动计算所有坐标吗?
scala ×5
apache-spark ×3
css-position ×1
ggplot2 ×1
gwt ×1
legend ×1
methods ×1
opengl ×1
r ×1
scale ×1