Spark joinWithCassandraTable()在地图上有多个分区键ERROR

Res*_*hef 6 scala cassandra datastax-enterprise apache-spark

我试图通过使用以下方法过滤大型Cassandra表的一小部分:

val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b")
Run Code Online (Sandbox Code Playgroud)

我想映射cassandra表中作为分区键的一部分的'created'列中的行.

我的表键(表的分区键)定义为:

case class TableKey(imei: String, created: Long, when: Long)
Run Code Online (Sandbox Code Playgroud)

结果是错误:

[error] /home/ubuntu/scala/test/test.scala:61:没有足够的方法适用于方法:( imei:String,created:Long)test.TableKey in object TableKey.[error]已创建未指定的值参数.[error] val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b")[error] ^ [error]发现一个错误[error](编译:编译) )编译失败

它只与文档中的分区键中的一个对象一起使用.

为什么多分区密钥有问题? - 已回答.

编辑:我试图以正确的形式使用joinWithCassandraTable:

val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey("*",_,startDate)).joinWithCassandraTable("listener","snapshots_test_c")
Run Code Online (Sandbox Code Playgroud)

当我试图在Spark上运行时没有错误,但它永远停留在"[阶段0:>(0 + 2)/ 2]"

出了什么问题?

Rus*_*ssS 5

该错误告诉您该类TableKey需要3个组件进行初始化,但只传递了一个参数.这是Scala编译错误,与C*或Spark无关.

 val snapshotsFiltered = sc.parallelize(startDate to endDate)
   .map(TableKey(_2))  /// Table Key does not have a single element constructor so this will fail
   .joinWithCassandraTable("listener","snapshots_test_b")
Run Code Online (Sandbox Code Playgroud)

通常,C*使用整个partition keydo来确定特定行的位置.因此,如果您知道整个数据,则只能有效地提取数据,partition key因此只传递一部分数据没有任何价值.

joinWithCassandraTable需要完整的partition key值,因此它可以有效地完成它的工作.如果您只有一部分,则parition key需要执行全表扫描并使用Spark进行过滤.

如果您只想基于a来过滤,clustering column可以通过将一个where子句下推到C*来实现

sc.cassandraTable("ks","test").where("clustering_key > someValue")
Run Code Online (Sandbox Code Playgroud)