Raa*_*kar 4 python apache-kafka apache-spark pyspark spark-structured-streaming
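Below is my code. I have tried many variations of the select; the application runs, but it does not show the messages that are written every second. I have a Spark Streaming example that uses pprint() and confirms that Kafka really is receiving messages every second. The messages in Kafka are JSON; see the schema for the field/column labels: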
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import statistics
KAFKA_TOPIC = "vehicle_events_fast_testdata"
KAFKA_SERVER = "10.2.0.6:2181"
if __name__ == "__main__":
    print("NXB PySpark Structured Streaming with Kafka Demo Started")
    spark = SparkSession \
        .builder \
        .appName("PySpark Structured Streaming with Kafka Demo") \
        .master("local[*]") \
        .config("spark.jars", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar,/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .config("spark.executor.extraClassPath", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .config("spark.executor.extraLibrary", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .config("spark.driver.extraClassPath", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    schema = StructType() \
        .add("WheelAngle", IntegerType()) \
        .add("acceleration", IntegerType()) \
        .add("heading", IntegerType()) \
        .add("reading_time", IntegerType()) \
        .add("tractionForce", IntegerType()) \
        .add("vel_latitudinal", IntegerType()) \
        .add("vel_longitudinal", IntegerType()) \
        .add("velocity", IntegerType()) \
        .add("x_pos", IntegerType()) \
        .add("y_pos", IntegerType()) \
        .add("yawrate", IntegerType())
    # Construct a streaming DataFrame that reads from testtopic
    trans_det_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_SERVER) \
        .option("subscribe", KAFKA_TOPIC) \
        .option("startingOffsets", "latest") \
        .load() \
        .selectExpr("CAST(value as STRING)", "CAST(timestamp as STRING)","CAST(topic as STRING)")
    # (from_json(col("value").cast("string"), schema))
    # Q1 = trans_det_df.select(from_json(col("value"), schema).alias("parsed_value"), "timestamp")
    # Q2 = trans_det_d.select("parsed_value*", "timestamp")
    query = trans_det_df.writeStream \
            .format("console") \
            .option("truncate","false") \
            .start() \
            .awaitTermination()
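kafka.bootstrap.servers takes the Kafka broker address (default port 9092), not the Zookeeper address (port 2181).
Also note that your starting offsets are set to latest, so you have to produce data after the streaming application has started.
If you want to see data that already exists in the topic, use the earliest offsets.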
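The two points above can be sketched as follows. This is a minimal illustration, not a drop-in fix: the helper names kafka_source_options and read_vehicle_events are hypothetical, and the broker address 10.2.0.6:9092 in the usage note assumes the broker runs on the same host as Zookeeper with the default port, which the question does not confirm. The second function also shows one way the commented-out from_json attempt could be written, selecting the parsed struct's fields with "parsed_value.*".

```python
from typing import Dict


def kafka_source_options(brokers: str, topic: str,
                         from_beginning: bool = False) -> Dict[str, str]:
    """Build options for Spark's Kafka source (hypothetical helper).

    Rejects port 2181, the usual Zookeeper port, because
    kafka.bootstrap.servers must point at Kafka brokers.
    """
    for address in brokers.split(","):
        _, _, port = address.strip().rpartition(":")
        if port == "2181":
            raise ValueError(
                f"{address} looks like a Zookeeper address; "
                "use the Kafka broker address (default port 9092)")
    return {
        "kafka.bootstrap.servers": brokers,
        "subscribe": topic,
        # "earliest" replays data already in the topic;
        # "latest" only shows messages produced after the query starts.
        "startingOffsets": "earliest" if from_beginning else "latest",
    }


def read_vehicle_events(spark, schema, options: Dict[str, str]):
    """Return a streaming DataFrame with the JSON payload parsed into columns.

    pyspark is imported here so the options helper can be used without it.
    """
    from pyspark.sql.functions import col, from_json

    raw = spark.readStream.format("kafka").options(**options).load()
    return (raw
            .select(from_json(col("value").cast("string"), schema)
                    .alias("parsed_value"), "timestamp")
            .select("parsed_value.*", "timestamp"))
```

With the spark session and schema from the question, usage might look like: options = kafka_source_options("10.2.0.6:9092", KAFKA_TOPIC, from_beginning=True), then read_vehicle_events(spark, schema, options).writeStream.format("console").option("truncate", "false").start().awaitTermination().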