sbr*_*nes 7 scala akka-stream akka-monitoring kamon
我一直在尝试为Akka流设置一些仪器.有了它的工作,但是,即使我将所有流量命名为流的一部分,我仍然会在度量标准中得到这样的名称:flow-0-0-unknown-operation
我正在尝试做的一个简单示例:
val myflow = Flow[String].named("myflow").map(println)
Source.via(myflow).to(Sink.ignore).run()
Run Code Online (Sandbox Code Playgroud)
我基本上希望看到为"myflow"创建的Actor的度量标准,并使用正确的名称.
这甚至可能吗?我错过了什么吗?
我在我的项目中遇到了这个挑战,我通过使用 Kamon + Prometheus 解决了这个问题。然而,我必须创建一个 Akka Stream Flow
,我可以设置它的名称metricName
并使用 从中导出指标值val kamonThroughputGauge: Metric.Gauge
。
class MonitorProcessingTimerFlow[T](interval: FiniteDuration)(metricName: String = "monitorFlow") extends GraphStage[FlowShape[T, T]] {
val in = Inlet[T]("MonitorProcessingTimerFlow.in")
val out = Outlet[T]("MonitorProcessingTimerFlow.out")
Kamon.init()
val kamonThroughputGauge: Metric.Gauge = Kamon.gauge("akka-stream-throughput-monitor")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
// mutable state
var open = false
var count = 0
var start = System.nanoTime
setHandler(in, new InHandler {
override def onPush(): Unit = {
try {
push(out, grab(in))
count += 1
if (!open) {
open = true
scheduleOnce(None, interval)
}
} catch {
case e: Throwable => failStage(e)
}
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
override protected def onTimer(timerKey: Any): Unit = {
open = false
val duration = (System.nanoTime - start) / 1e9d
val throughput = count / duration
kamonThroughputGauge.withTag("name", metricName).update(throughput)
count = 0
start = System.nanoTime
}
}
override def shape: FlowShape[T, T] = FlowShape[T, T](in, out)
}
Run Code Online (Sandbox Code Playgroud)
然后我创建了一个简单的流,使用 来MonitorProcessingTimerFlow
导出指标:
implicit val system = ActorSystem("FirstStreamMonitoring")
val source = Source(Stream.from(1)).throttle(1, 1 second)
/** Simulating workload fluctuation: A Flow that expand the event to a random number of multiple events */
val flow = Flow[Int].extrapolate { element =>
Stream.continually(Random.nextInt(100)).take(Random.nextInt(100)).iterator
}
val monitorFlow = Flow.fromGraph(new MonitorProcessingTimerFlow[Int](5 seconds)("monitorFlow"))
val sink = Sink.foreach[Int](println)
val graph = source
.via(flow)
.via(monitorFlow)
.to(sink)
graph.run()
Run Code Online (Sandbox Code Playgroud)
在以下位置进行正确配置application.conf
:
kamon.instrumentation.akka.filters {
actors.track {
includes = [ "FirstStreamMonitoring/user/*" ]
}
}
Run Code Online (Sandbox Code Playgroud)
我可以在普罗米修斯控制台上看到名称为的吞吐量指标name="monitorFlow"
:
归档时间: |
|
查看次数: |
543 次 |
最近记录: |