小编Raa*_*kar的帖子

来自 Kafka 的 pySpark Structured Streaming 不会输出到控制台进行调试

下面是我的代码。我尝试了许多不同的选择变体,但应用程序可以运行,但没有显示每秒写入的消息。我有一个 Spark Streaming 示例,它使用 pprint() 确认 kafka 实际上每秒都在获取消息。Kafka 中的消息是 JSON 格式的,请参阅字段/列标签的架构:

from pyspark.sql.functions import *
from pyspark.sql.types import *
import statistics


KAFKA_TOPIC = "vehicle_events_fast_testdata"
KAFKA_SERVER = "10.2.0.6:2181"

if __name__ == "__main__":
    print("NXB PySpark Structured Streaming with Kafka Demo Started")

    spark = SparkSession \
        .builder \
        .appName("PySpark Structured Streaming with Kafka Demo") \
        .master("local[*]") \
        .config("spark.jars", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar,/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .config("spark.executor.extraClassPath", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .config("spark.executor.extraLibrary", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .config("spark.driver.extraClassPath", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    schema = StructType() \
        .add("WheelAngle", IntegerType()) \
        .add("acceleration", IntegerType()) \
        .add("heading", IntegerType()) …
Run Code Online (Sandbox Code Playgroud)

python apache-kafka apache-spark pyspark spark-structured-streaming

4
推荐指数
1
解决办法
997
查看次数