Add a PySpark RDD as a new column to a pyspark.sql.dataframe

jak*_*kko 4 python apache-spark pyspark

I have a pyspark.sql.dataframe in which each row is a news article. I also have an RDD representing the words contained in each article. I want to add the RDD of words to my dataframe of news articles as a column named "words". I tried

df.withColumn('words', words_rdd )

but I got the error

AssertionError: col should be Column

The DataFrame looks like this

Articles
the cat and dog ran
we went to the park
today it will rain

but I have 3k news articles.

I applied a function to clean the text, e.g. removing stop words, and I have an RDD that looks like this:

[[cat, dog, ran],[we, went, park],[today, will, rain]]

I'm trying to get my DataFrame to look like this:

Articles                 Words
the cat and dog ran      [cat, dog, ran]
we went to the park      [we, went, park]
today it will rain       [today, will, rain]

hi-*_*zir 8

Disclaimer:

Spark DataFrames in general don't have a strictly defined order. Use at your own risk.

Add an index to the existing DataFrame:

from pyspark.sql.types import *

df_index = spark.createDataFrame(
    df.rdd.zipWithIndex(),
    StructType([StructField("data", df.schema), StructField("id", LongType())])
)

Add an index to the RDD and convert it to a DataFrame:

words_df = spark.createDataFrame(
    words_rdd.zipWithIndex(),
    StructType([
        StructField("words", ArrayType(StringType())),
        StructField("id", LongType())
    ])
)

Join and select the required fields:

df_index.join(words_df, "id").select("data.*", "words")

Warning

There are different solutions which may work in specific cases, but they don't guarantee performance and/or correctness. These include:

  • Using monotonically_increasing_id as a join key - not correct in the general case.
  • Using the row_number() window function as a join key - unacceptable performance impact and, unless a specific ordering is defined, generally not correct.
  • Using zip on RDDs - works if and only if both structures have the same data distribution (it should work in this case); see the sketch after this list.
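
A minimal sketch of that last zip-on-RDDs option (assuming the df and words_rdd from the question, and that words_rdd was derived from df without repartitioning, so both sides still share the same partitioning):

from pyspark.sql.types import ArrayType, StringType, StructType, StructField

# zip pairs up the i-th element of each RDD; it only works when both RDDs have
# the same number of partitions and the same number of elements per partition
articles_with_words = df.rdd.zip(words_rdd).map(
    lambda pair: (pair[0].Articles, pair[1])
)

words_zipped_df = spark.createDataFrame(
    articles_with_words,
    StructType([
        StructField("Articles", StringType()),
        StructField("Words", ArrayType(StringType()))
    ])
)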

Note:

In this particular case you shouldn't need an RDD at all. pyspark.ml.feature provides a variety of Transformers that should suit your needs.

from pyspark.ml.feature import *
from pyspark.ml import Pipeline

df = spark.createDataFrame(
     ["the cat and dog ran", "we went to the park", "today it will rain"],
         "string"
).toDF("Articles")

Pipeline(stages=[
    RegexTokenizer(inputCol="Articles", outputCol="Tokens"), 
    StopWordsRemover(inputCol="Tokens", outputCol="Words")
]).fit(df).transform(df).show()
# +-------------------+--------------------+---------------+
# |           Articles|              Tokens|          Words|
# +-------------------+--------------------+---------------+
# |the cat and dog ran|[the, cat, and, d...|[cat, dog, ran]|
# |we went to the park|[we, went, to, th...|   [went, park]|
# | today it will rain|[today, it, will,...|  [today, rain]|
# +-------------------+--------------------+---------------+

A list of stop words can be passed to StopWordsRemover via the stopWords parameter, for example:

StopWordsRemover(
    inputCol="Tokens",
    outputCol="Words",
    stopWords=["the", "and", "we", "to", "it"]
)
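
If you'd rather not maintain that list by hand, StopWordsRemover can also load Spark's bundled per-language lists through its loadDefaultStopWords static method; a short sketch (reusing the Tokens/Words columns from the example above; the English list is also what StopWordsRemover falls back to by default):

StopWordsRemover(
    inputCol="Tokens",
    outputCol="Words",
    stopWords=StopWordsRemover.loadDefaultStopWords("english")
)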


Gau*_*ama 3

Why join the rdd back to the dataframe at all? I would rather create the new column directly from "Articles". There are multiple ways to do it; here are my 5 cents:

from pyspark.sql import Row
from pyspark.sql.context import SQLContext
sqlCtx = SQLContext(sc)    # sc is the sparkcontext

x = [Row(Articles='the cat and dog ran'),Row(Articles='we went to the park'),Row(Articles='today it will rain')]
df = sqlCtx.createDataFrame(x)

# go through the underlying RDD; DataFrame.map is not available in Spark 2.x+
df2 = df.rdd.map(lambda x: (x.Articles, x.Articles.split(' '))).toDF(['Articles', 'words'])
df2.show()

You will get the following output:

Articles                 words
the cat and dog ran      [the, cat, and, dog, ran]
we went to the park      [we, went, to, the, park]
today it will rain       [today, it, will, rain]
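
For completeness, the same output can be produced without dropping down to the RDD API at all, for example with the built-in split function (a sketch, assuming the df created above; df_words is just an illustrative name):

from pyspark.sql import functions as F

# split the Articles column on spaces directly in the DataFrame API
df_words = df.withColumn('words', F.split(F.col('Articles'), ' '))
df_words.show(truncate=False)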

Let me know if you are trying to achieve something else.