使用 Python 中的 Spark Structured Sreaming 从 Kafka 读取数据并打印到控制台

M_G*_*_Gh 4 apache-kafka apache-spark apache-spark-sql pyspark spark-structured-streaming

我在 Ubuntu 20.04 中有kafka_2.13-2.7.0 。我运行 kafka 服务器和 Zookeeper,然后创建一个主题并通过 发送其中的文本文件nc -lk 9999。该主题充满了数据。另外,我的系统上有spark-3.0.1-bin-hadoop2.7。事实上,我想使用 kafka 主题作为 Spark Structured Streaming with python 的来源。我的代码是这样的:

spark = SparkSession \
    .builder \
    .appName("APP") \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sparktest") \
    .option("startingOffsets", "earliest") \
    .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.printSchema()
Run Code Online (Sandbox Code Playgroud)

我使用以下命令通过Spark-Submit运行上述代码:

./spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 /home/spark/PycharmProjects/testSparkStream/KafkaToSpark.py 
Run Code Online (Sandbox Code Playgroud)

代码运行没有任何异常,我收到 Spark 站点中的输出:

   root
    |-- key: binary (nullable = true)
    |-- value: binary (nullable = true)
    |-- topic: string (nullable = true)
    |-- partition: integer (nullable = true)
    |-- offset: long (nullable = true)
    |-- timestamp: timestamp (nullable = true)
    |-- timestampType: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)

我的问题是kafka主题充满了数据;但是在输出中运行代码后没有任何数据。请您指导我这里出了什么问题吗?

mik*_*ike 5

原样的代码不会打印出任何数据,而只会为您提供一次架构。

您可以按照《Structured Streaming 通用指南》《Structured Streaming + Kafka 集成指南》中的说明查看如何将数据打印到控制台。请记住,在 Spark 中读取数据是一种惰性操作,如果没有操作(通常是writeStream操作),则不会完成任何操作。

如果您补充如下代码,您应该会看到所选数据(键和值)打印到控制台:

spark = SparkSession \
          .builder \
          .appName("APP") \
          .getOrCreate()

df = spark\
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option("subscribe", "sparktest") \
      .option("startingOffsets", "earliest") \
      .load()
      

query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .start()

query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)