Spark 3.x 与 Python 中的 Kafka 集成

Tel*_*uar 2 apache-kafka apache-spark pyspark spark-structured-streaming spark-kafka-integration

带有 spark-streaming 的 Kafka 抛出一个错误:

from pyspark.streaming.kafka import KafkaUtils ImportError: No module named kafka
Run Code Online (Sandbox Code Playgroud)

我已经设置了一个 kafka 代理和一个工作的 Spark 环境,一个主人和一个工人。

import os

os.environ['PYSPARK_PYTHON'] = '/usr/bin/python2.7'
import findspark
findspark.init('/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7')
import pyspark
import sys
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils



if __name__=="__main__":
    sc = SparkContext(appName="SparkStreamAISfromKAFKA")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc,1)
    kvs = KafkaUtils.createStream(ssc,"my-kafka-broker","raw-event-streaming-consumer",{'enriched_ais_messages':1})
    lines = kvs.map(lambda x: x[1])
    lines.count().map(lambda x: 'Messages AIS: %s' % x).pprint()
    ssc.start()
    ssc.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

我假设错误是缺少与 kafka ans 相关的特定版本。有人能帮忙吗?

火花版本:版本 3.0.0-preview2

我执行:

/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1 --jars spark-streaming-kafka-0-10_2.11 spark_streamer.py spark://mysparkip:7077
Run Code Online (Sandbox Code Playgroud)

mik*_*ike 5

根据Spark Streaming + Kafka 集成指南

“从 Spark 2.3.0 开始,Kafka 0.8 支持已被弃用。”

此外,下面的屏幕截图显示 Kafka 0.10(及更高版本)不支持 Python。

在此处输入图片说明

在您的情况下,您必须使用 Spark 2.4 才能运行您的代码。

PySpark 支持结构化流

如果您打算使用最新版本的 Spark(例如 3.x),并且仍想在 Python 中将 Spark 与 Kafka 集成,您可以使用结构化流。您将在Structured Streaming + Kafka 集成指南(Kafka broker 版本 0.10.0 或更高版本)中找到有关如何使用 Python API 的详细说明:

从 Kafka 读取数据

# Subscribe to 1 topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
Run Code Online (Sandbox Code Playgroud)

向Kafka写入数据

# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
ds = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .start()
Run Code Online (Sandbox Code Playgroud)