Accessing rows inside a Row (nested JSON) of a DataFrame with PySpark

san*_*ask 5 json row dataframe pyspark

Using PySpark, I am reading several files from the contentdata2 folder, each containing one JSON object:

df = spark.read\
.option("mode", "DROPMALFORMED")\
.json("./data/contentdata2/")

df.printSchema()
content = df.select('fields').collect()

where df.printSchema() produces:

root
|-- fields: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- field: string (nullable = true)
|    |    |-- type: string (nullable = true)
|    |    |-- value: string (nullable = true)
|-- id: string (nullable = true)
|-- score: double (nullable = true)
|-- siteId: string (nullable = true)

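I want to access fields.element.field and store every field equal to body, together with the field equal to urlhash (for each JSON object).

content is a list of Rows (fields), each containing other Rows, like this: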

[Row(fields=[Row(field='body', type=None, value='["First line of text","Second line of text"]'), Row(field='urlhash', type=None, value='0a0b774c21c68325aa02cae517821e78687b2780')]),  Row(fields=[Row(field='body', type=None, value='["First line of text","Second line of text"]'), Row(field='urlhash', type=None, value='0a0b774c21c6caca977e7821e78687b2780')]), ...

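"[Row(fields=[Row(field=...." repeats because the JSON objects from the different files are merged into one list. There are many other Row elements that I'm not interested in, so I left them out of the example.

Each JSON object is structured like this: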

{
  "fields": [
    {
      "field": "body",
      "value": [
        "Some text",
        "Another line of text",
        "Third line of text."
      ]
    },
    {
      "field": "urlhash",
      "value": "0a0a341e189cf2c002cb83b2dc529fbc454f97cc"
    }
  ],
  "score": 0.87475455,
  "siteId": "9222270286501375973",
  "id": "0a0a341e189cf2c002cb83b2dc529fbc454f97cc"
}
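To pin down the target mapping before bringing Spark in, here is a minimal plain-Python sketch (no Spark involved) that pulls urlhash and the body lines out of one object shaped like the example above. The helper name extract_doc is hypothetical, and it assumes every entry in fields has a field key and a value key, as shown.

```python
import json

def extract_doc(obj):
    """Return (urlhash, body_lines) for one JSON object shaped like the example.

    Hypothetical helper: assumes obj["fields"] is a list of {"field": ..., "value": ...} dicts.
    """
    # Map each field name to its value, e.g. {"body": [...], "urlhash": "0a0a..."}
    values = {f["field"]: f.get("value") for f in obj.get("fields", [])}
    body = values.get("body") or []
    # Wrap a single-string body so callers always receive a list of lines
    if isinstance(body, str):
        body = [body]
    return values.get("urlhash"), body

raw = '''{"fields": [{"field": "body", "value": ["Some text", "Another line of text", "Third line of text."]},
                     {"field": "urlhash", "value": "0a0a341e189cf2c002cb83b2dc529fbc454f97cc"}],
          "score": 0.87475455, "siteId": "9222270286501375973",
          "id": "0a0a341e189cf2c002cb83b2dc529fbc454f97cc"}'''
urlhash, body = extract_doc(json.loads(raw))
print(urlhash, body)
```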

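I want to store all the words in the body of each URL so that I can later remove stop words and feed the result into a k-nearest-neighbors algorithm.

How can I store the words from the body for each URL, preferably as a TSV or CSV with the columns urlhash and words (the list of words from the body)?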
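For reference, the requested output format can be sketched in plain Python with the standard csv module. This is only an illustration: tokenize and write_tsv are hypothetical helpers, the tokenizer (lowercase, split on non-word characters) is an assumption, and the words are stored space-separated in a single column.

```python
import csv
import io
import re

def tokenize(lines):
    """Split body lines into lowercase words (a simple regex tokenizer, assumed here)."""
    return [w.lower() for line in lines for w in re.findall(r"\w+", line)]

def write_tsv(docs, out):
    """Write a header, then one row per document: urlhash and its words joined by spaces.

    `docs` is an iterable of (urlhash, body_lines) pairs.
    """
    writer = csv.writer(out, delimiter="\t", lineterminator="\n")
    writer.writerow(["urlhash", "words"])
    for urlhash, lines in docs:
        writer.writerow([urlhash, " ".join(tokenize(lines))])

buf = io.StringIO()
write_tsv([("0a0a341e189cf2c002cb83b2dc529fbc454f97cc",
            ["Some text", "Another line of text", "Third line of text."])], buf)
print(buf.getvalue())
```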

MaF*_*aFF 6

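You can approach this in two ways:

  • explode the array to get one record per row, then flatten the nested DataFrame
  • or access the subfields directly (for Spark > 2.x)

Let's start from your example DataFrame: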

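from pyspark.sql import Row, SparkSession
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# In the pyspark shell `spark` and `sc` already exist; otherwise create them
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
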
schema = StructType([
    StructField('fields', ArrayType(StructType([
        StructField('field', StringType()), 
        StructField('type', StringType()), 
        StructField('value', StringType())])))])

content = spark.createDataFrame(
    sc.parallelize([
        Row(
            fields=[
                Row(
                    field='body', 
                    type=None, 
                    value='["First line of text","Second line of text"]'), 
                Row(
                    field='urlhash', 
                    type=None, 
                    value='0a0b774c21c68325aa02cae517821e78687b2780')]), 
        Row(
            fields=[
                Row(
                    field='body', 
                    type=None, 
                    value='["First line of text","Second line of text"]'), 
                Row(
                    field='urlhash', 
                    type=None, 
                    value='0a0b774c21c6caca977e7821e78687b2780')])]), schema=schema)
content.printSchema()

    root
     |-- fields: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- field: string (nullable = true)
     |    |    |-- type: string (nullable = true)
     |    |    |-- value: string (nullable = true)

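1. Explode and flatten

The fields of a nested DataFrame can be accessed with the "." notation, and "*" lets you flatten all the nested fields and bring them up to the root level.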

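import pyspark.sql.functions as psf

# One row per element of the `fields` array, then promote the struct's fields to columns
flat = content \
    .select(psf.explode('fields').alias('tmp')) \
    .select('tmp.*')
flat.show()
flat.printSchema()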

    +-------+----+--------------------+
    |  field|type|               value|
    +-------+----+--------------------+
    |   body|null|["First line of t...|
    |urlhash|null|0a0b774c21c68325a...|
    |   body|null|["First line of t...|
    |urlhash|null|0a0b774c21c6caca9...|
    +-------+----+--------------------+

    root
     |-- field: string (nullable = true)
     |-- type: string (nullable = true)
     |-- value: string (nullable = true)

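2. Access subfields directly

In later versions of Spark, you can access the fields of a nested StructType even when it is contained inside an ArrayType. You get back an ArrayType holding the subfield's values.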

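# Selecting a struct field through an array returns an array of that field's values
fields_only = content.select('fields.field')
fields_only.show()
fields_only.printSchema()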

    +---------------+
    |          field|
    +---------------+
    |[body, urlhash]|
    |[body, urlhash]|
    +---------------+

    root
     |-- field: array (nullable = true)
     |    |-- element: string (containsNull = true)