pla*_*bre 7 scala apache-spark spark-streaming
我正在尝试对从Kafka读取的(假)apache web服务器日志运行有状态Spark Streaming计算.目标是"会话化"类似于此博客文章的网络流量
唯一的区别是我希望"会话化"IP命中的每个页面,而不是整个会话.我能够在批处理模式下使用Spark从假网络流量文件中读取此内容,但现在我想在流式上下文中执行此操作.
日志文件从Kafka读取并解析为K/V成对(String, (String, Long, Long))或
(IP, (requestPage, time, time)).
然后我打电话groupByKey()给这个K/V pair.在批处理模式下,这将产生:
(String, CollectionBuffer((String, Long, Long), ...) 要么
(IP, CollectionBuffer((requestPage, time, time), ...)
在StreamingContext中,它产生一个:
(String, ArrayBuffer((String, Long, Long), ...) 像这样:
(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
Run Code Online (Sandbox Code Playgroud)
但是,随着下一个微分类(DStream)的到来,该信息被丢弃.
最终我想要的是ArrayBuffer随着时间的推移填充,因为给定的IP继续交互并对其数据运行一些计算以"会话化"页面时间.
我认为实现这一目标的运营商是" updateStateByKey." 我在使用这个操作符时遇到了一些麻烦(我是Spark和Scala的新手);
任何帮助表示赞赏.
迄今:
val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)
def updateGroupByKey(
a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
b: Option[(String, ArrayBuffer[(String, Long, Long)])]
): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
}
Run Code Online (Sandbox Code Playgroud)
加博尔的答案让我开始走上正确的道路,但这是一个产生预期输出的答案。
首先,对于我想要的输出:
(100.40.49.235,List((/,1418934075000,1418934075000), (/,1418934105000,1418934105000), (/contactus.html,1418934174000,1418934174000)))
Run Code Online (Sandbox Code Playgroud)
我不需要groupByKey()。updateStateByKey已经将值累积到 Seq 中,因此添加groupByKey是不必要的(并且昂贵)。Spark 用户强烈建议不要使用groupByKey.
这是有效的代码:
def updateValues( newValues: Seq[(String, Long, Long)],
currentValue: Option[Seq[ (String, Long, Long)]]
): Option[Seq[(String, Long, Long)]] = {
Some(currentValue.getOrElse(Seq.empty) ++ newValues)
}
val grouped = ipTimeStamp.updateStateByKey(updateValues)
Run Code Online (Sandbox Code Playgroud)
这里updateStateByKey传递了一个函数 (updateValues),该函数具有随时间累积的值 (newValues) 以及流中当前值的选项 (currentValue)。然后它返回这些的组合。getOrElse是必需的,因为 currentValue 有时可能为空。请访问https://twitter.com/granturing获取正确的代码。
| 归档时间: |
|
| 查看次数: |
2885 次 |
| 最近记录: |