小编jca*_*lbo的帖子

用pyspark解析json流

我对Spark Streaming非常陌生,我正在尝试使用pyspark从Kafka读取和解析JSON流。读取流是可以的,而且我可以pprint()RDD。

 {"Address":"22.79.52.79","AlarmProfile":"-1","Amps":"11.98","AmpsLimit":"90","AssetTag":"-1","AssetTag_1":"-1","Blank":"0","CAN":"0","Chain":"2","Config":"\u003cUnknown\u003e",...,"WattsLimit":"-1"}
Run Code Online (Sandbox Code Playgroud)

我想解析json,以便可以使用例如my_parsed_json [“ Amps”]

但是我不知道如何在它们上使用json.loads()。

我以这种方式运行脚本:

/data/spark/bin/spark-submit  --master spark://des01:7077 --total-executor-cores 2 --jars /data/dev/2.10/spark-streaming-kafka-assembly_2.10-1.5.2.jar test.py pkb01:9092 topicname
Run Code Online (Sandbox Code Playgroud)

其中“ pkb01:9092”是Kafka代理,“ topicname”是Kafka主题。

我的python代码是:

from __future__ import print_function

import sys
import json

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# sc es el Spark Context

sc = SparkContext(appName="mitest")
ssc = StreamingContext(sc, 2)

brokers, topico = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topico], {"metadata.broker.list": brokers})

dstream = kvs.map(lambda x: x[1])

dstream.pprint()
Run Code Online (Sandbox Code Playgroud)

我想包含以下内容:

my_parsed_json = dstream.map(lambda x: json.loads(x))
Run Code Online (Sandbox Code Playgroud)

但我从Spark收到错误。有什么帮助吗?

添加的错误:

    Traceback …
Run Code Online (Sandbox Code Playgroud)

python streaming json apache-spark

1
推荐指数
1
解决办法
5092
查看次数

标签 统计

apache-spark ×1

json ×1

python ×1

streaming ×1