Pri*_*adi 6 apache-spark spark-structured-streaming
我想编写一个结构化的火花流 Kafka 消费者,它从一个分区的 Kafka 主题中读取数据,通过“键”将传入的数据重新分区到 3 个火花分区,同时保持每个键的消息排序,并将它们写入另一个具有 3 个分区的 Kafka 主题.
我使用Dataframe.repartition(3, $"key")了我相信使用 HashPartitioner 的地方。下面提供了代码。
当我使用固定批次间隔触发器类型执行查询时,我直观地验证了输出消息是否符合预期顺序。我的假设是无法保证结果分区的顺序。我希望在 Spark 代码存储库或文档中的代码指针方面收到一些肯定或否决我的假设。
我也尝试使用Dataframe.sortWithinPartitions,但是在没有聚合的流数据帧上似乎不支持。
我尝试过的一种选择是将 Dataframe 转换为 RDD 并repartitionAndSortWithinPartitions根据给定的分区器应用对 RDD进行重新分区,并在每个结果分区中按键对记录进行排序。但是,我无法在 query.writestream 操作中使用此 RDD 将结果写入输出 Kafka 主题。
传入数据:

case class KVOutput(key: String, ts: Long, value: String, spark_partition: Int)
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers.get)
.option("subscribe", Array(kafkaInputTopic.get).mkString(","))
.option("maxOffsetsPerTrigger",30)
.load()
val inputDf = df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
val resDf = inputDf.repartition(3, $"key")
.select(from_json($"value", schema).as("kv"))
.selectExpr("kv.key", "kv.ts", "kv.value")
.withColumn("spark_partition", spark_partition_id())
.select($"key", $"ts", $"value", $"spark_partition").as[KVOutput]
.sortWithinPartitions($"ts", $"value")
.select($"key".cast(StringType).as("key"), to_json(struct($"*")).cast(StringType).as("value"))
val query = resDf.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers.get)
.option("topic", kafkaOutputTopic.get)
.option("checkpointLocation", checkpointLocation.get)
.start()
Run Code Online (Sandbox Code Playgroud)
当我提交这个申请时,它失败了
8/11/08 22:13:20 ERROR ApplicationMaster: User class threw exception: org.apache.spark.sql.AnalysisException: Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;;
Run Code Online (Sandbox Code Playgroud)

| 归档时间: |
|
| 查看次数: |
645 次 |
| 最近记录: |