Pra*_*ain 11 apache-spark spark-streaming
这是通过Spark Streaming运行简单SQL查询的代码.
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.Duration
object StreamingSQL {
case class Persons(name: String, age: Int)
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
val sc = new SparkContext(sparkConf)
// Create the context
val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people/")
lines.foreachRDD(rdd=>rdd.foreach(println))
val sqc = new SQLContext(sc);
import sqc.createSchemaRDD
// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
lines.foreachRDD(rdd=>{
rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data")
val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
teenagers.foreach(println)
})
ssc.start()
ssc.awaitTermination()
}
}
Run Code Online (Sandbox Code Playgroud)
如您所见,要通过流式传输SQL运行,必须在foreachRDD方法中进行查询.我想对从两个不同的流接收的数据运行SQL连接.有什么方法可以做到吗?
好吧,我想总结一下我们在Spiro的答案中讨论后得到的解决方法.他建议首先创建一个空表,然后将RDD插入其中.唯一的问题是Spark不允许插入表中.这是可以做的:
首先,创建一个RDD,它具有与您期望的流相同的模式:
import sqlContext.createSchemaRDD
val d1=sc.parallelize(Array(("a",10),("b",3))).map(e=>Rec(e._1,e._2))
Run Code Online (Sandbox Code Playgroud)
然后将其保存为Parquet文件
d1.saveAsParquetFile("/home/p1.parquet")
Run Code Online (Sandbox Code Playgroud)
现在,加载镶木地板文件并使用registerAsTable()方法将其注册为表.
val parquetFile = sqlContext.parquetFile("/home/p1.parquet")
parquetFile.registerAsTable("data")
Run Code Online (Sandbox Code Playgroud)
现在,当您收到流时,只需在流上应用foreachRDD()并使用insertInto()方法将各个RDD插入上面创建的表中
dStream.foreachRDD(rdd=>{
rdd.insertInto("data")
})
Run Code Online (Sandbox Code Playgroud)
这个insertInto()工作正常,允许将数据收集到表中.现在,您可以对任意数量的流执行相同操作,然后运行查询.
编写代码的方式,每次运行SQL查询时,最终都会产生一系列小的SchemaRDD.诀窍是将这些中的每一个保存到累积RDD或累积表.
首先,表格方法,使用insertInto:
对于每个流,首先创建一个您注册为表的emty RDD,获取一个空表.对于你的例子,假设你称之为"allTeenagers".
然后,对于每个查询,使用SchemaRDD的insertInto方法将结果添加到该表:
teenagers.insertInto("allTeenagers")
Run Code Online (Sandbox Code Playgroud)
如果对两个流执行此操作,创建两个单独的累积表,则可以使用普通的旧SQL查询来连接它们.
(注意:我实际上并没有能够让他上班,而且有点搜索让我怀疑其他人有,但我很确定我已经理解了设计意图insertInto,所以我认为这个解决方案值得记录.)
第二,unionAll方法(还有一种union方法,但是使得类型正确变得更加棘手):
这涉及创建一个初始RDD - 再次让我们称之为allTeenagers.
// create initial SchemaRDD even if it's empty, so the types work out right
var allTeenagers = sqc.sql("SELECT ...")
Run Code Online (Sandbox Code Playgroud)
然后,每次:
val teenagers = sqc.sql("SELECT ...")
allTeenagers = allTeenagers.unionAll(teenagers)
Run Code Online (Sandbox Code Playgroud)
也许不用说,你需要列匹配.
| 归档时间: |
|
| 查看次数: |
11629 次 |
| 最近记录: |