Spark Streaming throughput monitoring

key*_*int 6 performance monitoring amazon-cloudwatch apache-spark spark-streaming

Is there a way to monitor the input and output throughput of a Spark cluster, to make sure the cluster is not flooded and overflowed by incoming data?

In my case, I set up the Spark cluster on AWS EC2, so I'm thinking of using AWS CloudWatch to monitor the NetworkIn and NetworkOut metrics for each node in the cluster.

But that idea does not seem accurate: network traffic covers more than just Spark's incoming data, so other data would be counted in as well.

Is there a tool or a way to monitor specifically the streaming data status of a Spark cluster? Or is there a built-in tool in Spark that I have missed?


Update: Spark 1.4 has been released, and the monitoring available on port 4040 is significantly enhanced with graphical displays.

maa*_*asg 12

Spark has a configurable metrics subsystem. By default it publishes a JSON version of the registered metrics at <driver>:<port>/metrics/json. Other metrics sinks can be configured as well, such as Ganglia, CSV files, or JMX.
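Sinks are configured in conf/metrics.properties. As a minimal sketch, assuming the built-in CsvSink and an arbitrary output directory of my choosing, the following writes every registered metric to CSV files every 10 seconds:

# conf/metrics.properties: enable the built-in CSV sink for all instances
*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
*.sink.csv.period=10
*.sink.csv.unit=seconds
*.sink.csv.directory=/tmp/spark-metrics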

You will need some external monitoring system that collects these metrics on a regular basis and helps you make sense of them. (We use Ganglia, but there are other open-source and commercial options.)

Spark Streaming publishes several metrics that can be used to monitor the performance of your job. To calculate throughput, you can combine:

(lastReceivedBatch_processingEndTime - lastReceivedBatch_processingStartTime) / lastReceivedBatch_records

For all the supported metrics, have a look at StreamingSource.
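Putting that together, here is a minimal Scala sketch (my own, not from the answer) that polls the metrics servlet and derives records per second for the last received batch. It assumes the servlet is reachable at localhost:4040/metrics/json, as in the example below, and uses a regex as a stand-in for a proper JSON parser:

import scala.io.Source

// Fetch the raw JSON from the driver's metrics servlet.
// (Assumption: the driver UI runs on the default port 4040.)
val json = Source.fromURL("http://localhost:4040/metrics/json").mkString

// Extract the numeric value of the gauge whose name ends with `suffix`.
// Full gauge names are prefixed with the app id, so match the suffix only.
def gauge(suffix: String): Long = {
  val re = (java.util.regex.Pattern.quote(suffix) +
    "\"?\\s*:\\s*\\{\\s*\"?value\"?\\s*:\\s*(-?\\d+)").r
  re.findFirstMatchIn(json)
    .map(_.group(1).toLong)
    .getOrElse(sys.error(s"gauge not found: $suffix"))
}

val startMs = gauge("lastReceivedBatch_processingStartTime")
val endMs   = gauge("lastReceivedBatch_processingEndTime")
val records = gauge("lastReceivedBatch_records")

// Records per second for the last batch; guard against an empty batch.
val throughput =
  if (endMs > startMs) records * 1000.0 / (endMs - startMs) else 0.0
println(f"last batch throughput: $throughput%.1f records/s")

Note that the formula above gives the inverse (milliseconds per record); either form works as a throughput indicator.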

Example: using Spark 1.3.1 in a local REPL, after executing a trivial streaming application:

import org.apache.spark.streaming._

// `sc` is the SparkContext already provided by the shell.
val ssc = new StreamingContext(sc, Seconds(10))
// Feed the stream from a queue of one-element RDDs.
val queue = scala.collection.mutable.Queue(1,2,3,45,6,6,7,18,9,10,11)
val q = queue.map(elem => sc.parallelize(Seq(elem)))
val dstream = ssc.queueStream(q)
dstream.print
ssc.start

you can GET localhost:4040/metrics/json, which returns:

{
  version: "3.0.0",
  gauges: {
    local-1430558777965.<driver>.BlockManager.disk.diskSpaceUsed_MB: {
      value: 0
    },
    local-1430558777965.<driver>.BlockManager.memory.maxMem_MB: {
      value: 2120
    },
    local-1430558777965.<driver>.BlockManager.memory.memUsed_MB: {
      value: 0
    },
    local-1430558777965.<driver>.BlockManager.memory.remainingMem_MB: {
      value: 2120
    },
    local-1430558777965.<driver>.DAGScheduler.job.activeJobs: {
      value: 0
    },
    local-1430558777965.<driver>.DAGScheduler.job.allJobs: {
      value: 6
    },
    local-1430558777965.<driver>.DAGScheduler.stage.failedStages: {
      value: 0
    },
    local-1430558777965.<driver>.DAGScheduler.stage.runningStages: {
      value: 0
    },
    local-1430558777965.<driver>.DAGScheduler.stage.waitingStages: {
      value: 0
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingDelay: {
      value: 44
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingEndTime: {
      value: 1430559950044
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingStartTime: {
      value: 1430559950000
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_schedulingDelay: {
      value: 0
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_submissionTime: {
      value: 1430559950000
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_totalDelay: {
      value: 44
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_processingEndTime: {
      value: 1430559950044
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_processingStartTime: {
      value: 1430559950000
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_records: {
      value: 0
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_submissionTime: {
      value: 1430559950000
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.receivers: {
      value: 0
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.retainedCompletedBatches: {
      value: 2
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.runningBatches: {
      value: 0
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalCompletedBatches: {
      value: 2
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalProcessedRecords: {
      value: 0
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalReceivedRecords: {
      value: 0
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.unprocessedBatches: {
      value: 0
    },
    local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.waitingBatches: {
      value: 0
    }
  },
  counters: { },
  histograms: { },
  meters: { },
  timers: { }
}