在spark版本2.2.0中使用python(pyspark)从mqtt获取数据流

awi*_*sha 5 python mqtt spark-streaming pyspark

我在spark的POM.xml中添加了依赖项,在以下链接中给出:

http://bahir.apache.org/docs/spark/current/spark-sql-streaming-mqtt/

再次使用 Maven 构建 Spark。但我们可以看到,它仅显示 Java 和 Scala 支持从 mqtt 获取数据。

我想从python 中的 mqtt 获取流数据。 在早期版本中,我们有一个 pyspark.streaming.mqtt 来实现相同的目的。Spark 2.2.0 pyspark中与此类似的是什么?我正在使用 mosquitto 作为 mqtt 代理。

hi-*_*zir 2

对于 PySpark,您可以使用结构化流绑定(您必须包含 Bahir jar):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # type: SparkSession
(spark
    .readStream
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    .load("tcp://{}".format(broker_uri)))
Run Code Online (Sandbox Code Playgroud)