小编pla*_*bre的帖子

Spark Streaming groupByKey和updateStateByKey实现

我正在尝试对从Kafka读取的(假)apache web服务器日志运行有状态Spark Streaming计算.目标是"会话化"类似于此博客文章的网络流量

唯一的区别是我希望"会话化"IP命中的每个页面,而不是整个会话.我能够在批处理模式下使用Spark从假网络流量文件中读取此内容,但现在我想在流式上下文中执行此操作.

日志文件从Kafka读取并解析为K/V成对(String, (String, Long, Long))

(IP, (requestPage, time, time)).

然后我打电话groupByKey()给这个K/V pair.在批处理模式下,这将产生:

(String, CollectionBuffer((String, Long, Long), ...) 要么

(IP, CollectionBuffer((requestPage, time, time), ...)

在StreamingContext中,它产生一个:

(String, ArrayBuffer((String, Long, Long), ...) 像这样:

(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
Run Code Online (Sandbox Code Playgroud)

但是,随着下一个微分类(DStream)的到来,该信息被丢弃.

最终我想要的是ArrayBuffer随着时间的推移填充,因为给定的IP继续交互并对其数据运行一些计算以"会话化"页面时间.

我认为实现这一目标的运营商是" updateStateByKey." 我在使用这个操作符时遇到了一些麻烦(我是Spark和Scala的新手);

任何帮助表示赞赏.

迄今:

val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey) 


    def updateGroupByKey(
                          a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
                          b: Option[(String, ArrayBuffer[(String, Long, Long)])]
                          ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {

  }
Run Code Online (Sandbox Code Playgroud)

scala apache-spark spark-streaming

7
推荐指数
1
解决办法
2885
查看次数

如何防止Cassandra提交日志填满磁盘空间

我正在AWS上运行双节点Datastax AMI集群.昨天,卡桑德拉开始拒绝一切的联系.系统日志没有显示.经过大量的修补,我发现提交日志已经填满了分配的挂载上的所有磁盘空间,这似乎导致连接拒绝(删除了一些提交日志,重新启动并且能够连接).

我在使用DataStax AMI 2.5.1和Cassandra 2.1.7

如果我决定从头开始擦除并重新启动所有内容,我该如何确保不再发生这种情况?

cassandra datastax-java-driver datastax cassandra-2.1

7
推荐指数
1
解决办法
1万
查看次数

play 2.6自动重载在第一次编译后没有显示更改

当我执行sbt run并刷新时,localhost:9000将显示我的更改.如果我改变别的东西,保存我的项目并再次点击刷新localhost:9000不会显示我的更改.如果我尝试sbt ~run我可以看到sbt重新编译每次我更改文件并保存,但刷新localhost:9000不显示任何更新.只有当我终止sbt进程并重新启动它时,才能看到我的更改.

我的设置:

Windows 8.1

玩2.6

sbt 1.0.2

斯卡拉2.12.3

Intellij 2017.2.5

scala sbt playframework playframework-2.6

5
推荐指数
1
解决办法
568
查看次数

如何使用DataStax QueryBuilder向集合中添加元素?

我有一个列类型为的表

text, bigint, set<text> 
Run Code Online (Sandbox Code Playgroud)

我正在尝试更新单行并使用QueryBuilder向集合添加元素.

覆盖现有集的代码如下所示(注意这是scala):

val query = QueryBuilder.update("twitter", "tweets")
  .`with`(QueryBuilder.set("sinceid", update.sinceID))
  .and(QueryBuilder.set("tweets", setAsJavaSet(update.tweets)))
  .where(QueryBuilder.eq("handle", update.handle))
Run Code Online (Sandbox Code Playgroud)

我能够找到实际的CQL来为一个集合添加一个元素:

UPDATE users
SET emails = emails + {'fb@friendsofmordor.org'} WHERE user_id = 'frodo';
Run Code Online (Sandbox Code Playgroud)

但是找不到使用QueryBuilder的例子.

基于CQL,我也尝试过:

  .and(QueryBuilder.set("tweets", "tweets"+{setAsJavaSet(update.tweets)}))
Run Code Online (Sandbox Code Playgroud)

但它没有用.提前致谢

scala cql cql3 datastax-java-driver datastax

3
推荐指数
1
解决办法
1064
查看次数

Datastax java驱动程序,将scala集合转换为java错误

我正在尝试将Scala Map(我正在尝试转换为java.util.Map)存储到cassandra 2.1.8中.

数据结构如下所示:

Map[String -> Set[Tuple[String, String, String]]]
Run Code Online (Sandbox Code Playgroud)

我创建了如下表:

CREATE TABLE mailing (emailaddr text PRIMARY KEY, totalmails bigint, emails map<text, frozen<set<tuple<text, text, text>>>>);
Run Code Online (Sandbox Code Playgroud)

我首先尝试将Set转换为java Set:

def emailsToCassandra(addr: emailAddress, mail: MailContent, number: Int) = {
println("Inserting emails into cassandra")

mail.emails.foreach(result =>

  setAsJavaSet(result._2)
)
Run Code Online (Sandbox Code Playgroud)

然后我构建查询并尝试将Map转换为java Map:

val query = QueryBuilder.insertInto("emails", "mailing")
                        .value("emailAddr", addr.toString())
                        .value("totalmails", number)
                        .value("emails", mapAsJavaMap(mail.emails))
session.executeAsync(query)
Run Code Online (Sandbox Code Playgroud)

我回来了:

java.lang.IllegalArgumentException: Value 1 of type class scala.collection.convert.Wrappers$MapWrapper does not correspond to any CQL3 type
Run Code Online (Sandbox Code Playgroud)

我也试过这样做:

val lol = mail.emails.asInstanceOf[java.util.Map[String, java.util.Set[Tuple3[String, String, String]]]] …
Run Code Online (Sandbox Code Playgroud)

scala cassandra datastax-java-driver

3
推荐指数
1
解决办法
2932
查看次数