Pra*_*ain 6 sql apache-spark spark-streaming
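I am trying to run SQL queries over streaming data in Spark. This looks pretty straightforward, but when I try it I get a "table not found: tablename" error. It cannot find the table I have registered.
Using Spark SQL with batch data works fine, so I think it has to do with how I am calling streamingcontext.start(). Any ideas what the issue is? Here is the code: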
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext

// Assumed definition: the post uses Persons but does not show it.
case class Persons(name: String, age: Int)

object Streaming {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
    val sc = new SparkContext(sparkConf)
    // Create the contexts
    val ssc = new StreamingContext(sc, Seconds(2))
    val sqc = new SQLContext(sc)
    import sqc.createSchemaRDD

    // Create the FileInputDStream (textFileStream monitors a directory for
    // new files) and register each batch as the table "data"
    val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people.txt")
    lines.foreachRDD(rdd => rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data"))
    // lines.foreachRDD(rdd => rdd.foreach(println))

    // Query issued once here, outside foreachRDD
    val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
    ssc.start()
    ssc.awaitTermination()
  }
}
Any suggestions are welcome. Thanks.
Pra*_*ain 11
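OK, I figured out the problem. You have to query the data inside the foreachRDD function; otherwise the table is not recognized. Something like this works: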
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext

// Assumed definition: the post uses Persons but does not show it.
case class Persons(name: String, age: Int)

object Mlist {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
    val sc = new SparkContext(sparkConf)
    // Create the contexts
    val ssc = new StreamingContext(sc, Seconds(2))
    val sqc = new SQLContext(sc)
    import sqc.createSchemaRDD

    // Create the FileInputDStream (textFileStream monitors a directory for new files)
    val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people.txt")
    // Print the raw lines of each batch
    lines.foreachRDD(rdd => rdd.foreach(println))

    lines.foreachRDD { rdd =>
      // Register this batch as "data" and query it in the same function,
      // so the table exists when the SQL runs
      rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data")
      val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
      teenagers.foreach(println)
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
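
For anyone wondering why the placement of the query matters, here is a toy model in plain Scala with no Spark dependency. ToyCatalog, ToyStream and DeferredRegistrationDemo are hypothetical names made up for this sketch, not Spark classes. The only assumption is that a foreachRDD-style callback is stored at setup time and runs later, once per batch, after start(), so a query issued outside the callback looks up the table before anything has registered it.

```scala
import scala.collection.mutable

// Toy stand-in for a table catalog (hypothetical, not a Spark class).
final class ToyCatalog {
  private val tables = mutable.Map.empty[String, Seq[(String, Int)]]

  def register(name: String, rows: Seq[(String, Int)]): Unit =
    tables.update(name, rows)

  // Returns Left with an error message when the table was never registered.
  def query(name: String): Either[String, Seq[(String, Int)]] =
    tables.get(name).toRight(s"table not found: $name")
}

// Toy stand-in for a stream: callbacks are only stored by foreachBatch
// and are run later by start(), once per batch.
final class ToyStream {
  private val callbacks = mutable.ArrayBuffer.empty[Seq[String] => Unit]

  def foreachBatch(f: Seq[String] => Unit): Unit = callbacks += f

  def start(batches: Seq[Seq[String]]): Unit =
    for (batch <- batches; f <- callbacks) f(batch)
}

object DeferredRegistrationDemo {
  // Parse a "name, age" line into a (name, age) pair.
  def parse(line: String): (String, Int) = {
    val p = line.split(",")
    (p(0), p(1).trim.toInt)
  }

  def teenagers(rows: Seq[(String, Int)]): Seq[String] =
    rows.collect { case (name, age) if age >= 13 && age <= 19 => name }

  // Query issued at setup time, outside the callback: nothing is registered yet.
  def queryOutside(batches: Seq[Seq[String]]): Either[String, Seq[String]] = {
    val catalog = new ToyCatalog
    val stream = new ToyStream
    stream.foreachBatch(batch => catalog.register("data", batch.map(parse)))
    val result = catalog.query("data").map(teenagers) // runs before start()
    stream.start(batches)
    result
  }

  // Query issued inside the callback, right after the batch is registered.
  def queryInside(batches: Seq[Seq[String]]): List[String] = {
    val catalog = new ToyCatalog
    val stream = new ToyStream
    val found = mutable.ArrayBuffer.empty[String]
    stream.foreachBatch { batch =>
      catalog.register("data", batch.map(parse))
      catalog.query("data").foreach(rows => found ++= teenagers(rows))
    }
    stream.start(batches)
    found.toList
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq("Michael, 29", "Justin, 19"), Seq("Andy, 15"))
    println(queryOutside(batches))
    println(queryInside(batches))
  }
}
```

With the sample batches in main, queryOutside returns a Left carrying the "table not found: data" message, while queryInside collects Justin and Andy. This only models the ordering; it says nothing about how Spark itself resolves tables internally.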