如何将Spark Streaming数据转换为Spark DataFrame

Che*_* Wu 8 python spark-streaming pyspark

到目前为止,Spark还没有为流数据创建DataFrame,但是当我进行异常检测时,使用DataFrame进行数据分析会更方便,更快捷.我已经完成了这一部分,但是当我尝试使用流数据进行实时异常检测时,出现了问题.我尝试了几种方法仍然无法将DStream转换为DataFrame,也无法将DStream中的RDD转换为DataFrame.

这是我最新版代码的一部分:

import sys
import re

from pyspark import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import KMeans, KMeansModel, StreamingKMeans
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import operator


sc = SparkContext(appName="test")
ssc = StreamingContext(sc, 5)
sqlContext = SQLContext(sc)

model_inputs = sys.argv[1]

def streamrdd_to_df(srdd):
    sdf = sqlContext.createDataFrame(srdd)
    sdf.show(n=2, truncate=False)
    return sdf

def main():
    indata = ssc.socketTextStream(sys.argv[2], int(sys.argv[3]))
    inrdd = indata.map(lambda r: get_tuple(r))
    Features = Row('rawFeatures')
    features_rdd = inrdd.map(lambda r: Features(r))
    features_rdd.pprint(num=3)
    streaming_df = features_rdd.flatMap(streamrdd_to_df)

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

正如您在main()函数中看到的,当我使用ssc.socketTextStream()方法读取输入流数据时,它会生成DStream,然后我尝试将DStream中的每个个体转换为Row,希望我可以将数据转换为DataFrame以后.

如果我使用ppprint()在这里打印出features_rdd,它可以工作,这让我想到,features_rdd中的每个人都是一批RDD,而整个features_rdd是一个DStream.

然后我创建了streamrdd_to_df()方法并希望将每批RDD转换为数据帧,它给出了我的错误,显示:

错误StreamingContext:启动上下文时出错,将其标记为已停止java.lang.IllegalArgumentException:要求失败:未注册任何输出操作,因此无需执行任何操作

有没有想过如何在Spark流数据上进行DataFrame操作?

use*_*581 5

Spark为我们提供了结构化流可以解决此类问题。它可以生成流式DataFrame,即连续附加的DataFrame。请检查以下链接

http://spark.apache.org/docs/latest/structed-streaming-programming-guide.html


Che*_* Wu 0

一年后,我开始探索Spark 2.0流式方法,终于解决了我的异常检测问题。这是我在 IPython 中的代码,您还可以找到我的原始数据输入是什么样子