Jav*_*ier 3 apache-kafka mesos apache-spark spark-streaming
我有一个在 Mesos 上运行的 Spark 流作业。它的所有批次都花费完全相同的时间,并且这个时间比预期的要长得多。这些作业从 kafka 提取数据,处理数据并将其插入 cassandra,然后再次返回到 kafka 到不同的主题。
每个批次(如下)有 3 个作业,其中 2 个从 kafka 拉取、处理并插入到 cassandra 中,另一个从 kafka 拉取、处理并推回到 kafka 中。
我检查了 Spark UI 中的批次,发现它们都花费相同的时间(4 秒),但深入研究,它们实际上每个处理时间不到一秒,但它们都有相同时间的间隙(大约 4 秒)。添加更多的执行器或更多的处理能力看起来不会产生什么影响。
Details of batch: Processing time = 12s & total delay = 1.2 s
??
因此,我深入研究批次中的每个作业(即使它们进行不同的处理,它们都花费完全相同的时间 = 4 秒):
他们都花了 4 秒来运行他们的一个阶段(从 kafka 读取的阶段)。现在我深入研究其中之一的阶段(它们都非常相似):
为什么要等这个?整个事情实际上只需要0.5秒就可以运行,它只是在等待。是在等待卡夫卡吗?
有人经历过类似的事情吗?我可能编码错误或配置错误?
编辑:
这是触发此行为的最少代码。这让我觉得这一定是某种设置。
object Test {
def main(args: Array[String]) {
val sparkConf = new SparkConf(true)
val streamingContext = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map[String, String](
"bootstrap.servers" -> "####,####,####",
"group.id" -> "test"
)
val stream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
streamingContext, kafkaParams, Set("test_topic")
)
stream.map(t => "LEN=" + t._2.length).print()
streamingContext.start()
streamingContext.awaitTermination()
}
}
Run Code Online (Sandbox Code Playgroud)
即使所有执行器都在同一个节点(spark.executor.cores=2 spark.cores.max=2
),问题仍然存在,而且和之前一样正好是 4 秒:一个 mesos 执行器
即使主题没有消息(批次为 0 条记录),Spark Streaming 每批次也需要 4 秒。
我能够解决此问题的唯一方法是设置cores=1
,cores.max=1
以便它只创建一个要执行的任务。
该任务具有局部性NODE_LOCAL
。所以看起来当NODE_LOCAL
执行是瞬时的但是当Locality是时ANY
需要4秒才能连接到kafka。所有机器都位于同一个 10Gb 网络中。知道为什么会这样吗?
归档时间: |
|
查看次数: |
2696 次 |
最近记录: |