PySpark根据名称将列表分解为多个列

rob*_*ur1 2 python apache-spark apache-spark-sql pyspark

嗨,我正在处理一个有点困难的文件格式,我正在尝试对其进行清理以备将来处理。我一直在使用Pyspark将数据处理成数据框。

该文件类似于以下内容:

AA 1234  ZXYW
BB A 890
CC B 321
AA 1234  LMNO
BB D 123
CC E 321
AA 1234  ZXYW
CC E 456
Run Code Online (Sandbox Code Playgroud)

每个“ AA”记录都定义一个或多个逻辑组的开始,并且每一行上的数据都是固定长度的,并且其中包含我要提取的编码信息。至少有20-30种不同的记录类型。每一行的开头始终用两个字母代码标识它们。每个组中可以有1个或许多不同的记录类型(即,并非每个组都存在所有记录类型)

作为第一步,我设法按以下格式将记录分组在一起:

+----------------+---------------------------------+
|           index|                           result|
+----------------+---------------------------------+
|               1|[AA 1234  ZXYV,BB A 890,CC B 321]|
|               2|[AA 1234  LMNO,BB D 123,CC E 321]|
|               3|[AA 1234  ZXYV,CC B 321]         |
+----------------+---------------------------------+
Run Code Online (Sandbox Code Playgroud)

第二步,我真的想将数据放入数据框的以下列中:

+----------------+---------------------------------+-------------+--------+--------+
|           index|                           result|           AA|      BB|      CC|
+----------------+---------------------------------+-------------+--------+--------+
|               1|[AA 1234  ZXYV,BB A 890,CC B 321]|AA 1234  ZXYV|BB A 890|CC B 321|
|               2|[AA 1234  LMNO,BB D 123,CC E 321]|AA 1234  LMNO|BB D 123|CC E 321|
|               3|[AA 1234  ZXYV,CC B 321]         |AA 1234  ZXYV|    Null|CC B 321|
+----------------+---------------------------------+-------------+--------+--------+
Run Code Online (Sandbox Code Playgroud)

因为那时提取我需要的信息应该很简单。

有人对我如何做到这一点有任何建议吗?

非常感谢。

Sur*_*esh 5

在不转换为rdd的情况下爆炸数组的另一种方法,

from pyspark.sql import functions as F

udf1 = F.udf(lambda x : x.split()[0])
df.select('index',F.explode('result').alias('id'),udf1(F.col('id')).alias('idtype')).show()

+-----+-------------+------+
|index|           id|idtype|
+-----+-------------+------+
|    1|AA 1234  ZXYV|    AA|
|    1|     BB A 890|    BB|
|    1|     CC B 321|    CC|
|    2|AA 1234  LMNO|    AA|
|    2|     BB D 123|    BB|
|    2|     CC E 321|    CC|
|    3|AA 1234  ZXYV|    AA|
|    3|     CC B 321|    CC|
+-----+-------------+------+ 

df1.groupby('index').pivot('idtype').agg(F.first('id')).join(df,'index').show()
Run Code Online (Sandbox Code Playgroud)


Ale*_*lex 2

您可以使用flatMappivot来实现这一点。从第一阶段的结果开始:

rdd = sc.parallelize([(1,['AA 1234  ZXYV','BB A 890','CC B 321']),
                      (2,['AA 1234  LMNO','BB D 123','CC E 321']),
                      (3,['AA 1234  ZXYV','CC B 321'])])

df = rdd.toDF(['index', 'result'])
Run Code Online (Sandbox Code Playgroud)

您可以首先使用将数组分解为多行flatMap并将两个字母标识符提取到单独的列中。

df_flattened = df.rdd.flatMap(lambda x: [(x[0],y, y[0:2],y[3::]) for y in x[1]])\
               .toDF(['index','result', 'identifier','identifiertype'])
Run Code Online (Sandbox Code Playgroud)

并使用pivot将两个字母标识符更改为列名:

df_result = df_flattened.groupby(df_flattened.index,)\
                        .pivot("identifier")\
                        .agg(first("identifiertype"))\
                        .join(df,'index')
Run Code Online (Sandbox Code Playgroud)

我添加了连接以result恢复列