我正在尝试对从Kafka读取的(假)apache web服务器日志运行有状态Spark Streaming计算.目标是"会话化"类似于此博客文章的网络流量
唯一的区别是我希望"会话化"IP命中的每个页面,而不是整个会话.我能够在批处理模式下使用Spark从假网络流量文件中读取此内容,但现在我想在流式上下文中执行此操作.
日志文件从Kafka读取并解析为K/V
成对(String, (String, Long, Long))
或
(IP, (requestPage, time, time))
.
然后我打电话groupByKey()
给这个K/V pair
.在批处理模式下,这将产生:
(String, CollectionBuffer((String, Long, Long), ...)
要么
(IP, CollectionBuffer((requestPage, time, time), ...)
在StreamingContext中,它产生一个:
(String, ArrayBuffer((String, Long, Long), ...)
像这样:
(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
Run Code Online (Sandbox Code Playgroud)
但是,随着下一个微分类(DStream)的到来,该信息被丢弃.
最终我想要的是ArrayBuffer
随着时间的推移填充,因为给定的IP继续交互并对其数据运行一些计算以"会话化"页面时间.
我认为实现这一目标的运营商是" updateStateByKey
." 我在使用这个操作符时遇到了一些麻烦(我是Spark和Scala的新手);
任何帮助表示赞赏.
迄今:
val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)
def updateGroupByKey(
a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
b: Option[(String, ArrayBuffer[(String, Long, Long)])]
): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
}
Run Code Online (Sandbox Code Playgroud) 我正在AWS上运行双节点Datastax AMI集群.昨天,卡桑德拉开始拒绝一切的联系.系统日志没有显示.经过大量的修补,我发现提交日志已经填满了分配的挂载上的所有磁盘空间,这似乎导致连接拒绝(删除了一些提交日志,重新启动并且能够连接).
我在使用DataStax AMI 2.5.1和Cassandra 2.1.7
如果我决定从头开始擦除并重新启动所有内容,我该如何确保不再发生这种情况?
当我执行sbt run
并刷新时,localhost:9000
将显示我的更改.如果我改变别的东西,保存我的项目并再次点击刷新localhost:9000
不会显示我的更改.如果我尝试sbt ~run
我可以看到sbt重新编译每次我更改文件并保存,但刷新localhost:9000
不显示任何更新.只有当我终止sbt进程并重新启动它时,才能看到我的更改.
我的设置:
Windows 8.1
玩2.6
sbt 1.0.2
斯卡拉2.12.3
Intellij 2017.2.5
我有一个列类型为的表
text, bigint, set<text>
Run Code Online (Sandbox Code Playgroud)
我正在尝试更新单行并使用QueryBuilder向集合添加元素.
覆盖现有集的代码如下所示(注意这是scala):
val query = QueryBuilder.update("twitter", "tweets")
.`with`(QueryBuilder.set("sinceid", update.sinceID))
.and(QueryBuilder.set("tweets", setAsJavaSet(update.tweets)))
.where(QueryBuilder.eq("handle", update.handle))
Run Code Online (Sandbox Code Playgroud)
我能够找到实际的CQL来为一个集合添加一个元素:
UPDATE users
SET emails = emails + {'fb@friendsofmordor.org'} WHERE user_id = 'frodo';
Run Code Online (Sandbox Code Playgroud)
但是找不到使用QueryBuilder的例子.
基于CQL,我也尝试过:
.and(QueryBuilder.set("tweets", "tweets"+{setAsJavaSet(update.tweets)}))
Run Code Online (Sandbox Code Playgroud)
但它没有用.提前致谢
我正在尝试将Scala Map(我正在尝试转换为java.util.Map)存储到cassandra 2.1.8中.
数据结构如下所示:
Map[String -> Set[Tuple[String, String, String]]]
Run Code Online (Sandbox Code Playgroud)
我创建了如下表:
CREATE TABLE mailing (emailaddr text PRIMARY KEY, totalmails bigint, emails map<text, frozen<set<tuple<text, text, text>>>>);
Run Code Online (Sandbox Code Playgroud)
我首先尝试将Set转换为java Set:
def emailsToCassandra(addr: emailAddress, mail: MailContent, number: Int) = {
println("Inserting emails into cassandra")
mail.emails.foreach(result =>
setAsJavaSet(result._2)
)
Run Code Online (Sandbox Code Playgroud)
然后我构建查询并尝试将Map转换为java Map:
val query = QueryBuilder.insertInto("emails", "mailing")
.value("emailAddr", addr.toString())
.value("totalmails", number)
.value("emails", mapAsJavaMap(mail.emails))
session.executeAsync(query)
Run Code Online (Sandbox Code Playgroud)
我回来了:
java.lang.IllegalArgumentException: Value 1 of type class scala.collection.convert.Wrappers$MapWrapper does not correspond to any CQL3 type
Run Code Online (Sandbox Code Playgroud)
我也试过这样做:
val lol = mail.emails.asInstanceOf[java.util.Map[String, java.util.Set[Tuple3[String, String, String]]]] …
Run Code Online (Sandbox Code Playgroud)