Tags: apache-spark, spark-streaming, pyspark
I am writing a Spark application in which I need to evaluate streaming data against historical data that lives in a SQL Server database.
The idea is that Spark fetches the historical data from the database once, keeps it in memory, and evaluates the streaming data against it.
Right now I am ingesting the streaming data like this:
import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext, functions as func, Row

sc = SparkContext("local[2]", "realtimeApp")
ssc = StreamingContext(sc, 10)
files = ssc.textFileStream("hdfs://RealTimeInputFolder/")

######## Let's get the data from the db which is relevant for streaming ###
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
dataurl = "jdbc:sqlserver://myserver:1433"
db = "mydb"
table = "stream_helper"
credential = "my_credentials"

######## basic data for evaluation purpose ########
files_count = files.flatMap(lambda file: file.split(" "))
pattern = r'(TranAmount=Decimal.{2})(.[0-9]*.[0-9]*)(\S+ )(TranDescription=u.)([a-zA-Z\s]+)([\S\s]+ )(dSc=u.)([A-Z]{2}.[0-9]+)'
tranfiles = "wasb://myserver.blob.core.windows.net/RealTimeInputFolder01/"
def getSqlContextInstance(sparkContext):
    # lazily instantiate a single global SQLContext tied to the SparkContext
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']
def pre_parse(logline):
    """
    Reads files as rows of SQL in pyspark streaming, using the pattern.
    A 0/1 flag is added for logging, in case processing by this pattern fails.
    """
    match = re.search(pattern, logline)
    if match is None:
        return (logline, 0)
    else:
        return (
            Row(
                customer_id=match.group(8),
                trantype=match.group(5),
                amount=float(match.group(2))
            ), 1)
def parse():
    """
    Actual processing happens here: records that match the pattern are
    tagged with 1, failures with 0, and the stream is split accordingly.
    """
    parsed_tran = ssc.textFileStream(tranfiles).map(pre_parse)
    success = parsed_tran.filter(lambda s: s[1] == 1).map(lambda x: x[0])
    fail = parsed_tran.filter(lambda s: s[1] == 0).map(lambda x: x[0])
    # count() on a DStream returns another DStream, not an int, so it
    # cannot be compared to 0 directly; print the per-batch count instead
    fail.count().pprint()
    return success, fail

success, fail = parse()
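Note that a DStream pipeline like the one above does nothing until the streaming context is started, so the script would normally end with something like this:

ssc.start()             # start receiving and processing data
ssc.awaitTermination()  # block until the stream is stopped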
Now I want to evaluate it against the DataFrame I get from the historical data:
sqlContext = getSqlContextInstance(sc)
base_data = sqlContext.read.format("jdbc").options(driver=driver, url=dataurl,
    database=db, user=credential, password=credential, dbtable=table).load()
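Since the goal is to keep the historical data in memory for repeated comparisons, a minimal addition, assuming stream_helper fits in memory, would be to persist the DataFrame right after loading it:

base_data.cache()  # keep the historical data in memory across batches
base_data.count()  # force materialization so later joins don't re-read the DB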
Now, since this is returned as a DataFrame, how can I use it for my purpose? The streaming programming guide says:
"You have to create a SQLContext using the SparkContext that the StreamingContext is using."
This leaves me even more confused about how to use the existing DataFrame together with the streaming object. Any help is much appreciated.
To manipulate DataFrames you always need a SQLContext, so you can instantiate it like this:
sc = SparkContext("local[2]", "realtimeApp")
sqlc = SQLContext(sc)
ssc = StreamingContext(sc, 10)
These two contexts (SQLContext and StreamingContext) will coexist in the same job because they are associated with the same SparkContext. However, keep in mind that you cannot instantiate two different SparkContexts in the same job.
Once you have created a DataFrame from your DStream, you can join your historical DataFrame with the DataFrame created from the stream. To do that, I would do something like:
yourDStream.foreachRDD(lambda rdd: sqlContext
    .createDataFrame(rdd)
    .join(historicalDF, ...)
    ...
)
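Putting this together with the code from the question, a minimal sketch might look like the following. The join column trantype and the named evaluate function are assumptions for illustration; adjust them to your actual schema:

def evaluate(time, rdd):
    # skip empty micro-batches
    if rdd.isEmpty():
        return
    # reuse the singleton SQLContext from the question's helper
    sqlContext = getSqlContextInstance(rdd.context)
    stream_df = sqlContext.createDataFrame(rdd)
    # join each micro-batch against the cached historical data
    # (assumes base_data also has a trantype column)
    joined = stream_df.join(base_data, stream_df.trantype == base_data.trantype)
    joined.show()

success.foreachRDD(evaluate)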
Think about how much of the streaming data you actually need for the join when you operate on the stream; you may be interested in window functions.
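For reference, a DStream window is declared like this; the 30-second window and 10-second slide are arbitrary values for illustration (both must be multiples of the batch interval, which is 10 seconds above):

# evaluate the last 30 seconds of data every 10 seconds
windowed = success.window(30, 10)
windowed.foreachRDD(evaluate)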