Scala与Python的Spark性能

Mri*_*jay 166 performance scala apache-spark rdd pyspark

我更喜欢Python而不是Scala.但是,由于Spark本身是用Scala编写的,因此我希望我的代码在Scala中的运行速度比Python版本快,原因很明显.

有了这个假设,我想学习和编写一些非常常见的预处理代码的Scala版本,用于1 GB的数据.数据来自Kaggle的SpringLeaf比赛.只是为了概述数据(它包含1936个维度和145232行).数据由各种类型组成,例如int,float,string,boolean.我使用8个核心中的6个用于Spark处理; 这就是我使用的原因minPartitions=6,每个核心都有一些东西需要处理.

Scala代码

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) => 
  if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"

def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) {
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  }
  vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = {
  val vals = line.split(delim1)
  (vals(0), vals(1))
}

val input4 = input3.map(toKeyVal)

def valsConcat(val1: String, val2: String): String = {
  val1 + "," + val2
}

val input5 = input4.reduceByKey(valsConcat)

input5.saveAsTextFile("output")
Run Code Online (Sandbox Code Playgroud)

Python代码

input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'


def drop_first_line(index, itr):
  if index == 0:
    return iter(list(itr)[1:])
  else:
    return itr

input2 = input.mapPartitionsWithIndex(drop_first_line)

def separate_cols(line):
  line = line.replace('true', '1').replace('false', '0')
  vals = line.split(',')
  vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
           for e, val in enumerate(vals)]
  return vals2


input3 = input2.flatMap(separate_cols)

def to_key_val(kv):
  key, val = kv.split(DELIM_1)
  return (key, val)
input4 = input3.map(to_key_val)

def vals_concat(v1, v2):
  return v1 + ',' + v2

input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')
Run Code Online (Sandbox Code Playgroud)

斯卡拉表演 第0阶段(38分钟),第1阶段(18秒) 在此输入图像描述

Python Performance Stage 0(11分钟),第1阶段(7秒) 在此输入图像描述

两者都产生不同的DAG可视化图形(由于这两个图像显示Scala(map)和Python(reduceByKey)的不同阶段0函数

但是,基本上两个代码都试图将数据转换为(dimension_id,值列表的字符串)RDD并保存到磁盘.输出将用于计算每个维度的各种统计信息.

性能方面,像这样的真实数据的Scala代码似乎比Python版本慢4倍.对我来说好消息是它给了我留住Python的良好动力.坏消息是我不太明白为什么?

zer*_*323 328


讨论代码的原始答​​案可以在下面找到.


首先,您必须区分不同类型的API,每种API都有自己的性能考虑因素.

RDD API

(基于JVM的编排的纯Python结构)

这是受Python代码性能和PySpark实现细节影响最大的组件.虽然Python性能不太可能成为问题,但至少有几个因素需要考虑:

  • JVM通信的开销.实际上,来自Python执行器的所有数据都必须通过套接字和JVM工作程序传递.虽然这是一种相对有效的本地通信,但它仍然不是免费的.
  • 基于进程的执行程序(Python)与基于线程(单个JVM多线程)执行程序(Scala).每个Python执行器都在自己的进程中运行.作为一个副作用,它提供了比JVM对应物更强的隔离能力以及对执行程序生命周期的一些控制,但可能显着提高内存使用率:

    • 解释器内存占用
    • 已加载库的占用空间
    • 效率较低的广播(每个过程都需要自己的广播副本)
  • Python代码本身的性能.一般来说,Scala比Python快,但它会因任务而异.此外,您有多个选项,包括像即时编译器Numba,C扩展(用Cython)或专业图书馆像Theano.最后,如果您不使用ML/MLlib(或简称NumPy堆栈),请考虑使用PyPy作为替代解释器.见SPARK-3094.

  • PySpark配置提供了一个spark.python.worker.reuse选项,可用于在为每个任务分配Python进程和重用现有进程之间进行选择.后一种选择似乎有助于避免昂贵的垃圾收集(它比系统测试的结果更令人印象),而前一种(默认)对于昂贵的广播和进口是最佳的.
  • 引用计数,用作CPython中的第一行垃圾收集方法,与典型的Spark工作负载(类似流的处理,无引用周期)一起工作得很好,并降低了长GC暂停的风险.

MLlib

(混合Python和JVM执行)

基本考虑因素与之前的一些其他问题几乎相同.虽然与MLlib一起使用的基本结构是普通的Python RDD对象,但所有算法都是使用Scala直接执行的.

这意味着将Python对象转换为Scala对象的额外成本以及相反的方式,增加的内存使用量以及我们稍后将介绍的一些其他限制.

截至目前(Spark 2.x),基于RDD的API处于维护模式,计划在Spark 3.0中删除.

DataFrame API和Spark ML

(使用Python代码执行JVM仅限于驱动程序)

这些可能是标准数据处理任务的最佳选择.由于Python代码主要限于驱动程序上的高级逻辑操作,因此Python和Scala之间应该没有性能差异.

一个例外是使用行式Python UDF,它们的效率明显低于它们的Scala等价物.虽然有一些改进的机会(Spark 2.0.0已有大量开发),但最大的限制是内部表示(JVM)和Python解释器之间的完全往返.如果可能的话,你应该支持内置表达式的组合(例如 .Spin 2.0.0中的Python UDF行为已得到改进,但与本机执行相比,它仍然不是最理想的.将来可能会通过引入矢量化UDF来改进(SPARK-21190).

另外一定要避免在DataFrames和之间传递不必要的数据RDDs.这需要昂贵的序列化和反序列化,更不用说与Python解释器之间的数据传输.

值得注意的是Py4J调用具有相当高的延迟.这包括简单的调用,如:

from pyspark.sql.functions import col

col("foo")
Run Code Online (Sandbox Code Playgroud)

通常,它应该无关紧要(开销是恒定的并且不依赖于数据量)但是在软实时应用程序的情况下,您可以考虑缓存/重用Java包装器.

GraphX和Spark DataSet

至于现在(Spark 1.6 2.1),没有人提供PySpark API,所以你可以说PySpark比Scala更糟糕.

GraphX

在实践中,GraphX开发几乎完全停止,并且项目当前处于维护模式,相关的JIRA票据已关闭,因为无法修复.GraphFrames库提供了一个带有Python绑定的替代图形处理库.

数据集

主观上说,Datasets在Python中静态输入的地方并不多,即使当前的Scala实现过于简单,也没有提供与之相同的性能优势DataFrame.

从我到目前为止看到的,我强烈建议使用Scala而不是Python.如果PySpark获得对结构化流的支持,它可能会在未来发生变化,但现在Scala API似乎更加强大,全面和高效.我的经历非常有限.

Spark 2.x中的结构化流媒体似乎缩小了语言之间的差距,但目前它还处于早期阶段.尽管如此,基于RDD的API在Databricks文档(访问日期2017-03-03)中已被称为"遗留流",因此期望进一步统一工作是合理的.

非性能考虑因素

功能平价

并非所有Spark功能都通过PySpark API公开.务必检查您所需的部件是否已经实施,并尝试了解可能的限制.

使用MLlib和类似的混合上下文时尤其重要(请参阅从任务调用Java/Scala函数).公平地说,PySpark API的某些部分mllib.linalg提供了比Scala更全面的方法.

API设计

PySpark API密切反映了它的Scala对应物,因此不完全是Pythonic.这意味着在语言之间进行映射非常容易,但与此同时,Python代码可能更难以理解.

复杂的架构

与纯JVM执行相比,PySpark数据流相对复杂.有关PySpark程序或调试的理由要困难得多.此外,至少对Scala和JVM的基本理解是必不可少的.

Spark 2.x及以上

正在逐步转向DatasetAPI,使用冻结的RDD API为Python用户带来了机遇和挑战.虽然API的高级部分更容易在Python中公开,但更高级的功能几乎不可能直接使用.

此外,本机Python函数仍然是SQL世界中的二等公民.希望这将在未来使用Apache Arrow序列化进行改进(目前的努力目标数据,collection但UDF serde是一个长期目标).

对于强烈依赖于Python代码库的项目,纯Python替代品(如DaskRay)可能是一个有趣的替代方案.

它不一定是一对一

Spark DataFrame(SQL,Dataset)API提供了一种在PySpark应用程序中集成Scala/Java代码的优雅方法.您可以使用DataFrames将数据公开给本机JVM代码并回读结果.我已经在其他地方解释了一些选项,你可以在Pyspark中找到如何使用Scala类的Python-Scala往返的工作示例.

可以通过引入用户定义类型来进一步扩充(请参阅如何在Spark SQL中为自定义类型定义模式?).


问题中提供的代码有什么问题

(免责声明:Pythonista的观点.很可能我错过了一些Scala技巧)

首先,代码中有一部分根本没有意义.如果您已经(key, value)创建了使用的对,zipWithIndex或者enumerate创建字符串的重点是什么,只是为了之后将其拆分?flatMap不会递归地工作,所以你可以简单地产生元组并跳过map任何跟随.

另一个我觉得有问题的部分是reduceByKey.一般来说,reduceByKey如果应用聚合函数可以减少必须洗牌的数据量,则非常有用.由于你只是连接字符串,所以没有什么可以获得的.忽略低级别的东西,比如引用的数量,你必须传输的数据量完全相同groupByKey.

通常我不会纠缠于此,但据我所知,它是您的Scala代码中的瓶颈.在JVM上连接字符串是一项相当昂贵的操作(例如,参见:scala中的字符串连接与Java中的字符串连接一样昂贵吗?).这意味着像这样的东西在你的代码中是_.reduceByKey((v1: String, v2: String) => v1 + ',' + v2) 等价的input4.reduceByKey(valsConcat)并不是一个好主意.

如果你想避免groupByKey你可以尝试使用aggregateByKeyStringBuilder.类似的东西应该做的伎俩:

rdd.aggregateByKey(new StringBuilder)(
  (acc, e) => {
    if(!acc.isEmpty) acc.append(",").append(e)
    else acc.append(e)
  },
  (acc1, acc2) => {
    if(acc1.isEmpty | acc2.isEmpty)  acc1.addString(acc2)
    else acc1.append(",").addString(acc2)
  }
)
Run Code Online (Sandbox Code Playgroud)

但我怀疑这值得大惊小怪.

记住以上几点,我已经重写了你的代码如下:

斯卡拉:

val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}

val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
  case ("true", i) => (i, "1")
  case ("false", i) => (i, "0")
  case p => p.swap
})

val result = pairs.groupByKey.map{
  case (k, vals) =>  {
    val valsString = vals.mkString(",")
    s"$k,$valsString"
  }
}

result.saveAsTextFile("scalaout")
Run Code Online (Sandbox Code Playgroud)

Python:

def drop_first_line(index, itr):
    if index == 0:
        return iter(list(itr)[1:])
    else:
        return itr

def separate_cols(line):
    line = line.replace('true', '1').replace('false', '0')
    vals = line.split(',')
    for (i, x) in enumerate(vals):
        yield (i, x)

input = (sc
    .textFile('train.csv', minPartitions=6)
    .mapPartitionsWithIndex(drop_first_line))

pairs = input.flatMap(separate_cols)

result = (pairs
    .groupByKey()
    .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))

result.saveAsTextFile("pythonout")
Run Code Online (Sandbox Code Playgroud)

结果

local[6]模式(Intel(R)Xeon(R)CPU E3-1245 V2 @ 3.40GHz)中,每个执行器有4GB内存(n = 3):

  • Scala - 意思是:250.00s,stdev:12.49
  • Python - 意思是:246.66s,stdev:1.15

我很确定大部分时间花在洗牌,序列化,反序列化和其他辅助任务上.只是为了好玩,这里是Python中的天真单线程代码,可以在不到一分钟的时间内在这台机器上执行相同的任务:

def go():
    with open("train.csv") as fr:
        lines = [
            line.replace('true', '1').replace('false', '0').split(",")
            for line in fr]
    return zip(*lines[1:])
Run Code Online (Sandbox Code Playgroud)

  • 我遇到过一段时间内最清晰,最全面,最有用的答案之一.谢谢! (15认同)

归档时间:

查看次数:

41428 次

最近记录:

6 年,1 月 前