如何将数据集写入Cassandra?

use*_*174 8 apache-spark pyspark spark-cassandra-connector spark-structured-streaming

所以我有一个Python Stream-sourced DataFrame df,它包含我想要放入带有spark-cassandra-connector的Cassandra表的所有数据.我试过两种方式:

df.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode('append') \
    .options(table="myTable",keyspace="myKeySpace") \
    .save() 

query = df.writeStream \
    .format("org.apache.spark.sql.cassandra") \
    .outputMode('append') \
    .options(table="myTable",keyspace="myKeySpace") \
    .start()

query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

但是我继续分别得到这个错误:

pyspark.sql.utils.AnalysisException: "'write' can not be called on streaming Dataset/DataFrame;
Run Code Online (Sandbox Code Playgroud)

java.lang.UnsupportedOperationException: Data source org.apache.spark.sql.cassandra does not support streamed writing.
Run Code Online (Sandbox Code Playgroud)

无论如何我可以将我的Streaming DataFrame发送到我的Cassandra表中吗?

Rus*_*ssS 6

目前Sink在Spark Cassandra Connector中没有Cassandra的流媒体.您需要实现自己的Sink或等待它可用.

如果您使用的是Scala或Java,则可以使用foreach运算符并使用" 使用Foreach"中ForeachWriter所述的方法.

  • 今天仍然如此(我的意思是2018年)? (4认同)
  • 不,没有转换(至少没有我知道的) (2认同)

小智 5

我知道它是一个旧帖子,更新它以备将来参考。

您可以从流数据中批量处理它。像下面

def writeToCassandra(writeDF, epochId):
 writeDF.write \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="table_name", keyspace="keyspacename")\
    .mode("append") \
    .save()

query = sdf3.writeStream \
.trigger(processingTime="10 seconds") \
.outputMode("update") \
.foreachBatch(writeToCassandra) \
.start()
Run Code Online (Sandbox Code Playgroud)