Har*_*vey 6 apache-kafka apache-spark pyspark spark-structured-streaming spark-kafka-integration
我在理解如何连接 Kafka 和 PySpark 时遇到问题。
我在 Windows 10 上安装了 kafka,主题很好地流式传输数据。我已经安装了可以正常运行的 pyspark - 我能够毫无问题地创建测试 DataFrame。
但是当我尝试连接到 Kafka 流时,它给了我错误:
AnalysisException:找不到数据源:kafka。请按照“结构化流-Kafka集成指南”的部署部分部署应用程序。
Spark 文档并没有真正的帮助 - 它说: ... groupId = org.apache.spark artifactId = Spark-sql-kafka-0-10_2.12 version = 3.2.0 ...
对于 Python 应用程序,您需要在部署应用程序时添加上述库及其依赖项。请参阅下面的部署小节。
然后当您转到“部署”部分时,它会显示:
与任何 Spark 应用程序一样,spark-submit 用于启动您的应用程序。可以使用--packages将spark-sql-kafka-0-10_2.12及其依赖直接添加到spark-submit中,例如./bin/spark-submit --packages org.apache.spark:spark-sql-卡夫卡-0-10_2.12:3.2.0 ...
我正在开发应用程序,我不想部署它。如果我正在开发 pyspark 应用程序,在哪里以及如何添加这些依赖项?
尝试了几个教程最终变得更加混乱。
看到答案这么说
“您需要将 kafka-clients JAR 添加到您的 --packages 中”。如此回答
更多的步骤可能会有用,因为对于新手来说这还不清楚。
版本:
所有环境变量和路径均已正确设置。
编辑
我已经加载:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.kafka:kafka-clients:2.8.1'
Run Code Online (Sandbox Code Playgroud)
按照建议但仍然出现相同的错误。我已经三次检查了 kafka、scala 和 Spark 版本,并尝试了各种组合,但没有成功,我仍然遇到相同的错误:
AnalysisException:找不到数据源:kafka。请按照《Structured Streaming-Kafka集成指南》的部署部分部署应用程序。
编辑2
我安装了最新的Spark 3.2.0和Hadoop 3.3.1以及kafka版本kafka_2.12-2.8.1。更改了所有环境变量,测试了 Spark 和 Kafka - 工作正常。
我的环境变量现在看起来像这样:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0,org.apache.kafka:kafka-clients:2.8.1'
Run Code Online (Sandbox Code Playgroud)
仍然没有运气,我得到同样的错误:(
Spark文档并没有真正的帮助 - 它说...artifactId = Spark-sql-kafka-0-10_2.12 version = 3.2.0 ...
是的,这是正确的......但您正在查看最新版本的 Spark文档
相反,你提到过
版本:
- 火花3.1.2
您是否尝试过查看特定于版本的文档?
换句话说,您需要spark-sql-kafka3.1.2 的匹配版本。
bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2
或者在Python中,
scala_version = '2.12'
spark_version = '3.1.2'
# TODO: Ensure match above values match the correct versions
packages = [
f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
'org.apache.kafka:kafka-clients:3.2.1'
]
spark = SparkSession.builder\
.master("local")\
.appName("kafka-example")\
.config("spark.jars.packages", ",".join(packages))\
.getOrCreate()
Run Code Online (Sandbox Code Playgroud)
或者使用 env-var
import os
spark_version = '3.1.2'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:{}'.format(spark_version)
# init spark here
Run Code Online (Sandbox Code Playgroud)
需要添加上面的库及其依赖项
正如您在我之前的回答中发现的那样,还kafka-clients可以使用逗号分隔的列表附加包。
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.kafka:kafka-clients:2.8.1
我正在开发应用程序,我不想部署它。
“部署”是 Spark 术语。本地运行仍然是“部署”
| 归档时间: |
|
| 查看次数: |
13958 次 |
| 最近记录: |