从Spark中的cassandra表中删除

Ami*_*IFI 4 scala cassandra-2.0 apache-spark

我正在使用Spark和cassandra.我正在从我的表中读取一些行,以便使用PrimaryKey删除主题.这是我的代码:

val lines = sc.cassandraTable[(String, String, String, String)](CASSANDRA_SCHEMA, table).
  select("a","b","c","d").
  where("d=?", d).cache()

lines.foreach(r => {
    val session: Session = connector.openSession
    val delete = s"DELETE FROM "+CASSANDRA_SCHEMA+"."+table+" where channel='"+r._1 +"' and ctid='"+r._2+"'and cvid='"+r._3+"';"
    session.execute(delete)
    session.close()
})
Run Code Online (Sandbox Code Playgroud)

但是这种方法为每一行创建一个会话,这需要很多时间.那么是否可以使用sc.CassandraTable或其他更好的方法删除我的行.

谢谢

maa*_*asg 8

我认为目前deleteCassandra Connector上没有支持.要分摊连接设置的成本,建议的方法是将操作应用于每个分区.

所以你的代码看起来像这样:

lines.foreachPartition(partition => {
    val session: Session = connector.openSession //once per partition
    partition.foreach{elem => 
        val delete = s"DELETE FROM "+CASSANDRA_SCHEMA+"."+table+" where     channel='"+elem._1 +"' and ctid='"+elem._2+"'and cvid='"+elem._3+"';"
        session.execute(delete)
    }
    session.close()
})
Run Code Online (Sandbox Code Playgroud)

您还可以考虑使用DELETE FROM ... WHERE pk IN (list)并使用类似的方法来list为每个分区构建.这将更加高效,但可能会破坏非常大的分区,因为列表将变得非常长.在应用此功能之前重新分区目标RDD将有所帮助.