我更喜欢Python而不是Scala.但是,由于Spark本身是用Scala编写的,因此我希望我的代码在Scala中的运行速度比Python版本快,原因很明显.
有了这个假设,我想学习和编写一些非常常见的预处理代码的Scala版本,用于1 GB的数据.数据来自Kaggle的SpringLeaf比赛.只是为了概述数据(它包含1936个维度和145232行).数据由各种类型组成,例如int,float,string,boolean.我使用8个核心中的6个用于Spark处理; 这就是我使用的原因minPartitions=6,每个核心都有一些东西需要处理.
Scala代码
val input = sc.textFile("train.csv", minPartitions=6)
val input2 = input.mapPartitionsWithIndex { (idx, iter) =>
if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"
def separateCols(line: String): Array[String] = {
val line2 = line.replaceAll("true", "1")
val line3 = line2.replaceAll("false", "0")
val vals: Array[String] = line3.split(",")
for((x,i) <- vals.view.zipWithIndex) {
vals(i) = "VAR_%04d".format(i) + delim1 + x
}
vals
}
val input3 = input2.flatMap(separateCols)
def toKeyVal(line: String): (String, String) = { …Run Code Online (Sandbox Code Playgroud) 假设我们有DataFrame,df包含以下列:
名称,姓氏,大小,宽度,长度,重量
现在我们想要执行几个操作,例如我们想要创建一些包含Size和Width数据的DataFrame.
val df1 = df.groupBy("surname").agg( sum("size") )
val df2 = df.groupBy("surname").agg( sum("width") )
Run Code Online (Sandbox Code Playgroud)
正如您所注意到的,其他列(如Length)不会在任何地方使用.Spark是否足够聪明,可以在洗牌阶段之前丢弃多余的列,还是随身携带?威尔跑:
val dfBasic = df.select("surname", "size", "width")
Run Code Online (Sandbox Code Playgroud)
在分组之前以某种方式影响性能?
performance dataframe apache-spark apache-spark-sql apache-spark-dataset
此链接和其他链接groupByKey告诉我,如果有大量密钥,则不应使用Spark ,因为 Spark 会打乱所有密钥。这同样适用于groupBy函数吗?或者这是不同的东西?
我问这个问题是因为我想做这个问题试图做的事情,但我有大量的钥匙。应该可以在不通过本地减少每个节点来打乱所有数据的情况下完成此操作,但我找不到 PySpark 的方法来执行此操作(坦率地说,我发现文档非常缺乏)。
本质上,我想做的是:
# Non-working pseudocode
df.groupBy("A").reduce(lambda x,y: if (x.TotalValue > y.TotalValue) x else y)
Run Code Online (Sandbox Code Playgroud)
然而,dataframe API 不提供“reduce”选项。我可能误解了 dataframe 到底想要实现什么。