无法实例化 Azure Databricks 的 EventHubSourceProvider

KMM*_*KMM 5 azure-eventhub pyspark azure-databricks

使用结构化流 pyspark中记录的步骤,我无法从我设置的 Azure 事件中心在pyspark中创建数据帧以读取流数据。

错误消息是: java.util.ServiceConfigurationError:org.apache.spark.sql.sources.DataSourceRegister:无法实例化提供程序org.apache.spark.sql.eventhubs.EventHubsSourceProvider

我已经安装了 Maven 库(com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.12 不可用),但似乎没有一个可以工作:com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.15 com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.6

ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)返回的错误消息是:

java.lang.NoSuchMethodError: org.apache.spark.internal.Logging.$init$(Lorg/apache/spark/internal/Logging;)V

连接字符串是正确的,因为它也在写入 Azure 事件中心且有效的控制台应用程序中使用。

请有人指出我正确的方向。使用中的代码如下:

from pyspark.sql.functions import *
from pyspark.sql.types import *

# Event Hub Namespace Name
NAMESPACE_NAME = "*myEventHub*"
KEY_NAME = "*MyPolicyName*"
KEY_VALUE = "*MySharedAccessKey*"

# The connection string to your Event Hubs Namespace
connectionString = "Endpoint=sb://{0}.servicebus.windows.net/;SharedAccessKeyName={1};SharedAccessKey={2};EntityPath=ingestion".format(NAMESPACE_NAME, KEY_NAME, KEY_VALUE)

ehConf = {}
ehConf['eventhubs.connectionString'] = connectionString

# For 2.3.15 version and above, the configuration dictionary requires that connection string be encrypted.
# ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)

df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()
Run Code Online (Sandbox Code Playgroud)

Joo*_*oon 6

在运行 Spark 3.0 和 Scala 2.12 的 Spark 集群上安装版本号为 com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.* 的库时,我收到了同样的错误

对于通过 google 找到此内容的其他人 - 检查您是否拥有正确的 Scala 库版本。就我而言,我的集群是 Spark v3 和 Scala 2.12

将库版本中的“2.11”从我使用的教程更改为“2.12”,使其与我的集群运行时版本匹配,解决了问题。


KMM*_*KMM 5

为了解决该问题,我执行了以下操作:

  1. 卸载 azure 事件中心库版本
  2. 从 Maven Central 安装 com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.15 库版本
  3. 重启集群
  4. 通过重新运行问题中提供的代码进行验证