当我使用火花流从Cassandra读取时,我遇到了问题.
作为上面的链接,我使用
val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value").where("fu = ?", 3)
Run Code Online (Sandbox Code Playgroud)
从cassandra中选择数据,但似乎火花流只有一次查询,但我希望它继续使用10秒的间隔进行查询.
我的代码如下,希望您的回复.
谢谢!
import org.apache.spark._
import org.apache.spark.streaming._
import com.datastax.spark.connector.streaming._
import org.apache.spark.rdd._
import scala.collection.mutable.Queue
object SimpleApp {
def main(args: Array[String]){
val conf = new SparkConf().setAppName("scala_streaming_test").set("spark.cassandra.connection.host", "127.0.0.1")
val ssc = new StreamingContext(conf, Seconds(10))
val rdd = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")
//rdd.collect().foreach(println)
val rddQueue = new Queue[RDD[com.datastax.spark.connector.CassandraRow]]()
val dstream = ssc.queueStream(rddQueue)
dstream.print()
ssc.start()
rdd.collect().foreach(println)
rddQueue += rdd
ssc.awaitTermination()
}
Run Code Online (Sandbox Code Playgroud)
}