使用 Elasticsearch 实时分析事件日志

use*_*736 5 hadoop machine-learning elasticsearch apache-spark lambda-architecture

每次更改某些设备的属性时,我都会收集事件日志。为此,我决定使用:

  1. Logstash - 我的代理 IoT 应用程序将日志以 JSON 格式发送到其中,
  2. Elasticsearch - 用于存储数据(日志),
  3. Kibana - 用于数据可视化。

带有日志的 JSON 正在定期发送,其形式如下:

{"deviceEventLogs":[{"date":"16:16:39 31-08-2016","locationName":"default","property":"on","device":"Lamp 1","value":"
false","roomName":"LivingRoom"}, ... ,]}
Run Code Online (Sandbox Code Playgroud)

Elasticsearch 中的单个事件条目示例如下所示:

 {
            "_index": "logstash-2016.08.25",
            "_type": "on",
            "_id": "AVbDYQPq54WlAl_UD_yg",
            "_score": 1,
            "_source": {
               "@version": "1",
               "@timestamp": "2016-08-25T20:25:28.750Z",
               "host": "127.0.0.1",
               "headers": {
                  "request_method": "PUT",
                  "request_path": "/deviceEventLogs",
                  "request_uri": "/deviceEventLogs",
                  "http_version": "HTTP/1.1",
                  "content_type": "application/json",
                  "http_user_agent": "Java/1.8.0_91",
                  "http_host": "127.0.0.1:31311",
                  "http_accept": "text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2",
                  "http_connection": "keep-alive",
                  "content_length": "34861"
               },
               "date": "2016-08-08T14:48:11.000Z",
               "device": "Lamp 1",
               "property": "on",
               "locationName": "default",
               "roomName": "LivingRoom",
               "value_boolean": true
            }
 }
Run Code Online (Sandbox Code Playgroud)

我的目标是创建一个带有某种仪表板的网站,在合理的时间内显示分析的数据(几分钟应该是可以接受的),即:

  • 显示能源消耗的历史并预测特征中的消耗
  • 检测能源消耗或其他因素(如灯光或加热使用情况)的异常情况
  • 显示基于某种不复杂的统计数据的建议,即“您可以将给定的设备从位置 1 移动到位置 2,因为那里更需要它(比其他地方使用更频繁)”等。

虽然最后一点非常简单——我可以在 Elasticsearch 中使用简单的查询或聚合,然后将其与某个阈值进行比较,但前两点需要深入分析,如机器学习或数据挖掘。

目前,该系统配备了大约 50 个设备,平均每 10 秒更新一次其状态。未来设备的数量可以增加到 50 000。假设一个事件日志有 100 字节,它可以导致 Elasticsearch 每年大约 15 TB 的数据。

一般的问题是 - 这种系统的合理解决方案/技术/架构是什么?

  1. 将我的所有日​​志存储在 Elasticsearch 中是否是一个合理的开始?
  2. 我认为 es-hadoop 库将 Elasticsearch 与 Apache Spark 一起使用,以便能够在 Spark 中使用 Mlib 处理我的数据——这是一个合理的方向吗?
  3. 我可以只使用 Elasticsearch 来存储我的所有数据并只使用 Spark 和 Mlib 来提供深入分析,还是应该考虑实施所谓的“Lambda 架构”,将 Elasticsearch 视为速度层?我对使用 Kafka 和 Apache Storm 的各种配置有一些了解,但我不确定我是否需要它。由于该项目应该在一个月内完成,而且我是初学者,我担心复杂性以及执行此类所需的时间。
  4. 如果数据负载小 10 倍(每年大约 1.5 TB)怎么办 - 您的答案是否相同?

小智 1

这是一个非常复杂的问题,让我试着分解一下:

你应该思考的问题

  • 您的数据可用于查询的端到端延迟是多少?您需要实时还是可以接受延迟?
  • 您愿意容忍的数据丢失是多少?
  • 您正在查看的分析/机器学习算法的准确性是多少?您需要高度准确的结果还是可以接受一些不准确的结果?
  • 您是否只需要完整的结果,还是需要某种推测性结果?

这些问题以及空间限制和数据负载增加时的延迟等常规问题应该可以帮助您确定正确的解决方案。

一般来说,这些问题可以看作是摄取->处理->呈现。

摄取 - 需要消息总线

一般来说,人们选择像 Kafka 这样的消息总线来处理来自慢速下游消费者的背压,并提供可靠性(通过持久化到磁盘)以防止数据丢失。Kafka 在 Spark Streaming、Druid firehose 支持、ES 插件等集成方面也拥有良好的社区支持。

处理 - 需要可扩展的计算层

您需要在此处决定实时与批处理、适用的数据丢失、准确与推测结果等。请阅读 Tyler Akidau 关于流式传输的文章:https: //www.oreilly.com/ideas/the -world-beyond-batch-streaming-101了解详细说明。

人们选择 Spark 流用于实时用例,而简单的 M/R 作业应该可以满足批处理作业的需要。如果您计划进行流式作业,那么事件的窗口和会话可能会使事情变得更加复杂。

演示 - 需要交互式查询和快速响应

这是前端应用程序将要集成的地方,选择一个非常适合预期查询类型和所需响应准确性的工具是有意义的。

像 ES 这样的工具在搜索、过滤和分面方面表现得非常好,但在需要复杂的数学聚合时就会失败。AFAIK ES 不像 Druid 那样支持 HyperLogLog 等概率结构。

改造

现在您必须将您的要求与上面的每一层对应起来。

显示能源消耗历史并预测特征中的消耗

检测能源消耗或其他因素(例如灯光或供暖使用)的异常情况

正如您所提到的,您显然需要机器学习库。Spark 及其 MLib 支持非常棒。

显示基于某种不成熟统计数据的推荐,即“您可以将给定设备从位置 1 移动到位置 2,因为那里更需要它(比其他地方更频繁地使用)”等。

您甚至可以使用 Spark 上的 MLib 来执行此操作,并将建议泵入 ES 中的单独索引甚至 Kafka 主题,您可以进一步将其简化为 HDFS 或 ES。您应该小心这里的垃圾收集,因为这可能会导致数据爆炸,并且您需要在此处积极保留。此外,预先计算建议可以帮助您执行诸如警报、推送通知之类的反应性操作,甚至从 UI 进行查询也会更快。

假设一个事件日志有 100 字节,每年 Elasticsearch 中会产生大约 15 TB 的数据。

这些都是任何存储系统配置时的常见问题。您可以通过计算历史数据的物化视图来进行优化,但您可以稍后再做决定,因为这可能会导致过早的优化。您最好首先测量查询的存储和延迟,然后对容量进行追溯分析。

将我的所有日​​志存储在 Elasticsearch 中是否是一个合理的开始?

考虑到您的用例,非常如此。但如果使用 Spark 流/MLib 或批处理 MR 作业,那么您甚至可以使用哑数据存储,因为大多数计算都是预先发生的。

我认为 es-hadoop 库可以使用 Elasticsearch 和 Apache Spark,以便能够在 Spark 中使用 Mlib 处理我的数据 - 这是一个合理的方向吗?

看起来您已经决定进行批处理,在这种情况下您可以使用标准 MR 或 Spark Ba​​tch 以及 MLib。如果你需要实时性,你需要像 Kafka 这样的东西并使用 Spark Streaming。如果您可以接受数据丢失,那么您可以积极保留数据,甚至在 Spark 中,当您决定窗口/滑动间隔等时。如果您可以接受结果不准确,您可以使用概率数据结构(例如Bloom) filter、hyperloglog - druid 支持这个)来表示结果。

我可以仅使用 Elasticsearch 来存储所有数据并仅使用 Spark 和 Mlib 来提供深入分析,还是应该考虑实施所谓的“Lambda 架构”,将 Elasticsearch 视为速度层?

我不确定您是否可以将数据从 ES 流式传输到 Spark 作业。lambda 架构被过度宣传,只有在您确定实时层不准确并且无法处理数据丢失/不准确的情况下才有帮助。否则,一个简单的 Spark Streaming 作业从 Kafka 读取数据并将数据泵送到 ES 应该就足够了。在决定使用 Lambda 等复杂架构之前,请考虑衡量数据丢失,因为运营成本(例如重复代码、需要维护的更多基础设施等)可能很高。

如果数据负载减少 10 倍(每年约 1.5 TB)怎么办 - 您的答案会相同吗?

我仍然更喜欢相同的架构 - Kafka+Spark Streaming(MLib)+ES/Druid - 这更容易实现,也更容易维护。