在PySpark中爆炸

use*_*118 25 python apache-spark apache-spark-sql pyspark

我想从包含单词列表的DataFrame转换为DataFrame,每个单词都在自己的行中.

如何在DataFrame中的列上进行爆炸?

下面是我的一些尝试示例,您可以在其中取消注释每个代码行并获取以下注释中列出的错误.我在Python 2.7中使用PySpark和Spark 1.6.1.

from pyspark.sql.functions import split, explode
DF = sqlContext.createDataFrame([('cat \n\n elephant rat \n rat cat', )], ['word'])
print 'Dataset:'
DF.show()
print '\n\n Trying to do explode: \n'
DFsplit_explode = (
 DF
 .select(split(DF['word'], ' '))
#  .select(explode(DF['word']))  # AnalysisException: u"cannot resolve 'explode(word)' due to data type mismatch: input to function explode should be array or map type, not StringType;"
#   .map(explode)  # AttributeError: 'PipelinedRDD' object has no attribute 'show'
#   .explode()  # AttributeError: 'DataFrame' object has no attribute 'explode'
).show()

# Trying without split
print '\n\n Only explode: \n'

DFsplit_explode = (
 DF 
 .select(explode(DF['word']))  # AnalysisException: u"cannot resolve 'explode(word)' due to data type mismatch: input to function explode should be array or map type, not StringType;"
).show()
Run Code Online (Sandbox Code Playgroud)

请指教

zer*_*323 33

explode并且split是SQL函数.两者都在SQL上运行Column.split将Java正则表达式作为第二个参数.如果你想在任意空格上分隔数据,你需要这样的东西:

df = sqlContext.createDataFrame(
    [('cat \n\n elephant rat \n rat cat', )], ['word']
)

df.select(explode(split(col("word"), "\s+")).alias("word")).show()

## +--------+
## |    word|
## +--------+
## |     cat|
## |elephant|
## |     rat|
## |     rat|
## |     cat|
## +--------+
Run Code Online (Sandbox Code Playgroud)


Ale*_*der 14

要拆分空格并删除空行,请添加该where子句.

DF = sqlContext.createDataFrame([('cat \n\n elephant rat \n rat cat\nmat\n', )], ['word'])

>>> (DF.select(explode(split(DF.word, "\s")).alias("word"))
       .where('word != ""')
       .show())

+--------+
|    word|
+--------+
|     cat|
|elephant|
|     rat|
|     rat|
|     cat|
|     mat|
+--------+
Run Code Online (Sandbox Code Playgroud)

  • 对于稍微更完整的解决方案,可以推广到必须报告多个列的情况,请使用'withColumn'而不是简单的'select',即:df.withColumn('word',explode('word')).show ()这保证了在使用explode之后,DataFrame中的所有其余列仍然存在于输出DataFrame中.这比指定需要选择的每一列更简单,即:df.select('col1','col2',...,'colN',explode('word')).show() (4认同)