Sort operation on a Spark Structured Streaming DataFrame

nil*_*212 5 scala apache-spark spark-structured-streaming

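I am trying to do a very simple sort on a Spark Structured Streaming DataFrame, but it fails with "Exception in thread "main" org.apache.spark.sql.AnalysisException: Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode". The full exception is shown below. Can you help me resolve this?

Code: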

    // Read the raw records from Kafka as a streaming DataFrame
    val df: DataFrame = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", kafkaBrokerList)
      .option("kafka.security.protocol", security)
      .option("startingOffsets", "latest")
      .option("subscribe", srcTopic)
      .option("group.id", groupID)
      .option("failOnDataLoss", false)
      .load

    // Parse the JSON payload in "value" into columns using uSchema
    val uDF = df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
      .select($"value")
      .select(from_json($"value", uSchema).as("events"))
      .select($"events.*")

    // The sort() below is what raises the AnalysisException
    val uDF2 = uDF
      .select($"COL1", $"COL2", $"COL3", $"COL4", $"COL5", $"COL6", $"COL7", $"COL8")
      .sort($"COL5", $"COL3", $"COL8")

    // Write the result back to Kafka
    val kDF = uDF2
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("kafka.security.protocol", "PLAINTEXT")
      .option("topic", "r_topic")
      .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
      .start()

    kDF.awaitTermination()

Exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;;

Data:

I want to sort the DataFrame by "COL5", "COL3", "COL8".

+------------+--------------------------------------+-------------+-----+-----------+-------------+----------+----------+
|COL1        |COL2                                  |COL3         |COL4 |COL5       |COL6         |COL7      |COL8      |
+------------+--------------------------------------+-------------+-----+-----------+-------------+----------+----------+
|RunKafkaTest|DUMMY VALUE                           |1528326884394|52.0 |Analog     |0            |1528326880|67        |
|RunKafkaTest|DUMMY VALUE                           |1528326884388|53.0 |Analog     |0            |1528326880|68        |
|RunKafkaTest|DUMMY VALUE                           |1528326886400|54.0 |Analog     |0            |1528326880|69        |
|RunKafkaTest|DUMMY VALUE                           |1528326887412|55.0 |Analog     |0            |1528326880|70        |
|RunKafkaTest|DUMMY VALUE                           |1528326887406|56.0 |Analog     |0            |1528326880|71        |
|RunKafkaTest|DUMMY VALUE                           |1528326889418|57.0 |Analog     |0            |1528326880|72        |
|RunKafkaTest|DUMMY VALUE                           |1528326890423|58.0 |Analog     |0            |1528326880|73        |
|RunKafkaTest|DUMMY VALUE                           |1528326891429|59.0 |Analog     |0            |1528326880|74        |
|RunKafkaTest|DUMMY VALUE                           |1528326892435|1.0  |Analog     |0            |1528326880|76        |
|RunKafkaTest|DUMMY VALUE                           |1528326893449|2.0  |Analog     |0            |1528326880|77        |
|RunKafkaTest|DUMMY VALUE                           |1528326894447|3.0  |Analog     |0            |1528326880|78        |
|RunKafkaTest|DUMMY VALUE                           |1528326895459|4.0  |Analog     |0            |1528326880|79        |
|RunKafkaTest|DUMMY VALUE                           |1528326896458|5.0  |Analog     |0            |1528326880|80        |
|RunKafkaTest|DUMMY VALUE                           |1528326897464|6.0  |Analog     |0            |1528326880|81        |
|RunKafkaTest|DUMMY VALUE                           |1528326898370|7.0  |Analog     |0            |1528326880|82        |
|RunKafkaTest|DUMMY VALUE                           |1528326899476|8.0  |Analog     |0            |1528326880|83        |
|RunKafkaTest|DUMMY VALUE                           |1528326900482|9.0  |Analog     |0            |1528326880|84        |
|RunKafkaTest|DUMMY VALUE                           |1528326901488|10.0 |Analog     |0            |1528326880|85        |
|RunKafkaTest|DUMMY VALUE                           |1528326902493|11.0 |Analog     |0            |1528326880|86        |
+------------+--------------------------------------+-------------+-----+-----------+-------------+----------+----------+

Jun*_*Lim 1

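You may need to rethink what a sorted output means on a stream. In true streaming you would never get any output, because in theory you never reach the last event of the stream, and a global sort cannot finish before that. Although Spark actually runs micro-batches, it tries to keep semantics close to true streaming, which is why a plain sort is rejected. You will probably end up redefining the problem and using stateful operations such as windows or flatMapGroupsWithState. You may also be able to split the data into ranges manually and run a batch job on each range.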
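As a rough illustration of those two directions, here is a minimal sketch, not the answerer's own code. It assumes Spark 2.4 or later (for `foreachBatch`, which is swapped in here as one way to "run a batch" on each slice of the stream), and it replaces the Kafka input with Spark's built-in `rate` test source so that it runs on its own. The object name `SortedStreamSketch`, the helper `sortByKeys`, and the mapping of `rate` columns onto `COL5`/`COL3`/`COL8` are made up for the example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit}

object SortedStreamSketch {

  // Plain batch sort on the three key columns from the question.
  def sortByKeys(batch: DataFrame): DataFrame =
    batch.sort(col("COL5"), col("COL3"), col("COL8"))

  // Inside foreachBatch each micro-batch is a static DataFrame, so sort()
  // is allowed. The ordering holds within one micro-batch only, not across
  // the whole stream. Declared with an explicit function type so that the
  // Scala overload of foreachBatch is selected unambiguously.
  val showSortedBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
    println(s"micro-batch $batchId")
    sortByKeys(batch).show(truncate = false)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("sorted-stream-sketch")
      .getOrCreate()

    // Assumption: the "rate" source stands in for the parsed Kafka stream.
    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 5L)
      .load()
      .select(
        lit("Analog").as("COL5"),
        col("timestamp").cast("long").as("COL3"),
        col("value").as("COL8"))

    // Direction 1: sort every micro-batch separately.
    events.writeStream.foreachBatch(showSortedBatch).start()

    // Direction 2: the case the error message allows, a sort on an
    // aggregated DataFrame written in Complete output mode.
    events.groupBy(col("COL5")).count()
      .sort(col("count").desc)
      .writeStream
      .outputMode("complete")
      .format("console")
      .start()

    spark.streams.awaitAnyTermination()
  }
}
```

Neither query produces a globally sorted stream. The first orders rows only inside each micro-batch, and the second re-emits the full sorted aggregate on every trigger. If the sorted batch is written to Kafka rather than shown, ordering in the topic would still depend on how the topic is partitioned.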