Exploding a column containing a list of JSON objects with PySpark

And*_*aki 2 python etl apache-spark pyspark

I am trying to explode a column named phones, with the schema and content shown below:

customer_external_id: StringType
phones: StringType

customer_id    phones
x8x46x5        [{"phone" : "(xx) 35xx4x80"},{"phone" : "(xx) xxxx46605"}]
xx44xx5        [{"phone" : "(xx) xxx3x8443"}]
4xxxxx5        [{"phone" : "(xx) x6xx083x3"},{"areaCode" : "xx"},{"phone" : "(xx) 3xxx83x3"}]
xx6564x        [{"phone" : "(x3) x88xx344x"}]
xx8x4x0        [{"phone" : "(xx) x83x5x8xx"}]
xx0434x        [{"phone" : "(0x) 3x6x4080"},{"areaCode" : "xx"}]
x465x40        [{"phone" : "(6x) x6x445xx"}]
x0684x8        [{"phone" : "(xx) x8x88x4x4"},{"phone" : "(xx) x8x88x4x4"}]
x84x850        [{"phone" : "(xx) 55x56xx4"}]
x0604xx        [{"phone" : "(xx) x8x4xxx68"}]
4x6xxx0        [{"phone" : "(xx) x588x43xx"},{"phone" : "(xx) 5x6465xx"},{"phone" : "(xx) x835xxxx8"},{"phone" : "(xx) x5x6465xx"}]
x6x000x        [{"phone" : "(xx) xxx044xx4"}]
5x65533        [{"phone" : "(xx) xx686x0xx"}]
x3668xx        [{"phone" : "(5x) 33x8x3x4"},{"phone" : "(5x) 8040x8x6"}]

So I tried to run this code and got the following error:

df.select('customer_external_id', explode(df.phones))

AnalysisException: u"cannot resolve 'explode(`phones`)' due to data type mismatch: input to function explode should be array or map type, not StringType;;
'Project [customer_external_id#293, explode(phones#296) AS List()]\n+- Relation[order_id#292,customer_external_id#293,name#294,email#295,phones#296,phones_version#297,classification#298,locale#299] parquet\n"

From this error I realized my column was a StringType, so I ran this code to strip the brackets and convert the string to JSON:

phones = df.select('customer_external_id', 'phones').rdd\
    .map(lambda x: str(x).replace('[','')\
                       .replace(']','')\
                       .replace('},{', ','))\
    .map(lambda x: json.loads(x).get('phone'))\
    .map(lambda x: Row(x))\
    .toDF(df.select('customer_external_id','phones').schema)
phones.show()

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 38.0 failed 4 times, most recent failure: Lost task 0.3 in stage 38.0 (TID 2740, 10.112.80.248, executor 8): org.apache.spark.api.python.PythonException: Traceback (most recent call last)
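(An aside on the traceback above: `str(x)` on a whole Row produces its Python repr, e.g. `Row(customer_external_id='...', phones='...')`, which `json.loads` cannot parse. The `phones` string by itself is already a valid JSON array and parses directly; a minimal illustration in plain Python, using one of the sample rows:)

```python
import json

# One raw value from the `phones` column -- already valid JSON, no
# bracket-stripping needed before parsing.
phones_raw = '[{"phone" : "(xx) x6xx083x3"},{"areaCode" : "xx"},{"phone" : "(xx) 3xxx83x3"}]'

entries = json.loads(phones_raw)
for entry in entries:
    # Each element is a dict that may hold "phone", "areaCode", or both.
    print(entry.get("areaCode"), entry.get("phone"))
```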

Apparently I can neither cast to JSON nor explode the column. So how do I properly process this kind of data to get the following output:

    +-----------+--------+--------------+
    |customer_id|areaCode|phone         |
    +-----------+--------+--------------+
    |x8x46x5    |null    |(xx) 35xx4x80 |
    |x8x46x5    |null    |(xx) xxxx46605|
    |xx44xx5    |null    |(xx) xxx3x8443|
    |4xxxxx5    |null    |(xx) x6xx083x3|
    |4xxxxx5    |xx      |null          |
    |4xxxxx5    |null    |(xx) 3xxx83x3 |
    |xx6564x    |null    |(x3) x88xx344x|
    |xx8x4x0    |null    |(xx) x83x5x8xx|
    |xx0434x    |null    |(0x) 3x6x4080 |
    |xx0434x    |xx      |null          |
    |x465x40    |null    |(6x) x6x445xx |
    |x0684x8    |null    |(xx) x8x88x4x4|
    |x0684x8    |null    |(xx) x8x88x4x4|
    |x84x850    |null    |(xx) 55x56xx4 |
    |x0604xx    |null    |(xx) x8x4xxx68|
    |4x6xxx0    |null    |(xx) x588x43xx|
    |4x6xxx0    |null    |(xx) 5x6465xx |
    |4x6xxx0    |null    |(xx) x835xxxx8|
    |4x6xxx0    |null    |(xx) x5x6465xx|
    |x6x000x    |null    |(xx) xxx044xx4|
    |5x65533    |null    |(xx) xx686x0xx|
    |x3668xx    |null    |(5x) 33x8x3x4 |
    |x3668xx    |null    |(5x) 8040x8x6 |
    +-----------+--------+--------------+

Sil*_*vio 7

What you want to do is use the from_json method to convert the string into an array, and then explode it:

from pyspark.sql.functions import *
from pyspark.sql.types import *

# Schema of the JSON string: an array of objects with an optional "phone" key.
phone_schema = ArrayType(StructType([StructField("phone", StringType())]))

converted = inputDF\
  .withColumn("areaCode", get_json_object("phones", "$[*].areaCode"))\
  .withColumn("phones", explode(from_json("phones", phone_schema)))\
  .withColumn("phone", col("phones.phone"))\
  .drop("phones")\
  .filter(~isnull("phone"))

converted.show()