如何测量 Storm 拓扑中的延迟和吞吐量

use*_*674 5 java cloud performance clojure apache-storm

我正在通过示例学习 Storm ExclamationTopology。我想测量!!!一个bolt和吞吐量(比如,每秒有多少个单词通过bolt)的延迟(添加到一个单词中所需的时间)。

这里,我可以计算单词的数量和执行螺栓的次数:

_countMetric = new CountMetric();
_wordCountMetric = new MultiCountMetric();


context.registerMetric("execute_count", _countMetric, 5);
context.registerMetric("word_count", _wordCountMetric, 60);
Run Code Online (Sandbox Code Playgroud)

我知道 Storm UI 给出了Process Latency并且Execute Latency这篇文章很好地解释了它们是什么。

但是,我想记录每个 Bolt 每次执行的延迟,并使用此信息和word_count来计算吞吐量。

我如何使用Storm Metrics来实现这一点?

moo*_*d42 0

虽然你的问题很直接,并且肯定会引起很多人的兴趣,但它的答案并不像应有的那么微不足道。首先,我们需要澄清,我们到底想要衡量什么。吞吐量和延迟是很容易理解的术语,但在 Storms 分布式环境中事情变得更加复杂。

正如这篇优秀的博客文章中所描述的,每个 Storm 主管至少有 3 个线程来完成不同的任务。当Worker Receiver Thread等待传入的数据元组并将它们聚合为一个块时,它们将被发送到Worker Executor Thread。这包含用户逻辑(在您的情况下是ExclamationBolt一个发送者,负责处理传出消息。最后,在每个主管节点上,都有一个工作发送线程,用于聚合来自所有执行器的消息,聚合它们并将它们发送到网络。

当然,每个线程都有自己的延迟和吞吐量。对于发送者和接收者线程,它们很大程度上取决于您可以调整的缓冲区大小。在您的情况下,您只想测量一个(执行)bolt 的延迟和吞吐量 - 这是可能的,但请记住,其他线程对此 Bolt 有影响。

我的方法: 为了获得延迟和吞吐量,我使用了旧的Storm 内置指标。因为我发现文档不是很清楚,所以我在这里画一条线:我们没有使用新的Storm Metric API v2,也没有使用Cluster Metrics

  1. 通过将以下内容放入您的 中来激活风暴日志记录storm.yaml
topology.metrics.consumer.register:
  - class: "org.apache.storm.metric.LoggingMetricsConsumer"
    parallelism.hint: 1
Run Code Online (Sandbox Code Playgroud)
  1. 您可以通过以下方式设置报告间隔:topology.builtin.metrics.bucket.size.secs: 10

  2. 运行您的查询。所有指标每 10 秒都会记录在特定的指标日志文件中。找到这个日志文件并不简单。Storm 创建一个LoggingMetricsConsumer-Bolt 并将其分发到集群中。在此节点上,您应该在 Storm 日志中找到相应的指标文件。

  3. 该指标文件包含您正在寻找的每个执行器的指标,例如:complete-latency等等execute-latency。对于吞吐量,我将使用包含例如:的队列指标arrival_rate_secs来估计每秒插入的元组数量。处理每个主管上执行的多个线程。

祝你好运!