spark sql中的有状态udfs,或者如何在spark sql中获得mapPartitions的性能优势?

Vit*_*liy 5 optimization user-defined-functions apache-spark pyspark

在转换导致创建或加载昂贵资源的情况下(例如,对外部服务进行身份验证或创建数据库连接),在映射分区上使用映射可以显着提高性能。

mapPartition允许我们每个分区初始化一次昂贵的资源,而不是像标准map那样每行初始化一次。

但是如果我使用数据帧,我应用自定义转换的方式是通过指定用户定义的函数逐行操作 - 所以我失去了使用 mapPartitions 为每个块执行一次繁重工作的能力。

在 spark-sql/dataframe 中是否有解决方法?

更具体地说

我需要对一堆文档执行特征提取。我有一个输入文档并输出一个向量的函数。

计算本身涉及初始化与外部服务的连接。我不想或不需要为每个文档初始化它。这在规模上具有非平凡的开销。

zer*_*323 5

一般来说,您有以下三种选择:

  • 直接转换DataFrameRDD应用mapPartitions。由于您使用Python,udf您已经破坏了某些优化并付出了serde成本,并且使用RDD平均不会使情况变得更糟。
  • 延迟初始化所需的资源(另请参阅如何在 PySpark 中处理数据之前在所有 Spark 工作线程上运行函数?)。
  • 如果数据可以使用 Arrow 进行序列化,请使用矢量化pandas_udf(Spark 2.3 及更高版本)。不幸的是你不能直接使用它VectorUDT,所以你必须扩展向量并稍后折叠,所以这里的限制因素是向量的大小。此外,您还必须小心控制分区的大小。

请注意,使用UserDefinedFunctions可能需要将对象提升为非确定性变体。