Tla*_*-ES 2 apache-kafka apache-spark pyspark
您好,我正在尝试使用 pyspark + kafka 来执行此操作,我执行此命令以设置 kafka 集群
zookeeper-server-start.sh $KAFKA_HOME/../config/zookeeper.properties
kafka-server-start.sh $KAFKA_HOME/../config/*-0.properties & kafka-server-start.sh $KAFKA_HOME/../config/*-1.properties
Run Code Online (Sandbox Code Playgroud)
蟒蛇代码是:
spark_version = '3.2.0'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:{}'.format(spark_version)
spark = SparkSession \
.builder \
.appName("TP3") \
.getOrCreate()
!spark-submit --class TP3 --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 TweetCount.ipynb
Run Code Online (Sandbox Code Playgroud)
这将返回以下错误:
错误:无法加载类 TP3。
当我执行spark.readStream时
consumer = KafkaConsumer('topic')
df_kafka = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", 'localhost:9092') \
.option("subscribe", 'topic') \
.load()
Run Code Online (Sandbox Code Playgroud)
我得到了这个错误:
未能找到数据源:kafka。请按照《结构化流+Kafka集成指南》的部署部分部署应用程序。
如何执行 readstream 以便使用 pyspark 从 kafka 读取数据?
谢谢
最后,我在笔记本的开头使用以下代码解决了这个问题。
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
7093 次 |
| 最近记录: |