RDD API与UDF混合使用DataFrame API的性能影响

max*_*max 3 performance scala apache-spark rdd apache-spark-sql

(Scala特定的问题.)

虽然Spark文档鼓励尽可能使用DataFrame API,但如果DataFrame API不足,则通常选择回退到RDD API或使用UDF.这两种选择之间是否存在固有的性能差异?

RDD和UDF类似,因为它们都不能从Catalyst和Tungsten优化中受益.是否还有其他开销,如果存在,两种方法之间是否存在差异?

举一个具体的例子,假设我有一个DataFrame,它包含一列带有自定义格式的文本数据(不适合regexp匹配).我需要解析该列并添加一个包含结果标记的新向量列.

zer*_*323 11

他们都不能从Catalyst和Tungsten优化中受益

这不完全正确.虽然UDF没有从Tungsten优化中受益(可以说简单的SQL转换也没有得到巨大的推动),但您仍然可以从Catalyst提供的执行计划优化中受益.让我们用一个简单的例子来说明(注意:Spark 2.0和Scala.不要将它推断到早期版本,特别是使用PySpark):

val f = udf((x: String) => x == "a")
val g = udf((x: Int) => x + 1)

val df = Seq(("a", 1), ("b", 2)).toDF

df
  .groupBy($"_1")
  .agg(sum($"_2").as("_2"))
  .where(f($"_1"))
  .withColumn("_2", g($"_2"))
  .select($"_1")
  .explain

// == Physical Plan ==
// *HashAggregate(keys=[_1#2], functions=[])
// +- Exchange hashpartitioning(_1#2, 200)
//    +- *HashAggregate(keys=[_1#2], functions=[])
//       +- *Project [_1#2]
//          +- *Filter UDF(_1#2)
//             +- LocalTableScan [_1#2, _2#3]
Run Code Online (Sandbox Code Playgroud)

执行计划向我们展示了几件事:

  • 在聚合之前已经推下了选择.
  • 在聚合之前已经按下了预测并且有效地删除了第二个UDF调用.

根据数据和管道,这几乎可以免费提供显着的性能提升.

据说RDD和UDF都要求安全和不安全之间的迁移,而后者的灵活性要低得多.尽管如此,如果你需要的只是一个map类似于简单的行为而没有初始化昂贵的对象(比如数据库连接)那么UDF就是你要走的路.

在稍微复杂的场景中,当您真正需要访问某些低级功能(如自定义分区)时,您可以轻松地下拉到通用Dataset和保留RDDs.