找不到数据源:请按照《Structured Streaming + Kafka 集成指南》部署部分部署应用程序

Tla*_*-ES 2 apache-kafka apache-spark pyspark

您好,我正在尝试使用 pyspark + kafka 来执行此操作,我执行此命令以设置 kafka 集群

zookeeper-server-start.sh $KAFKA_HOME/../config/zookeeper.properties

kafka-server-start.sh $KAFKA_HOME/../config/*-0.properties & kafka-server-start.sh $KAFKA_HOME/../config/*-1.properties
Run Code Online (Sandbox Code Playgroud)
  • Spark版本是-spark-3.2.0-bin-hadoop2-7
  • 卡夫卡版本是 - kafka_2.13-3.0.0
  • pyspark版本是3.2.0

蟒蛇代码是:

spark_version = '3.2.0'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:{}'.format(spark_version)

spark = SparkSession \
    .builder \
    .appName("TP3") \
    .getOrCreate()

!spark-submit --class TP3 --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 TweetCount.ipynb
Run Code Online (Sandbox Code Playgroud)

这将返回以下错误:

错误:无法加载类 TP3。

当我执行spark.readStream时

consumer = KafkaConsumer('topic')
df_kafka = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", 'localhost:9092') \
    .option("subscribe", 'topic') \
    .load()
Run Code Online (Sandbox Code Playgroud)

我得到了这个错误:

未能找到数据源:kafka。请按照《结构化流+Kafka集成指南》的部署部分部署应用程序。

如何执行 readstream 以便使用 pyspark 从 kafka 读取数据?

谢谢

Tla*_*-ES 9

最后,我在笔记本的开头使用以下代码解决了这个问题。

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'
Run Code Online (Sandbox Code Playgroud)