Res*_*hef 6 scala cassandra datastax-enterprise apache-spark
我试图通过使用以下方法过滤大型Cassandra表的一小部分:
val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b")
Run Code Online (Sandbox Code Playgroud)
我想映射cassandra表中作为分区键的一部分的'created'列中的行.
我的表键(表的分区键)定义为:
case class TableKey(imei: String, created: Long, when: Long)
Run Code Online (Sandbox Code Playgroud)
结果是错误:
[error] /home/ubuntu/scala/test/test.scala:61:没有足够的方法适用于方法:( imei:String,created:Long)test.TableKey in object TableKey.[error]已创建未指定的值参数.[error] val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b")[error] ^ [error]发现一个错误[error](编译:编译) )编译失败
它只与文档中的分区键中的一个对象一起使用.
为什么多分区密钥有问题? - 已回答.
编辑:我试图以正确的形式使用joinWithCassandraTable:
val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey("*",_,startDate)).joinWithCassandraTable("listener","snapshots_test_c")
Run Code Online (Sandbox Code Playgroud)
当我试图在Spark上运行时没有错误,但它永远停留在"[阶段0:>(0 + 2)/ 2]"
出了什么问题?
该错误告诉您该类TableKey
需要3个组件进行初始化,但只传递了一个参数.这是Scala编译错误,与C*或Spark无关.
val snapshotsFiltered = sc.parallelize(startDate to endDate)
.map(TableKey(_2)) /// Table Key does not have a single element constructor so this will fail
.joinWithCassandraTable("listener","snapshots_test_b")
Run Code Online (Sandbox Code Playgroud)
通常,C*使用整个partition key
do来确定特定行的位置.因此,如果您知道整个数据,则只能有效地提取数据,partition key
因此只传递一部分数据没有任何价值.
joinWithCassandraTable需要完整的partition key
值,因此它可以有效地完成它的工作.如果您只有一部分,则parition key
需要执行全表扫描并使用Spark进行过滤.
如果您只想基于a来过滤,clustering column
可以通过将一个where
子句下推到C*来实现
sc.cassandraTable("ks","test").where("clustering_key > someValue")
Run Code Online (Sandbox Code Playgroud)