azure pyspark从jar注册udf失败UDFRegistration

bil*_*St3 5 azure apache-spark pyspark databricks azure-databricks

我在注册 java 文件中的一些 udf 时遇到问题。我有几种方法,但它们都会返回:

无法执行用户定义的函数(UDFRegistration$$Lambda$6068/1550981127: (double, double) => double)

首先我尝试了这种方法:

from pyspark.context import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
conf=SparkConf()
conf.set('spark.driver.extraClassPath', 'dbfs:/FileStore/jars/4b129434_12cd_4f2a_ab27_baaefe904857-scala_udf_similarity_0_0_7-35e3b.jar')
conf.set('spark.jars', 'dbfs:/FileStore/jars/4b129434_12cd_4f2a_ab27_baaefe904857-scala_udf_similarity_0_0_7-35e3b.jar')

spark = SparkSession(sc)
sc = SparkContext.getOrCreate(conf=conf)
#spark.sparkContext.addPyFile("dbfs:/FileStore/jars/4b129434_12cd_4f2a_ab27_baaefe904857-scala_udf_similarity_0_0_7-35e3b.jar")
udfs = [
    ('jaro_winkler_sim', 'JaroWinklerSimilarity',DoubleType()),
    ('jaccard_sim', 'JaccardSimilarity',DoubleType()),
    ('cosine_distance', 'CosineDistance',DoubleType()),
    ('Dmetaphone', 'DoubleMetaphone',StringType()),
    ('QgramTokeniser', 'QgramTokeniser',StringType())
]
for a,b,c in udfs:
    spark.udf.registerJavaFunction(a, 'uk.gov.moj.dash.linkage.'+ b, c)

linker = Splink(settings, spark, df_l=df_l, df_r=df_r)
df_e = linker.get_scored_comparisons()
Run Code Online (Sandbox Code Playgroud)

接下来我尝试将 jar 和 extraClassPath 移动到集群配置。

spark.jars dbfs:/FileStore/jars/4b129434_12cd_4f2a_ab27_baaefe904857-scala_udf_similarity_0_0_7-35e3b.jar
spark.driver.extraClassPath dbfs:/FileStore/jars/4b129434_12cd_4f2a_ab27_baaefe904857-scala_udf_similarity_0_0_7-35e3b.jar
Run Code Online (Sandbox Code Playgroud)

我将它们注册到我的脚本中,如下所示:

from pyspark.context import SparkContext, SparkConf
from pyspark.sql import SparkSession, udf
from pyspark.sql.types import *
# java path to class uk.gov.moj.dash.linkage.scala-udf-similarity.CosineDistance

udfs = [
     ('jaro_winkler_sim', 'JaroWinklerSimilarity',DoubleType()),
     ('jaccard_sim', 'JaccardSimilarity',DoubleType()),
     ('cosine_distance', 'CosineDistance',DoubleType()),
     ('Dmetaphone', 'DoubleMetaphone',StringType()),
     ('QgramTokeniser', 'QgramTokeniser',StringType())
 ]
for a,b,c in udfs:
     spark.udf.registerJavaFunction(a, 'uk.gov.moj.dash.linkage.'+ b, c)

linker = Splink(settings, spark, df_l=df_l, df_r=df_r)
df_e = linker.get_scored_comparisons()
Run Code Online (Sandbox Code Playgroud)

谢谢

Ale*_*Ott 5

查看UDF 的源代码,我发现它是用 Scala 2.11 编译的,并使用 Spark 2.2.0 作为基础。出现该错误的最可能原因是您将此 jar 与 DBR 7.x 一起使用,该 jar 使用 Scala 2.12 编译并基于与您的 jar 二进制不兼容的 Spark 3.x。您有以下选择:

  1. 使用 Scala 2.12 和 Spark 3.0 重新编译库
  2. 使用使用 Scala 2.11 和 Spark 2.4 的 DBR 6.4

PS 在 Databricks 上覆盖类路径有时可能很棘手,因此最好使用其他方法:

  1. 将 jar 作为库安装到集群中- 这可以通过 UI、REST API 或其他自动化(例如 terraform)来完成
  2. 使用 [init script][2] 将 jar 复制到 jar 的默认位置。在最简单的情况下,它可能如下所示:
#!/bin/bash
cp /dbfs/FileStore/jars/4b129434_12cd_4f2a_ab27_baaefe904857-scala_udf_similarity_0_0_7-35e3b.jar /databricks/jars/
Run Code Online (Sandbox Code Playgroud)