Ami*_*IFI 4 scala cassandra-2.0 apache-spark
我正在使用Spark和cassandra.我正在从我的表中读取一些行,以便使用PrimaryKey删除主题.这是我的代码:
val lines = sc.cassandraTable[(String, String, String, String)](CASSANDRA_SCHEMA, table).
select("a","b","c","d").
where("d=?", d).cache()
lines.foreach(r => {
val session: Session = connector.openSession
val delete = s"DELETE FROM "+CASSANDRA_SCHEMA+"."+table+" where channel='"+r._1 +"' and ctid='"+r._2+"'and cvid='"+r._3+"';"
session.execute(delete)
session.close()
})
Run Code Online (Sandbox Code Playgroud)
但是这种方法为每一行创建一个会话,这需要很多时间.那么是否可以使用sc.CassandraTable或其他更好的方法删除我的行.
谢谢
我认为目前deleteCassandra Connector上没有支持.要分摊连接设置的成本,建议的方法是将操作应用于每个分区.
所以你的代码看起来像这样:
lines.foreachPartition(partition => {
val session: Session = connector.openSession //once per partition
partition.foreach{elem =>
val delete = s"DELETE FROM "+CASSANDRA_SCHEMA+"."+table+" where channel='"+elem._1 +"' and ctid='"+elem._2+"'and cvid='"+elem._3+"';"
session.execute(delete)
}
session.close()
})
Run Code Online (Sandbox Code Playgroud)
您还可以考虑使用DELETE FROM ... WHERE pk IN (list)并使用类似的方法来list为每个分区构建.这将更加高效,但可能会破坏非常大的分区,因为列表将变得非常长.在应用此功能之前重新分区目标RDD将有所帮助.
| 归档时间: |
|
| 查看次数: |
5832 次 |
| 最近记录: |