SUN*_*R · Tags: python, json, apache-spark, dstream, pyspark
I'm trying to create a DataFrame from JSON in a DStream, but the code below doesn't seem to display the DataFrame correctly:
```python
import sys
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext, Row


def getSqlContextInstance(sparkContext):
    # Lazily create one SQLContext on the driver and reuse it across batches
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']


if __name__ == "__main__":
    if len(sys.argv) != 3:
        raise IOError("Invalid usage; the correct format is:\nquadrant_count.py <hostname> <port>")

    # Initialize a SparkContext with a name
    spc = SparkContext(appName="jsonread")
    sqlContext = SQLContext(spc)
    # Create a StreamingContext with a batch interval of 2 seconds
    stc = StreamingContext(spc, 2)
    # Checkpointing feature
    stc.checkpoint("checkpoint")
    # Creating a DStream to connect to hostname:port (like localhost:9999)
    lines = stc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    lines.pprint()
    # Parse each incoming line into a Python object (a dict for a JSON object)
    parsed = lines.map(lambda x: json.loads(x))

    def process(time, rdd):
        print("========= %s =========" % str(time))
        try:
            # Get the singleton instance of SQLContext
            sqlContext = getSqlContextInstance(rdd.context)
            # Wrap each element in a Row with a single column named "word",
            # then convert the RDD[Row] to a DataFrame
            rowRdd = rdd.map(lambda w: Row(word=w))
            wordsDataFrame = sqlContext.createDataFrame(rowRdd)
            # Register as table
            wordsDataFrame.registerTempTable("mytable")
            testDataFrame = sqlContext.sql("select summary from mytable")
            # show() and printSchema() print directly and return None
            testDataFrame.show()
            testDataFrame.printSchema()
        except:
            pass

    parsed.foreachRDD(process)
    stc.start()
    # Wait for the computation to terminate
    stc.awaitTermination()
```
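There are no errors. When the script runs, it does successfully read the JSON from the streaming context, but it doesn't print the summary values or the DataFrame schema.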
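No error appears because the bare `except: pass` in `process` discards whatever exception is raised. The `select summary` query would most likely fail on its own: `rdd.map(lambda w: Row(word=w))` wraps each parsed dict in a single column named `word`, so `mytable` has no top-level `summary` column (and, if `Row` is not imported from `pyspark.sql`, a `NameError` is raised first and swallowed the same way). Below is a minimal sketch of a replacement `process`, assuming Spark 2.0+ (for `SparkSession`) and that every line is one JSON object with the same top-level keys as the sample. It unpacks each dict with `Row(**record)` so that `summary` becomes its own column, and prints the traceback instead of hiding it. `parse_review` is a hypothetical helper name, not part of any Spark API.

```python
import json
import traceback


def parse_review(line):
    """Parse one JSON line into a dict; return None if it is not a JSON object."""
    try:
        record = json.loads(line)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return None
    return record if isinstance(record, dict) else None


def process(time, rdd):
    # Imported here so parse_review can be reused without a Spark install.
    from pyspark.sql import Row, SparkSession

    print("========= %s =========" % str(time))
    try:
        # Reuses the already running SparkContext instead of creating a new one.
        spark = SparkSession.builder.getOrCreate()
        # Drop lines that are not valid JSON objects instead of failing the batch.
        records = rdd.map(parse_review).filter(lambda r: r is not None)
        if records.isEmpty():
            return  # nothing arrived in this batch
        # Row(**record) turns every top-level JSON key into its own column.
        df = spark.createDataFrame(records.map(lambda r: Row(**r)))
        df.createOrReplaceTempView("mytable")
        spark.sql("SELECT summary FROM mytable").show()
        df.printSchema()
    except Exception:
        # Report the failure instead of silently discarding it.
        traceback.print_exc()
```

Because `parse_review` does the JSON parsing itself, this version would be attached to the raw text stream with `lines.foreachRDD(process)`, and the separate `parsed` DStream is no longer needed.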
Sample JSON I'm trying to read:
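```json
{"reviewerID": "A2IBPI20UZIR0U", "asin": "1384719342", "reviewerName": "cassandra tu \"Yeah, well, that's just like, u...", "helpful": [0, 0], "reviewText": "Not much to write about here, but it does exactly what it's supposed to. filters out the pop sounds. now my recordings are much more crisp. it is one of the lowest prices pop filters on amazon so might as well buy it, they honestly work the same despite their pricing,", "overall": 5.0, "summary": "good", "unixReviewTime": 1393545600, "reviewTime": "02 28, 2014"}
```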
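Since each record in the sample arrives as a single line of text, another option is to skip the `json.loads` step entirely and let Spark infer the schema from the raw strings: `DataFrameReader.json` accepts an RDD of JSON strings as well as file paths, and with its default `PERMISSIVE` mode a malformed line does not raise. The sketch below assumes Spark 2.0+ and is meant to be attached to the raw `lines` DStream; `process_raw` and `non_blank` are hypothetical names chosen for illustration.

```python
def non_blank(line):
    """True for lines worth handing to the JSON reader (skips empty lines)."""
    return bool(line and line.strip())


def process_raw(time, rdd):
    # Imported here so non_blank can be reused without a Spark install.
    from pyspark.sql import SparkSession

    print("========= %s =========" % str(time))
    json_lines = rdd.filter(non_blank)
    if json_lines.isEmpty():
        return  # nothing arrived in this batch
    spark = SparkSession.builder.getOrCreate()
    # read.json accepts an RDD of JSON strings and infers the schema itself,
    # so "summary" becomes a top-level column automatically.
    df = spark.read.json(json_lines)
    df.select("summary").show()
    df.printSchema()
```

Usage would be `lines.foreachRDD(process_raw)`. Compared with building `Row` objects by hand, this leaves type inference (for example, `helpful` as an array and `overall` as a double) to Spark's JSON reader.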
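I'm completely new to Spark Streaming and started a pet project by reading the documentation. Any help and guidance would be greatly appreciated.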