到目前为止,Spark还没有为流数据创建DataFrame,但是当我进行异常检测时,使用DataFrame进行数据分析会更方便,更快捷.我已经完成了这一部分,但是当我尝试使用流数据进行实时异常检测时,出现了问题.我尝试了几种方法仍然无法将DStream转换为DataFrame,也无法将DStream中的RDD转换为DataFrame.
这是我最新版代码的一部分:
import sys
import re
from pyspark import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import KMeans, KMeansModel, StreamingKMeans
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import operator
sc = SparkContext(appName="test")
ssc = StreamingContext(sc, 5)
sqlContext = SQLContext(sc)
model_inputs = sys.argv[1]
def streamrdd_to_df(srdd):
sdf = sqlContext.createDataFrame(srdd)
sdf.show(n=2, truncate=False)
return sdf
def main():
indata = ssc.socketTextStream(sys.argv[2], int(sys.argv[3]))
inrdd = indata.map(lambda r: get_tuple(r))
Features = Row('rawFeatures')
features_rdd = …Run Code Online (Sandbox Code Playgroud)