在 Spark Structured Streaming 中使用 collect_list 时出错

Hao*_*ang 5 apache-spark spark-structured-streaming

我有两个版本的 Spark 代码。第一个使用带有 Kafka 源的结​​构化流:

dfStream.printSchema()
//root
//|-- dt: string (nullable = true)
//|-- ip: string (nullable = true)
//|-- device: string (nullable = true)

val dfWindowed = dfStream                                                
    .groupBy($"ip")
    .agg(concat_ws(",", collect_list($"device")).alias("devices"))
    .writeStream                                             
    .outputMode("complete")                                              
    .format("memory")                                                        
    .start()           
Run Code Online (Sandbox Code Playgroud)

第二个从文件中读取。但是数据真的和上面一样:

logDF.printSchema() 
//root
//|-- dt: string (nullable = true)
//|-- ip: string (nullable = true)
//|-- device: string (nullable = true)

logDF.repartition(32)                                                                                                   
    .groupBy("ip")
    .agg(concat_ws(",", collect_list($"device")).alias("devices")) 
Run Code Online (Sandbox Code Playgroud)

问题是,虽然第二个运行良好,但第一个一直给我以下错误:

at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:284)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 1.0 failed 4 times, most recent failure: Lost task 9.3 in stage 1.0 (TID 28, c3-hadoop-prc-st3417.bj, executor 3): java.lang.RuntimeException: Collect cannot be used in partial aggregations.
Run Code Online (Sandbox Code Playgroud)

一个长句子..但错误似乎如下:

java.lang.RuntimeException: Collect cannot be used in partial aggregations.
Run Code Online (Sandbox Code Playgroud)

我发现了几个相关的 SO 问题,但到目前为止没有解决方案。非常感谢有关以下方面的任何建议:

  1. “部分聚合”的含义以及静态(非流)数据集没有此类问题的原因,
  2. 一个解决方法...

Mik*_*kov 0

我想你可以考虑将解决方法作为groupByKey -> reduceGroup链,例如:

case class Data(ip: Int, column1: String, column2: String)

import spark.implicits._

val path = "/tmp/spark-streaming/test-data"

  Seq(
    (1, "val1", "field1"),
    (1, "val2", "field2"),
    (1, "val3", "field3"),
    (1, "val4", "field4"),
    (2, "val1", "field1"),
    (3, "val1", "field1"),
    (4, "val1", "field1"),
    (4, "val2", "field2")
  ).toDF("ip", "column1", "column2").write.mode("overwrite").parquet(path)

  spark.read.parquet(path).printSchema()

  spark.read.parquet(path).show(false)

  spark.sql("SET spark.sql.streaming.schemaInference=true")

  val stream = spark.readStream.parquet(path).as[Data]

  val result =
    stream
      .groupByKey(_.ip)
      .reduceGroups { (l, r) =>
        l.copy(column1 = l.column1.concat(",").concat(r.column1), column2 = l.column2.concat(",").concat(r.column2))
      }
      .map(_._2)



   result.printSchema()

   result.writeStream
    .option("checkpointLocation", "/tmp/spark-streaming-checkpoint-test")
    .option("truncate", "false")
    .format("console")
    .outputMode("update")
    .start()
    .awaitTermination(300000)

Seq(
    (1, "val5", "field5"),
    (2, "val2", "field2"),
    (3, "val2", "field2"),
    (4, "val3", "field3")
  ).toDF("ip", "column1", "column2").write.mode("append").parquet(path)
Run Code Online (Sandbox Code Playgroud)

这将导致如下结果:

+---+-------------------+---------------------------+                           
|ip |column1            |column2                    |
+---+-------------------+---------------------------+
|1  |val1,val2,val3,val4|field1,field2,field3,field4|
|3  |val1               |field1                     |
|4  |val1,val2          |field1,field2              |
|2  |val1               |field1                     |
+---+-------------------+---------------------------+
Run Code Online (Sandbox Code Playgroud)

注意:自 2.3.1 起,完整模式不支持聚合操作

希望能帮助到你!