Soi*_*oid 3 scala cassandra apache-spark rdd
I want to query some data in Cassandra based on the values I have in an RDD. My approach is as follows:
val userIds = sc.textFile("/tmp/user_ids").keyBy( e => e )
val t = sc.cassandraTable("keyspace", "users").select("userid", "user_name")
val userNames = userIds.flatMap { userId =>
t.where("userid = ?", userId).take(1)
}
userNames.take(1)
The Cassandra query works on its own in the Spark shell, but when I use it inside flatMap it throws an exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.NullPointerException:
org.apache.spark.rdd.RDD.<init>(RDD.scala:125)
com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:49)
com.datastax.spark.connector.rdd.CassandraRDD.copy(CassandraRDD.scala:83)
com.datastax.spark.connector.rdd.CassandraRDD.where(CassandraRDD.scala:94)
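My understanding is that I can't create an RDD (the Cassandra result) inside a transformation on another RDD.
The examples I found online read the entire Cassandra table into an RDD and join it with my RDD (like this one: https://cassandrastuff.wordpress.com/2014/07/07/cassandra-and-spark-table-joins/ ). But that won't scale if the Cassandra table is large.
So how should I approach this problem?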
The Spark Cassandra Connector 1.2 introduced joinWithCassandraTable:
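import com.datastax.spark.connector._ // adds joinWithCassandraTable to RDDs
val userids = sc.textFile("file:///Users/russellspitzer/users.list")
userids
  .map(Tuple1(_)) // wrap each id in a tuple so it can be mapped to the table's key column
  .joinWithCassandraTable("keyspace","table")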
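This code ends up doing the same work as the solution below. The joinWithCassandraTable method uses the same code that saveToCassandra uses to convert classes into something Cassandra can understand. That is why we need a tuple rather than a plain string to perform the join.
I think what you really want to do is an inner join on the two data sources. This should be faster than the flatMap approach, and there is some smart hashing going on internally.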
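To make the last two points concrete, here is a local, Spark-free sketch of what joinWithCassandraTable does conceptually. Each left-side element is a Tuple1, i.e. a Product whose fields can be read positionally and bound to the table's key column, whereas a bare String exposes no such fields. Each key then drives a single lookup instead of a full-table scan, and keys with no matching row are dropped (inner-join semantics). The in-memory `usersTable` and the helpers `keyFields` and `joinWithTable` are hypothetical illustrations, not the connector's actual mapping code; the real connector issues the per-key queries asynchronously on the executors.

```scala
object JoinWithTableSketch {
  // Hypothetical in-memory stand-in for the Cassandra table, keyed by userid.
  val usersTable: Map[Int, String] = Map(1 -> "Russ", 2 -> "Helena", 3 -> "Jacek")

  // Hypothetical helper: the positional fields a column mapper could bind to key columns.
  def keyFields(value: Any): List[Any] = value match {
    case p: Product => p.productIterator.toList
    case _          => Nil // a bare String is not a Product: nothing to bind
  }

  // Conceptual model of joinWithCassandraTable: one key lookup per left-side element,
  // no full-table scan; elements without a matching row are dropped (inner join).
  def joinWithTable(keys: Seq[Product]): Seq[(Product, (Int, String))] =
    keys.flatMap { k =>
      keyFields(k) match {
        case List(id: Int) => usersTable.get(id).map(name => (k, (id, name)))
        case _             => None
      }
    }

  def main(args: Array[String]): Unit = {
    val userIds = Seq("3", "2", "42").map(s => Tuple1(s.toInt))
    joinWithTable(userIds).foreach(println)
    println(keyFields("3")) // empty: a plain string offers no key fields
  }
}
```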
scala> val userids = sc.textFile("file:///Users/russellspitzer/users.list")
scala> userids.take(5)
res19: Array[String] = Array(3, 2)
scala> sc.cassandraTable("test","users").collect
res20: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{userid: 3, username: Jacek}, CassandraRow{userid: 1, username: Russ}, CassandraRow{userid: 2, username: Helena})
scala> userids.map(line => (line.toInt,true)).join(sc.cassandraTable("test","users").map(row => (row.getInt("userid"),row.getString("username")))).collect
res18: Array[(Int, (Boolean, String))] = Array((2,(true,Helena)), (3,(true,Jacek)))
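For intuition about what the join in the session above computes, here is a local sketch using plain Scala collections and the same sample data: the id file becomes `(id, true)` pairs, the table becomes `(userid, username)` pairs, and an inner join on the id keeps only ids present on both sides. The `innerJoin` helper is illustrative, not Spark's API: it indexes one side in an in-memory hash map, whereas Spark's RDD `join` hash-partitions and cogroups both sides, but the result per key is the same. Note that, unlike joinWithCassandraTable, this approach still reads the whole table.

```scala
object InnerJoinSketch {
  // Sample data taken from the REPL session above.
  val userIdLines: Seq[String] = Seq("3", "2")
  val usersRows: Seq[(Int, String)] = Seq(3 -> "Jacek", 1 -> "Russ", 2 -> "Helena")

  // Illustrative inner join: emit (key, (left, right)) for every key present on both sides.
  def innerJoin[K, L, R](left: Seq[(K, L)], right: Seq[(K, R)]): Seq[(K, (L, R))] = {
    // Index the right side by key (a simple in-memory hash map).
    val rightByKey: Map[K, Seq[R]] =
      right.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    for {
      (k, l) <- left
      r <- rightByKey.getOrElse(k, Seq.empty[R])
    } yield (k, (l, r))
  }

  def main(args: Array[String]): Unit = {
    // Same shape as userids.map(line => (line.toInt, true)) in the session above.
    val left = userIdLines.map(line => (line.toInt, true))
    innerJoin(left, usersRows).foreach(println)
  }
}
```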
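If you really just want to run a bunch of primary-key lookups against your C* database, you're probably better off executing them through the plain driver path rather than building Spark RDDs for them: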
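import com.datastax.spark.connector.cql.CassandraConnector
import collection.JavaConversions._ // implicit java.util.Iterator -> Scala Iterator

// Serializable connector built from the Spark config; usable inside tasks on executors
val cc = CassandraConnector(sc.getConf)
// Parameterized primary-key lookup
val select = "SELECT * FROM cctest.users where userid=?"
val ids = sc.parallelize(1 to 10)
ids.flatMap(id =>
  // Borrow a session on the executor and run one lookup per id
  cc.withSessionDo(session =>
    // Box the Int so it matches the driver's Object... varargs parameter
    session.execute(select, id: java.lang.Integer).iterator.toList.map(row =>
      (row.getInt("userid"), row.getString("username"))))).collect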
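One variation on the driver-path approach is to acquire the session once per partition (in Spark, via `mapPartitions`) rather than once per element. The local sketch below models that pattern without Spark or Cassandra, under these assumptions: `FakeSession` is a hypothetical stand-in for a driver session that answers `SELECT ... WHERE userid=?` from an in-memory map, and `grouped` stands in for Spark partitions. The returned counter shows that only one session is opened per partition. Since CassandraConnector already reuses sessions internally, the gain over the per-element version above may be modest.

```scala
object PerPartitionLookupSketch {
  // Hypothetical stand-in for a driver session: answers primary-key lookups from memory.
  final class FakeSession(table: Map[Int, String]) {
    def selectByUserId(id: Int): List[(Int, String)] =
      table.get(id).map(name => (id, name)).toList
  }

  val table: Map[Int, String] = Map(1 -> "Russ", 2 -> "Helena", 3 -> "Jacek")

  // Returns the rows found plus the number of sessions that were opened.
  def lookupPerPartition(ids: Seq[Int], partitionSize: Int): (Seq[(Int, String)], Int) = {
    var sessionsOpened = 0
    // grouped(...) models partitions; toList keeps evaluation strict so the count is exact.
    val rows = ids.grouped(partitionSize).toList.flatMap { partition =>
      sessionsOpened += 1
      val session = new FakeSession(table) // acquired once per partition
      partition.flatMap(session.selectByUserId) // reused for every id in the partition
    }
    (rows, sessionsOpened)
  }

  def main(args: Array[String]): Unit = {
    val (rows, opened) = lookupPerPartition(1 to 10, partitionSize = 5)
    rows.foreach(println)
    println(s"sessions opened: $opened")
  }
}
```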