小编Pri*_*adi的帖子

如何在按键进行结构化流重新分区中保留每个键的事件顺序?

我想编写一个结构化的火花流 Kafka 消费者,它从一个分区的 Kafka 主题中读取数据,通过“键”将传入的数据重新分区到 3 个火花分区,同时保持每个键的消息排序,并将它们写入另一个具有 3 个分区的 Kafka 主题.

我使用Dataframe.repartition(3, $"key")了我相信使用 HashPartitioner 的地方。下面提供了代码。

当我使用固定批次间隔触发器类型执行查询时,我直观地验证了输出消息是否符合预期顺序。我的假设是无法保证结果分区的顺序。我希望在 Spark 代码存储库或文档中的代码指针方面收到一些肯定或否决我的假设。

我也尝试使用Dataframe.sortWithinPartitions,但是在没有聚合的流数据帧上似乎不支持。

我尝试过的一种选择是将 Dataframe 转换为 RDD 并repartitionAndSortWithinPartitions根据给定的分区器应用对 RDD进行重新分区,并在每个结果分区中按键对记录进行排序。但是,我无法在 query.writestream 操作中使用此 RDD 将结果写入输出 Kafka 主题。

  1. 是否有数据帧重新分区 API 可以帮助对流上下文中重新分区的数据进行排序?
  2. 还有其他选择吗?
  3. 微批处理执行的默认触发类型或固定间隔触发类型是否提供任何类型的消息排序保证?

传入数据:

case class KVOutput(key: String, ts: Long, value: String, spark_partition: Int)

val df = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers.get)
  .option("subscribe", Array(kafkaInputTopic.get).mkString(","))
  .option("maxOffsetsPerTrigger",30)
  .load()

val inputDf = df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
val resDf = inputDf.repartition(3, $"key")
  .select(from_json($"value", schema).as("kv"))
  .selectExpr("kv.key", "kv.ts", …
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-structured-streaming

6
推荐指数
0
解决办法
645
查看次数