Spark 流作业中的任务之间长时间且一致的等待

Jav*_*ier 3 apache-kafka mesos apache-spark spark-streaming

我有一个在 Mesos 上运行的 Spark 流作业。它的所有批次都花费完全相同的时间,并且这个时间比预期的要长得多。这些作业从 kafka 提取数据,处理数据并将其插入 cassandra,然后再次返回到 kafka 到不同的主题。

每个批次(如下)有 3 个作业,其中 2 个从 kafka 拉取、处理并插入到 cassandra 中,另一个从 kafka 拉取、处理并推回到 kafka 中。

我检查了 Spark UI 中的批次,发现它们都花费相同的时间(4 秒),但深入研究,它们实际上每个处理时间不到一秒,但它们都有相同时间的间隙(大约 4 秒)。添加更多的执行器或更多的处理能力看起来不会产生什么影响。

Details of batch: Processing time = 12s & total delay = 1.2 s??

因此,我深入研究批次中的每个作业(即使它们进行不同的处理,它们都花费完全相同的时间 = 4 秒):

工作 175 秒

工作 1753

工作 1754

他们都花了 4 秒来运行他们的一个阶段(从 kafka 读取的阶段)。现在我深入研究其中之一的阶段(它们都非常相似):

阶段 2336 的详细信息

为什么要等这个?整个事情实际上只需要0.5秒就可以运行,它只是在等待。是在等待卡夫卡吗?

有人经历过类似的事情吗?我可能编码错误或配置错误?

编辑:

这是触发此行为的最少代码。这让我觉得这一定是某种设置。

object Test {

  def main(args: Array[String]) {

    val sparkConf = new SparkConf(true)
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "####,####,####",
      "group.id" -> "test"
    )

    val stream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      streamingContext, kafkaParams, Set("test_topic")
    )

    stream.map(t => "LEN=" + t._2.length).print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
Run Code Online (Sandbox Code Playgroud)

即使所有执行器都在同一个节点(spark.executor.cores=2 spark.cores.max=2),问题仍然存在,而且和之前一样正好是 4 秒:一个 mesos 执行器

即使主题没有消息(批次为 0 条记录),Spark Streaming 每批次也需要 4 秒。

我能够解决此问题的唯一方法是设置cores=1cores.max=1以便它只创建一个要执行的任务。

该任务具有局部性NODE_LOCAL。所以看起来当NODE_LOCAL执行是瞬时的但是当Locality是时ANY需要4秒才能连接到kafka。所有机器都位于同一个 10Gb 网络中。知道为什么会这样吗?

Jav*_*ier 6

问题出在spark.locality.wait,这个链接给了我这个想法

它的默认值是 3 秒,在 Spark Streaming 中处理的每个批次都花费了整个时间。

当使用 Mesos ( ) 提交作业时,我将其设置为 0 秒--conf spark.locality.wait=0,现在一切都按预期运行。