从非空 Kafka 主题(从最早开始)读取的 Spark 结构化流程序在本地触发批处理,但不在 EMR 集群上触发批处理

Chr*_*ord 5 amazon-emr apache-kafka apache-spark spark-structured-streaming

我们遇到了一个问题,对于我们的一个应用程序,我们在 Spark UI 的“结构化流”选项卡中看不到任何正在处理批次的证据。

我编写了一个小程序(如下)来重现该问题。一个独立的项目,允许您构建应用程序,以及便于上传到 AWS 的脚本,以及如何运行和重现问题的详细信息可以在此处找到: https: //github.com/buildlackey/spark-struct -streaming-metrics-missing-on-aws(该应用程序的 github 版本是下面介绍的稍微改进的版本,但它说明了 Spark 流指标未显示的问题。)

该程序可以“本地”运行——在某人的笔记本电脑上以本地[*]模式运行(例如使用 Docker 化的 Kafka 实例),或者在 EMR 集群上运行。对于本地模式操作,您可以使用“localTest”作为第一个参数来调用 main 方法。

在我们的例子中,当我们在 EMR 集群上运行时,指向一个我们知道有许多数据记录的主题(我们从“最早的”读取),我们发现由于某种原因集群上确实没有处理任何批次...

在本地[*] 情况下,我们可以看到已处理的批次。为了捕获这方面的证据,我编写了一个 forEachBatch 处理程序,它只需对每个批次的数据集执行 toLocalIterator.asScala.toList.mkString("\n") 操作,然后将结果字符串转储到文件中。在本地运行..我在临时文件中看到了捕获记录的证据。然而,当我在集群上运行并且通过 ssh 进入其中一个执行程序时,我没有看到此类文件。我还检查了主节点...没有与模式“缺失”匹配的文件,因此...批次不会在集群上触发。我们的 kakfa 拥有大量数据,当在集群上运行时,日志显示我们正在以不断增加的偏移量搅动消息:

21/12/16 05:15:21 DEBUG KafkaDataConsumer: Get spark-kafka-source-blah topic.foo.event-18 nextOffset 4596542913 requested 4596542913
21/12/16 05:15:21 DEBUG KafkaDataConsumer: Get spark-kafka-source-blah topic.foo.event-18 nextOffset 4596542914 requested 4596542914
Run Code Online (Sandbox Code Playgroud)

注意获取我们正在使用的日志:

yarn yarn logs --applicationId <appId>
Run Code Online (Sandbox Code Playgroud)

它应该获取整个运行的驱动程序和执行程序日志(当应用程序终止时)

现在,在本地[*] 情况下,我们可以看到已处理的批次。证据是我们在 tmp 文件夹中看到一个名称与模式“ Missing ”匹配的文件。

我在下面包含了我的简单演示程序。如果您能发现问题并向我们提供线索,我将不胜感激!

// Please forgive the busy code.. i stripped this down from a much larger system....
import com.typesafe.scalalogging.StrictLogging
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.{Dataset, SparkSession}
import java.io.File
import java.util
import scala.collection.JavaConverters.asScalaIteratorConverter
import scala.concurrent.duration.Duration

object AwsSupportCaseFailsToYieldLogs extends StrictLogging {
  case class KafkaEvent(fooMsgKey: Array[Byte],
                        fooMsg: Array[Byte],
                        topic: String,
                        partition: String,
                        offset: String) extends Serializable

  case class SparkSessionConfig(appName: String, master: String) {
    def sessionBuilder(): SparkSession.Builder = {
      val builder = SparkSession.builder
      builder.master(master)
      builder
    }
  }

  case class KafkaConfig(kafkaBootstrapServers: String, kafkaTopic: String, kafkaStartingOffsets: String)
    def sessionFactory: (SparkSessionConfig) => SparkSession = {
      (sparkSessionConfig) => {
        sparkSessionConfig.sessionBuilder().getOrCreate()
      }
    }

    def main(args: Array[String]): Unit = {
      val (sparkSessionConfig, kafkaConfig) =
        if (args.length >= 1 && args(0) == "localTest") {
          getLocalTestConfiguration
        } else {
          getRunOnClusterConfiguration
        }

      val spark: SparkSession = sessionFactory(sparkSessionConfig)

      spark.sparkContext.setLogLevel("ERROR")

      import spark.implicits._

      val dataSetOfKafkaEvent: Dataset[KafkaEvent] = spark.readStream.
        format("kafka").
        option("subscribe", kafkaConfig.kafkaTopic).
        option("kafka.bootstrap.servers", kafkaConfig.kafkaBootstrapServers).
        option("startingOffsets", kafkaConfig.kafkaStartingOffsets).
        load.
        select(
          $"key" cast "binary",
          $"value" cast "binary",
          $"topic",
          $"partition" cast "string",
          $"offset" cast "string").map { row =>

        KafkaEvent(
          row.getAs[Array[Byte]](0),
          row.getAs[Array[Byte]](1),
          row.getAs[String](2),
          row.getAs[String](3),
          row.getAs[String](4))
      }

      val initDF = dataSetOfKafkaEvent.map { item: KafkaEvent => item.toString }
      val function: (Dataset[String], Long) => Unit =
        (dataSetOfString, batchId) => {
          val iter: util.Iterator[String] = dataSetOfString.toLocalIterator()

          val lines  = iter.asScala.toList.mkString("\n")
          val outfile = writeStringToTmpFile(lines)
          println(s"writing to file: ${outfile.getAbsolutePath}")
          logger.error(s"writing to file: ${outfile.getAbsolutePath} /  $lines")
        }
      val trigger = Trigger.ProcessingTime(Duration("1 second"))

      initDF.writeStream
        .foreachBatch(function)
        .trigger(trigger)
        .outputMode("append")
        .start
        .awaitTermination()
    }

    private def getLocalTestConfiguration: (SparkSessionConfig, KafkaConfig) = {
      val sparkSessionConfig: SparkSessionConfig =
        SparkSessionConfig(master = "local[*]", appName = "dummy2")
      val kafkaConfig: KafkaConfig =
        KafkaConfig(
          kafkaBootstrapServers = "localhost:9092",
          kafkaTopic = "test-topic",
          kafkaStartingOffsets = "earliest")
      (sparkSessionConfig, kafkaConfig)
    }

    private def getRunOnClusterConfiguration = {
      val sparkSessionConfig: SparkSessionConfig = SparkSessionConfig(master = "yarn", appName = "AwsSupportCase")
      val kafkaConfig: KafkaConfig =
        KafkaConfig(
          kafkaBootstrapServers= "kafka.foo.bar.broker:9092",         //  TODO - change this for kafka on your EMR cluster.
          kafkaTopic= "mongo.bongo.event",                            //  TODO - change this for kafka on your EMR cluster.
          kafkaStartingOffsets = "earliest")
      (sparkSessionConfig, kafkaConfig)
    }

  def writeStringFile(string: String, file: File): File = {
    java.nio.file.Files.write(java.nio.file.Paths.get(file.getAbsolutePath), string.getBytes).toFile
  }

  def writeStringToTmpFile(string: String, deleteOnExit: Boolean = false): File = {
    val file: File = File.createTempFile("streamingConsoleMissing", "sad")
    if (deleteOnExit) {
      file.delete()
    }
    writeStringFile(string, file)
  }
}
Run Code Online (Sandbox Code Playgroud)

Chr*_*ord 0

这是一个临时答案。我们的一位团队成员提出了一个看起来很有可能的理论。如下:批次正在处理(我在 github 上链接的程序版本更好地证明了这一点),但我们认为,由于我们集群上的主题中有如此多的备份,因此处理(从最早的) )第一批需要很长时间,因此当查看集群时,我们看到处理了零批......即使显然有工作正在完成。解决方案可能是使用 maxOffsetsPerTrigger 来控制传入流量(当从最早开始并使用具有大量数据的主题时)。我们正在努力确认这一点。