java.lang.UnsupportedOperationException:'不允许写入非空的Cassandra表

Moh*_*ana 5 cassandra datastax-enterprise apache-spark spark-streaming apache-spark-sql

我有一个场景,我将接收由我的火花流程序处理的流数据,并且每个间隔的输出将附加到我现有的cassandra表中.

目前我的火花流程序将生成一个数据框,我需要保存在我的cassandra表中.我目前面临的问题是当我使用下面的命令时,我无法将数据/行附加到我现有的cassandra表中

dff.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "xxx", "yyy" -> "retail")).save()
Run Code Online (Sandbox Code Playgroud)

我已阅读以下链接http://rustyrazorblade.com/2015/08/migrating-from-mysql-to-cassandra-using-spark/,他将mode ="append"传递给save方法但其抛出语法错误

此外,我还能够从以下链接了解我需要修复的位置 https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/rlGGWQF2wnM

如何解决这个问题需要帮助.我正在scala中编写我的spark流媒体作业

Nie*_*and 8

我想你必须按照以下方式做到:

dff.write.format("org.apache.spark.sql.cassandra").mode(SaveMode.Append).options(Map("table" -> "xxx", "yyy" -> "retail")).save()
Run Code Online (Sandbox Code Playgroud)

该方法卡桑德拉处理数据迫使你做所谓的"upserts" - 你要记住,插入可能会覆盖一些地方已经存储记录的主键是一样的插入reccord的主键列.Cassandra是一个"快速写入"的数据库,因此它不会在写入之前检查数据是否存在.