Spark 2.0.0用可变模式读取json数据

xv7*_*v70 6 schema json apache-spark pyspark

我正在尝试处理一个月的网站流量,它存储在一个S3存储桶中作为json(每行一个json对象/网站流量点击).数据量足够大,我不能要求Spark推断架构(OOM错误).如果我指定架构,它显然很好.但是,问题是每个json对象中包含的字段不同,所以即使我使用一天的流量构建模式,每月模式也会不同(更多字段),因此我的Spark作业失败.

所以我很想知道别人如何处理这个问题.我可以使用传统的RDD mapreduce作业来提取我感兴趣的字段,导出然后将所有内容加载到数据帧中.但这很慢,看起来有点像弄巧成拙.

我在这里找到了类似的问题,但没有相关信息.

谢谢.

zer*_*323 8

如果您知道您感兴趣的字段,只需提供架构的子集.JSON阅读器可以优雅地忽略意外字段.假设您的数据如下所示:

import json
import tempfile

object = {"foo": {"bar": {"x": 1, "y": 1}, "baz": [1, 2, 3]}}

_, f = tempfile.mkstemp()
with open(f, "w") as fw:
    json.dump(object, fw)
Run Code Online (Sandbox Code Playgroud)

而且你只对foo.bar.xfoo.bar.z(不存在)感兴趣:

from pyspark.sql.types import StructType

schema = StructType.fromJson({'fields': [{'metadata': {},
   'name': 'foo',
   'nullable': True,
   'type': {'fields': [
       {'metadata': {}, 'name': 'bar', 'nullable': True, 'type': {'fields': [
           {'metadata': {}, 'name': 'x', 'nullable': True, 'type': 'long'},
           {'metadata': {}, 'name': 'z', 'nullable': True, 'type': 'double'}],
       'type': 'struct'}}],
    'type': 'struct'}}],
 'type': 'struct'})

df = spark.read.schema(schema).json(f)
df.show()

## +----------+
## |       foo|
## +----------+
## |[[1,null]]|
## +----------+

df.printSchema()
## root
##  |-- foo: struct (nullable = true)
##  |    |-- bar: struct (nullable = true)
##  |    |    |-- x: long (nullable = true)
##  |    |    |-- z: double (nullable = true)
Run Code Online (Sandbox Code Playgroud)

您还可以降低模式推断的采样率,以提高整体性能.