相关疑难解决方法(0)

使用Spark Streaming从Cassandra读取

当我使用火花流从Cassandra读取时,我遇到了问题.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#reading-from-cassandra-from-the-streamingcontext

作为上面的链接,我使用

val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value").where("fu = ?", 3)
Run Code Online (Sandbox Code Playgroud)

从cassandra中选择数据,但似乎火花流只有一次查询,但我希望它继续使用10秒的间隔进行查询.

我的代码如下,希望您的回复.

谢谢!

import org.apache.spark._
import org.apache.spark.streaming._
import com.datastax.spark.connector.streaming._
import org.apache.spark.rdd._
import scala.collection.mutable.Queue


object SimpleApp {
def main(args: Array[String]){
    val conf = new SparkConf().setAppName("scala_streaming_test").set("spark.cassandra.connection.host", "127.0.0.1")

    val ssc = new StreamingContext(conf, Seconds(10))

    val rdd = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")

    //rdd.collect().foreach(println)

    val rddQueue = new Queue[RDD[com.datastax.spark.connector.CassandraRow]]()


    val dstream = ssc.queueStream(rddQueue)

    dstream.print()

    ssc.start()
    rdd.collect().foreach(println)
    rddQueue += rdd
    ssc.awaitTermination()
}  
Run Code Online (Sandbox Code Playgroud)

}

scala spark-streaming spark-cassandra-connector

8
推荐指数
1
解决办法
8430
查看次数