Mri*_*jay 166 performance scala apache-spark rdd pyspark
我更喜欢Python而不是Scala.但是,由于Spark本身是用Scala编写的,因此我希望我的代码在Scala中的运行速度比Python版本快,原因很明显.
有了这个假设,我想学习和编写一些非常常见的预处理代码的Scala版本,用于1 GB的数据.数据来自Kaggle的SpringLeaf比赛.只是为了概述数据(它包含1936个维度和145232行).数据由各种类型组成,例如int,float,string,boolean.我使用8个核心中的6个用于Spark处理; 这就是我使用的原因minPartitions=6,每个核心都有一些东西需要处理.
Scala代码
val input = sc.textFile("train.csv", minPartitions=6)
val input2 = input.mapPartitionsWithIndex { (idx, iter) =>
if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"
def separateCols(line: String): Array[String] = {
val line2 = line.replaceAll("true", "1")
val line3 = line2.replaceAll("false", "0")
val vals: Array[String] = line3.split(",")
for((x,i) <- vals.view.zipWithIndex) {
vals(i) = "VAR_%04d".format(i) + delim1 + x
}
vals
}
val input3 = input2.flatMap(separateCols)
def toKeyVal(line: String): (String, String) = {
val vals = line.split(delim1)
(vals(0), vals(1))
}
val input4 = input3.map(toKeyVal)
def valsConcat(val1: String, val2: String): String = {
val1 + "," + val2
}
val input5 = input4.reduceByKey(valsConcat)
input5.saveAsTextFile("output")
Run Code Online (Sandbox Code Playgroud)
Python代码
input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'
def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr
input2 = input.mapPartitionsWithIndex(drop_first_line)
def separate_cols(line):
line = line.replace('true', '1').replace('false', '0')
vals = line.split(',')
vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
for e, val in enumerate(vals)]
return vals2
input3 = input2.flatMap(separate_cols)
def to_key_val(kv):
key, val = kv.split(DELIM_1)
return (key, val)
input4 = input3.map(to_key_val)
def vals_concat(v1, v2):
return v1 + ',' + v2
input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')
Run Code Online (Sandbox Code Playgroud)
Python Performance
Stage 0(11分钟),第1阶段(7秒)

两者都产生不同的DAG可视化图形(由于这两个图像显示Scala(map)和Python(reduceByKey)的不同阶段0函数
但是,基本上两个代码都试图将数据转换为(dimension_id,值列表的字符串)RDD并保存到磁盘.输出将用于计算每个维度的各种统计信息.
性能方面,像这样的真实数据的Scala代码似乎比Python版本慢4倍.对我来说好消息是它给了我留住Python的良好动力.坏消息是我不太明白为什么?
zer*_*323 328
讨论代码的原始答案可以在下面找到.
首先,您必须区分不同类型的API,每种API都有自己的性能考虑因素.
(基于JVM的编排的纯Python结构)
这是受Python代码性能和PySpark实现细节影响最大的组件.虽然Python性能不太可能成为问题,但至少有几个因素需要考虑:
基于进程的执行程序(Python)与基于线程(单个JVM多线程)执行程序(Scala).每个Python执行器都在自己的进程中运行.作为一个副作用,它提供了比JVM对应物更强的隔离能力以及对执行程序生命周期的一些控制,但可能显着提高内存使用率:
Python代码本身的性能.一般来说,Scala比Python快,但它会因任务而异.此外,您有多个选项,包括像即时编译器Numba,C扩展(用Cython)或专业图书馆像Theano.最后,如果您不使用ML/MLlib(或简称NumPy堆栈),请考虑使用PyPy作为替代解释器.见SPARK-3094.
spark.python.worker.reuse选项,可用于在为每个任务分配Python进程和重用现有进程之间进行选择.后一种选择似乎有助于避免昂贵的垃圾收集(它比系统测试的结果更令人印象),而前一种(默认)对于昂贵的广播和进口是最佳的.(混合Python和JVM执行)
基本考虑因素与之前的一些其他问题几乎相同.虽然与MLlib一起使用的基本结构是普通的Python RDD对象,但所有算法都是使用Scala直接执行的.
这意味着将Python对象转换为Scala对象的额外成本以及相反的方式,增加的内存使用量以及我们稍后将介绍的一些其他限制.
截至目前(Spark 2.x),基于RDD的API处于维护模式,计划在Spark 3.0中删除.
(使用Python代码执行JVM仅限于驱动程序)
这些可能是标准数据处理任务的最佳选择.由于Python代码主要限于驱动程序上的高级逻辑操作,因此Python和Scala之间应该没有性能差异.
一个例外是使用行式Python UDF,它们的效率明显低于它们的Scala等价物.虽然有一些改进的机会(Spark 2.0.0已有大量开发),但最大的限制是内部表示(JVM)和Python解释器之间的完全往返.如果可能的话,你应该支持内置表达式的组合(例如 .Spin 2.0.0中的Python UDF行为已得到改进,但与本机执行相比,它仍然不是最理想的.将来可能会通过引入矢量化UDF来改进(SPARK-21190).
另外一定要避免在DataFrames和之间传递不必要的数据RDDs.这需要昂贵的序列化和反序列化,更不用说与Python解释器之间的数据传输.
值得注意的是Py4J调用具有相当高的延迟.这包括简单的调用,如:
from pyspark.sql.functions import col
col("foo")
Run Code Online (Sandbox Code Playgroud)
通常,它应该无关紧要(开销是恒定的并且不依赖于数据量)但是在软实时应用程序的情况下,您可以考虑缓存/重用Java包装器.
至于现在(Spark 1.6 2.1),没有人提供PySpark API,所以你可以说PySpark比Scala更糟糕.
在实践中,GraphX开发几乎完全停止,并且项目当前处于维护模式,相关的JIRA票据已关闭,因为无法修复.GraphFrames库提供了一个带有Python绑定的替代图形处理库.
数据集主观上说,Datasets在Python中静态输入的地方并不多,即使当前的Scala实现过于简单,也没有提供与之相同的性能优势DataFrame.
从我到目前为止看到的,我强烈建议使用Scala而不是Python.如果PySpark获得对结构化流的支持,它可能会在未来发生变化,但现在Scala API似乎更加强大,全面和高效.我的经历非常有限.
Spark 2.x中的结构化流媒体似乎缩小了语言之间的差距,但目前它还处于早期阶段.尽管如此,基于RDD的API在Databricks文档(访问日期2017-03-03)中已被称为"遗留流",因此期望进一步统一工作是合理的.
并非所有Spark功能都通过PySpark API公开.务必检查您所需的部件是否已经实施,并尝试了解可能的限制.
使用MLlib和类似的混合上下文时尤其重要(请参阅从任务调用Java/Scala函数).公平地说,PySpark API的某些部分mllib.linalg提供了比Scala更全面的方法.
PySpark API密切反映了它的Scala对应物,因此不完全是Pythonic.这意味着在语言之间进行映射非常容易,但与此同时,Python代码可能更难以理解.
复杂的架构与纯JVM执行相比,PySpark数据流相对复杂.有关PySpark程序或调试的理由要困难得多.此外,至少对Scala和JVM的基本理解是必不可少的.
Spark 2.x及以上正在逐步转向DatasetAPI,使用冻结的RDD API为Python用户带来了机遇和挑战.虽然API的高级部分更容易在Python中公开,但更高级的功能几乎不可能直接使用.
此外,本机Python函数仍然是SQL世界中的二等公民.希望这将在未来使用Apache Arrow序列化进行改进(目前的努力目标数据,collection但UDF serde是一个长期目标).
对于强烈依赖于Python代码库的项目,纯Python替代品(如Dask或Ray)可能是一个有趣的替代方案.
Spark DataFrame(SQL,Dataset)API提供了一种在PySpark应用程序中集成Scala/Java代码的优雅方法.您可以使用DataFrames将数据公开给本机JVM代码并回读结果.我已经在其他地方解释了一些选项,你可以在Pyspark中找到如何使用Scala类的Python-Scala往返的工作示例.
可以通过引入用户定义类型来进一步扩充(请参阅如何在Spark SQL中为自定义类型定义模式?).
(免责声明:Pythonista的观点.很可能我错过了一些Scala技巧)
首先,代码中有一部分根本没有意义.如果您已经(key, value)创建了使用的对,zipWithIndex或者enumerate创建字符串的重点是什么,只是为了之后将其拆分?flatMap不会递归地工作,所以你可以简单地产生元组并跳过map任何跟随.
另一个我觉得有问题的部分是reduceByKey.一般来说,reduceByKey如果应用聚合函数可以减少必须洗牌的数据量,则非常有用.由于你只是连接字符串,所以没有什么可以获得的.忽略低级别的东西,比如引用的数量,你必须传输的数据量完全相同groupByKey.
通常我不会纠缠于此,但据我所知,它是您的Scala代码中的瓶颈.在JVM上连接字符串是一项相当昂贵的操作(例如,参见:scala中的字符串连接与Java中的字符串连接一样昂贵吗?).这意味着像这样的东西在你的代码中是_.reduceByKey((v1: String, v2: String) => v1 + ',' + v2) 等价的input4.reduceByKey(valsConcat)并不是一个好主意.
如果你想避免groupByKey你可以尝试使用aggregateByKey带StringBuilder.类似的东西应该做的伎俩:
rdd.aggregateByKey(new StringBuilder)(
(acc, e) => {
if(!acc.isEmpty) acc.append(",").append(e)
else acc.append(e)
},
(acc1, acc2) => {
if(acc1.isEmpty | acc2.isEmpty) acc1.addString(acc2)
else acc1.append(",").addString(acc2)
}
)
Run Code Online (Sandbox Code Playgroud)
但我怀疑这值得大惊小怪.
记住以上几点,我已经重写了你的代码如下:
斯卡拉:
val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
(idx, iter) => if (idx == 0) iter.drop(1) else iter
}
val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
case ("true", i) => (i, "1")
case ("false", i) => (i, "0")
case p => p.swap
})
val result = pairs.groupByKey.map{
case (k, vals) => {
val valsString = vals.mkString(",")
s"$k,$valsString"
}
}
result.saveAsTextFile("scalaout")
Run Code Online (Sandbox Code Playgroud)
Python:
def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr
def separate_cols(line):
line = line.replace('true', '1').replace('false', '0')
vals = line.split(',')
for (i, x) in enumerate(vals):
yield (i, x)
input = (sc
.textFile('train.csv', minPartitions=6)
.mapPartitionsWithIndex(drop_first_line))
pairs = input.flatMap(separate_cols)
result = (pairs
.groupByKey()
.map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))
result.saveAsTextFile("pythonout")
Run Code Online (Sandbox Code Playgroud)
在local[6]模式(Intel(R)Xeon(R)CPU E3-1245 V2 @ 3.40GHz)中,每个执行器有4GB内存(n = 3):
我很确定大部分时间花在洗牌,序列化,反序列化和其他辅助任务上.只是为了好玩,这里是Python中的天真单线程代码,可以在不到一分钟的时间内在这台机器上执行相同的任务:
def go():
with open("train.csv") as fr:
lines = [
line.replace('true', '1').replace('false', '0').split(",")
for line in fr]
return zip(*lines[1:])
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
41428 次 |
| 最近记录: |