Spark 结构化流:当前批次落后

Him*_*dav 5 java apache-spark spark-structured-streaming

这似乎是非常简单的实现,但看起来存在一些问题。

该作业从 kafka 主题读取偏移量(ui 事件数据),进行一些聚合并将其写入 Aerospike 数据库。

在高流量的情况下,我开始看到这个问题,作业运行良好,但没有插入新数据。查看日志我看到以下警告消息:

当前批次落后了。触发间隔为30000毫秒,但花费了43491毫秒

很少有作业恢复写入数据,但我可以看到计数很低,这表明存在一些数据丢失。

这是代码:

Dataset<Row> stream = sparkSession.readStream()
          .format("kafka")
          .option("kafka.bootstrap.servers", kafkaBootstrapServersString)
          .option("subscribe", newTopic)
          .option("startingOffsets", "latest")
          .option("enable.auto.commit", false)
          .option("failOnDataLoss", false)
          .load();
StreamingQuery query = stream
        .writeStream()
        .option("startingOffsets", "earliest")
        .outputMode(OutputMode.Append())
        .foreach(sink)
        .trigger(Trigger.ProcessingTime(triggerInterval))
        .queryName(queryName)
        .start();
Run Code Online (Sandbox Code Playgroud)

Jun*_*Lim 2

您可能需要调整maxOffsetsPerTrigger每批的总输入记录。否则,应用程序的滞后可能会在批次中带来更多记录,从而减慢下一批的速度,进而在后续批次中带来更多滞后。

有关 Kafka 配置的更多详细信息,请参阅以下链接。

https://spark.apache.org/docs/2.4.0/structured-streaming-kafka-integration.html