json row dataframe pyspark
Using PySpark, I am reading multiple files, each containing a single JSON object, from the contentdata2 folder:
df = spark.read\
.option("mode", "DROPMALFORMED")\
.json("./data/contentdata2/")
df.printSchema()
content = df.select('fields').collect()
where df.printSchema() produces:
root
|-- fields: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- field: string (nullable = true)
| | |-- type: string (nullable = true)
| | |-- value: string (nullable = true)
|-- id: string (nullable = true)
|-- score: double (nullable = true)
|-- siteId: string (nullable = true)
I want to access fields.element.field and, for each JSON object, store the field equal to body as well as the field equal to urlhash.
The format of content is one Row (fields) containing other Rows, like this:
[Row(fields=[Row(field='body', type=None, value='["First line of text","Second line of text"]'), Row(field='urlhash', type=None, value='0a0b774c21c68325aa02cae517821e78687b2780')]), Row(fields=[Row(field='body', type=None, value='["First line of text","Second line of text"]'), Row(field='urlhash', type=None, value='0a0b774c21c6caca977e7821e78687b2780')]), ...
The recurring [Row(fields=[Row(field=... appears because the JSON objects from the different files are merged together into one list. There are many other Row elements I am not interested in, so they are left out of the example.
The JSON objects are structured like this:
{
  "fields": [
    {
      "field": "body",
      "value": [
        "Some text",
        "Another line of text",
        "Third line of text."
      ]
    },
    {
      "field": "urlhash",
      "value": "0a0a341e189cf2c002cb83b2dc529fbc454f97cc"
    }
  ],
  "score": 0.87475455,
  "siteId": "9222270286501375973",
  "id": "0a0a341e189cf2c002cb83b2dc529fbc454f97cc"
}
I want to store all the words from each URL's body so that I can later remove stop words and feed them into a k-nearest-neighbors algorithm.
How do I go about storing the words from the body for each URL, preferably as a TSV or CSV with the columns urlhash and words (the list of words from the body)?
You can approach this in two ways: explode the array to get one record per row and then flatten the nested dataframe, or access the nested subfields directly. Let's start from your sample dataframe:
from pyspark.sql import Row
from pyspark.sql.types import *

schema = StructType([
    StructField('fields', ArrayType(StructType([
        StructField('field', StringType()),
        StructField('type', StringType()),
        StructField('value', StringType())])))])

content = spark.createDataFrame(
    sc.parallelize([
        Row(fields=[
            Row(field='body', type=None,
                value='["First line of text","Second line of text"]'),
            Row(field='urlhash', type=None,
                value='0a0b774c21c68325aa02cae517821e78687b2780')]),
        Row(fields=[
            Row(field='body', type=None,
                value='["First line of text","Second line of text"]'),
            Row(field='urlhash', type=None,
                value='0a0b774c21c6caca977e7821e78687b2780')])]),
    schema=schema)
content.printSchema()
root
|-- fields: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- field: string (nullable = true)
| | |-- type: string (nullable = true)
| | |-- value: string (nullable = true)
1. Explode and flatten
The fields of a nested dataframe can be accessed with ., and * lets you flatten all the nested fields and bring them up to the root level.
import pyspark.sql.functions as psf
content \
.select(psf.explode('fields').alias('tmp')) \
.select('tmp.*') \
.show()
+-------+----+--------------------+
| field|type| value|
+-------+----+--------------------+
| body|null|["First line of t...|
|urlhash|null|0a0b774c21c68325a...|
| body|null|["First line of t...|
|urlhash|null|0a0b774c21c6caca9...|
+-------+----+--------------------+
root
|-- field: string (nullable = true)
|-- type: string (nullable = true)
|-- value: string (nullable = true)
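Note that the exploded output above loses the grouping of body and urlhash per document. One way to get it back, sketched below under my own assumptions (the doc_id column name is mine, not part of the original answer), is to tag each document with a synthetic id before exploding and then pivot the field column back into columns; the pure-Python regroup_rows shows the same regrouping logic on plain data:

```python
def regroup_fields(content):
    """Explode `fields`, then pivot back to one row per document
    with `body` and `urlhash` as columns."""
    import pyspark.sql.functions as psf  # local import: sketch only
    return (content
            .withColumn('doc_id', psf.monotonically_increasing_id())
            .select('doc_id', psf.explode('fields').alias('tmp'))
            .select('doc_id', 'tmp.*')
            .groupBy('doc_id')
            .pivot('field', ['body', 'urlhash'])
            .agg(psf.first('value')))

def regroup_rows(rows):
    """Pure-Python analogue of the pivot: each item in `rows` is a list of
    {'field': ..., 'value': ...} dicts; returns (urlhash, body) pairs."""
    out = []
    for fields in rows:
        rec = {f['field']: f['value'] for f in fields}
        out.append((rec.get('urlhash'), rec.get('body')))
    return out
```

regroup_fields(content) should then yield one row per original JSON object, with doc_id, body, and urlhash columns.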
2. Direct access to subfields
In more recent versions of Spark you can access the fields of a nested StructType even when they are wrapped in an ArrayType. You get back an ArrayType of the subfield's values.
content \
.select('fields.field') \
.show()
+---------------+
| field|
+---------------+
|[body, urlhash]|
|[body, urlhash]|
+---------------+
root
|-- field: array (nullable = true)
| |-- element: string (containsNull = true)
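Either way, the body value is still a JSON-encoded string of text lines, so one more parsing step is needed before the words can be written out per urlhash. A minimal sketch, assuming a dataframe that already has urlhash and body string columns (the helper names and the output-path parameter are illustrative, not from the original answer):

```python
import json

def body_to_words(value):
    """Split a JSON-encoded body string into a flat list of words."""
    if value is None:
        return []
    return [word for line in json.loads(value) for word in line.split()]

def write_words_tsv(df, path):
    """Write (urlhash, words) pairs as a tab-separated file.
    `df` is assumed to have `urlhash` and `body` string columns."""
    import pyspark.sql.functions as psf  # local import: sketch only
    words_udf = psf.udf(lambda v: ' '.join(body_to_words(v)))
    (df.select('urlhash', words_udf('body').alias('words'))
       .write.option('sep', '\t').option('header', True).csv(path))
```

The resulting file then has one row per urlhash with its space-separated words, ready for stop-word removal before the k-nearest-neighbors step.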
Views: 5,607