I am very new to Spark Streaming, and I am trying to read and parse a JSON stream from Kafka using pyspark. Reading the stream works fine, and I can pprint() the RDD:
{"Address":"22.79.52.79","AlarmProfile":"-1","Amps":"11.98","AmpsLimit":"90","AssetTag":"-1","AssetTag_1":"-1","Blank":"0","CAN":"0","Chain":"2","Config":"\u003cUnknown\u003e",...,"WattsLimit":"-1"}
I want to parse the JSON so that I can access fields with, for example, my_parsed_json["Amps"].
But I don't know how to apply json.loads() to the messages.
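For a single message, json.loads() turns the JSON string into a Python dict, which can then be indexed by key. A minimal sketch, using a shortened copy of the sample record above (only a few of its fields are kept, since the original is elided with "..."). Note that every value in the sample is itself a JSON string, so numeric fields such as Amps need an explicit conversion, and escapes like \u003c are decoded by the parser:

```python
import json

# A shortened copy of the sample message: only a few of the fields shown
# above are kept, because the full record is elided in the question.
# A raw string keeps the \u003c escapes for the JSON parser to decode.
raw = r'{"Address":"22.79.52.79","Amps":"11.98","AmpsLimit":"90","Config":"\u003cUnknown\u003e"}'

# json.loads parses a JSON string into a Python dict.
my_parsed_json = json.loads(raw)

print(my_parsed_json["Amps"])         # the value is a string, not a number
amps = float(my_parsed_json["Amps"])  # convert explicitly if a number is needed
print(amps > float(my_parsed_json["AmpsLimit"]))
print(my_parsed_json["Config"])       # the JSON escapes are decoded to "<Unknown>"
```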
I run the script like this:
/data/spark/bin/spark-submit --master spark://des01:7077 --total-executor-cores 2 --jars /data/dev/2.10/spark-streaming-kafka-assembly_2.10-1.5.2.jar test.py pkb01:9092 topicname
where "pkb01:9092" is the Kafka broker and "topicname" is the Kafka topic.
My Python code is:
from __future__ import print_function
import sys
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
# sc is the Spark Context
sc = SparkContext(appName="mitest")
ssc = StreamingContext(sc, 2)
brokers, topico = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topico], {"metadata.broker.list": brokers})
dstream = kvs.map(lambda x: x[1])
dstream.pprint()
# Start the streaming computation and block until it is stopped
ssc.start()
ssc.awaitTermination()
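In the Python API, KafkaUtils.createDirectStream produces a DStream of (key, message) tuples (decoded as UTF-8 strings by default), which is why the code keeps x[1]. The same transformation chain can be tried on a plain Python list before running it on Spark. The sketch below stands in for one micro-batch with a hypothetical list of tuples; the keys and messages are invented for illustration:

```python
import json

# Hypothetical stand-in for one batch of the Kafka DStream:
# each element is a (key, message) tuple, as createDirectStream yields.
batch = [
    (None, '{"Address":"22.79.52.79","Amps":"11.98"}'),
    (None, '{"Address":"22.79.52.80","Amps":"3.50"}'),
]

# Equivalent of kvs.map(lambda x: x[1]): keep only the message string.
messages = [x[1] for x in batch]

# Equivalent of dstream.map(lambda x: json.loads(x)), or simply dstream.map(json.loads).
parsed = [json.loads(m) for m in messages]

for record in parsed:
    print(record["Amps"])
```

If this works on sample messages but the Spark job still fails, the problem is more likely in the data arriving from Kafka than in the map itself.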
I would like to add something like:
my_parsed_json = dstream.map(lambda x: json.loads(x))
But I get an error from Spark. Any help?
Added error:
Traceback …
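The traceback is cut off, so the exact cause is unknown. One frequent failure mode, assumed here only for illustration, is a Kafka message that is not valid JSON (an empty string, for example), which makes json.loads raise ValueError inside an executor and fails the whole batch. A defensive sketch with a hypothetical helper, parse_or_skip, that returns a list so it can be used with DStream.flatMap to drop unparseable messages instead of crashing:

```python
import json

def parse_or_skip(message):
    """Hypothetical helper: return [dict] for a valid JSON object, [] otherwise.

    Returning a list lets it be used with flatMap, which flattens the
    results and thereby drops messages that fail to parse.
    """
    try:
        obj = json.loads(message)
    except (TypeError, ValueError):  # json.JSONDecodeError subclasses ValueError on Python 3
        return []
    # Keep only JSON objects, so indexing by key (e.g. ["Amps"]) is safe later.
    return [obj] if isinstance(obj, dict) else []

# Intended use on the stream (not executed here):
#   my_parsed_json = dstream.flatMap(parse_or_skip)
#   my_parsed_json.map(lambda d: d.get("Amps")).pprint()

print(parse_or_skip('{"Amps":"11.98"}'))
print(parse_or_skip(''))
print(parse_or_skip(None))
```

Silently dropping bad records is a design choice; if they need to be inspected, a variant could instead return the raw message tagged as an error.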