Hao*_*ang 5 apache-spark spark-structured-streaming
I have two versions of Spark code. The first one uses Structured Streaming with a Kafka source:
dfStream.printSchema()
//root
//|-- dt: string (nullable = true)
//|-- ip: string (nullable = true)
//|-- device: string (nullable = true)

// imports needed for $, concat_ws and collect_list
import org.apache.spark.sql.functions.{concat_ws, collect_list}
import spark.implicits._

val dfWindowed = dfStream
  .groupBy($"ip")
  .agg(concat_ws(",", collect_list($"device")).alias("devices"))
  .writeStream
  .outputMode("complete")
  .format("memory")
  .start()
The second one reads from files, but the data is really the same as above:
logDF.printSchema()
//root
//|-- dt: string (nullable = true)
//|-- ip: string (nullable = true)
//|-- device: string (nullable = true)

logDF.repartition(32)
  .groupBy("ip")
  .agg(concat_ws(",", collect_list($"device")).alias("devices"))
The problem is that while the second one runs fine, the first keeps failing with the following error:
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:284)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 1.0 failed 4 times, most recent failure: Lost task 9.3 in stage 1.0 (TID 28, c3-hadoop-prc-st3417.bj, executor 3): java.lang.RuntimeException: Collect cannot be used in partial aggregations.
A long stack trace, but the error seems to boil down to:
java.lang.RuntimeException: Collect cannot be used in partial aggregations.
I have found several related SO questions, but no solution so far. Any suggestions would be greatly appreciated.
I think you could consider a groupByKey -> reduceGroups chain as a workaround, for example:
case class Data(ip: Int, column1: String, column2: String)

import spark.implicits._

val path = "/tmp/spark-streaming/test-data"

// Write an initial batch of test data as Parquet.
Seq(
  (1, "val1", "field1"),
  (1, "val2", "field2"),
  (1, "val3", "field3"),
  (1, "val4", "field4"),
  (2, "val1", "field1"),
  (3, "val1", "field1"),
  (4, "val1", "field1"),
  (4, "val2", "field2")
).toDF("ip", "column1", "column2").write.mode("overwrite").parquet(path)

spark.read.parquet(path).printSchema()
spark.read.parquet(path).show(false)

// Let the streaming source infer the schema from the existing files.
spark.sql("SET spark.sql.streaming.schemaInference=true")

val stream = spark.readStream.parquet(path).as[Data]

// Typed aggregation: groupByKey + reduceGroups instead of collect_list,
// concatenating the values of each group into comma-separated strings.
val result =
  stream
    .groupByKey(_.ip)
    .reduceGroups { (l, r) =>
      l.copy(
        column1 = l.column1.concat(",").concat(r.column1),
        column2 = l.column2.concat(",").concat(r.column2))
    }
    .map(_._2)

result.printSchema()

result.writeStream
  .option("checkpointLocation", "/tmp/spark-streaming-checkpoint-test")
  .option("truncate", "false")
  .format("console")
  .outputMode("update")
  .start()
  .awaitTermination(300000)

// Append more rows to the source path; the running query picks them up as a new micro-batch.
Seq(
  (1, "val5", "field5"),
  (2, "val2", "field2"),
  (3, "val2", "field2"),
  (4, "val3", "field3")
).toDF("ip", "column1", "column2").write.mode("append").parquet(path)
This produces output like:
+---+-------------------+---------------------------+
|ip |column1 |column2 |
+---+-------------------+---------------------------+
|1 |val1,val2,val3,val4|field1,field2,field3,field4|
|3 |val1 |field1 |
|4 |val1,val2 |field1,field2 |
|2 |val1 |field1 |
+---+-------------------+---------------------------+
Note: as of 2.3.1, complete output mode is not supported for this kind of aggregation, which is why update mode is used above.
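
Applied to the original streaming DataFrame from the question (schema dt/ip/device), the same pattern would look roughly like the sketch below. This is only an illustration, assuming dfStream is already parsed from Kafka into those three string columns; the Event case class and the checkpoint path are made up for the example.

case class Event(dt: String, ip: String, device: String)
import spark.implicits._

// Hypothetical adaptation: concatenate devices per ip without collect_list.
val devicesPerIp = dfStream
  .as[Event]
  .groupByKey(_.ip)
  .reduceGroups((l, r) => l.copy(device = l.device + "," + r.device))
  .map { case (ip, e) => (ip, e.device) }
  .toDF("ip", "devices")

devicesPerIp.writeStream
  .option("checkpointLocation", "/tmp/spark-streaming-checkpoint-devices") // made-up path
  .format("console")
  .outputMode("update")
  .start()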
Hope this helps!