如何根据 pyspark 数据帧中多列的笛卡尔积创建新列

spe*_*rum 5 cartesian dataframe pyspark

让我举一个简单的例子来解释我想要做什么。假设我们有两个非常简单的数据框,如下所示:

\n\n
Df1\n+---+---+---+\n| a1| a2| a3|\n+---+---+---+\n|  2|  3|  7|\n|  1|  9|  6|\n+---+---+---+\n\nDf2\n+---+---+\n| b1| b2|\n+---+---+\n| 10|  2|\n|  9|  3|\n+---+---+\n
Run Code Online (Sandbox Code Playgroud)\n\n

从 df1、df2,我们需要创建一个新的 df,其中的列是 df1、df2 中原始列的笛卡尔积。特别是,新的 df 将具有 \xe2\x80\x98a1b1\xe2\x80\x99,\xe2\x80\x99a1b2\xe2\x80\x99,\xe2\x80\x99a2b1\xe2\x80\x99,\xe2\x80 \x99a2b2\xe2\x80\x99,\xe2\x80\x99a3b1\xe2\x80\x99,\xe2\x80\x99a3b2\xe2\x80\x99,行将是 df1、df2 中相应列的乘法。结果 df 应如下所示:

\n\n
Df3\n+----+----+----+----+----+----+\n|a1b1|a1b2|a2b1|a2b2|a3b1|a3b2|\n+----+----+----+----+----+----+\n|  20|   4|  30|   6|  70|  14|\n|   9|   3|  81|  27|  54|  18|\n+----+----+----+----+----+----+\n
Run Code Online (Sandbox Code Playgroud)\n\n

我搜索了 Spark 在线文档以及此处发布的问题,但似乎它们都是关于行的笛卡尔积,而不是列。例如,rdd.cartesian()提供row中不同值组合的笛卡尔积,如下代码:

\n\n
r = sc.parallelize([1, 2])\nr.cartesian(r).toDF().show()\n\n+---+---+\n| _1| _2|\n+---+---+\n|  1|  1|\n|  1|  2|\n|  2|  1|\n|  2|  2|\n+---+---+\n
Run Code Online (Sandbox Code Playgroud)\n\n

但这不是我需要的。同样,我需要创建新的列而不是行。在我的问题中,行数将保持不变。我明白udf最终可以解决问题。然而,在我的实际应用程序中,我们有巨大的数据集,创建所有列需要很长时间(大约 500 个新列作为所有可能的列组合)。我们更喜欢进行某种向量运算,这可以提高效率。我可能是错的,但 Spark udf 似乎是基于行操作,这可能是为什么花了这么长时间才能完成的原因。

\n\n

非常感谢您的任何建议/反馈/评论。

\n\n

为了您的方便,我在此处附加了简单的代码来创建上面所示的示例数据框:

\n\n
df1 = sqlContext.createDataFrame([[2,3,7],[1,9,6]],['a1','a2','a3'])\ndf1.show()\n\ndf2 = sqlContext.createDataFrame([[10,2],[9,3]],['b1','b2'])\ndf2.show()\n
Run Code Online (Sandbox Code Playgroud)\n

Gau*_*ama 0

据我所知,这并不简单。这是使用 eval 的一个镜头:

# function to add rownumbers in a dataframe
def addrownum(df):
    dff = df.rdd.zipWithIndex().toDF(['features','rownum'])
    odf = dff.map(lambda x : tuple(x.features)+tuple([x.rownum])).toDF(df.columns+['rownum'])
    return odf

df1_ = addrownum(df1)
df2_ = addrownum(df2)
# Join based on rownumbers
outputdf = df1_.rownum.join(df2_,df1_.rownum==df2_.rownum).drop(df1_.rownum).drop(df2_.rownum)

n1 = ['a1','a2','a3']  # columns in set1
n2 = ['b1','b2']       # columns in set2

# I create a string of expression that I want to execute
eval_list = ['x.'+l1+'*'+'x.'+l2 for l1 in n1 for l2 in n2]
eval_str = '('+','.join(eval_list)+')'
col_list = [l1+l2 for l1 in n1 for l2 in n2] 

dfcartesian = outputdf.map(lambda x:eval(eval_str)).toDF(col_list)
Run Code Online (Sandbox Code Playgroud)

Spark.ml.feature 中的 Elementwise Product 可能对您有帮助,但它也同样复杂。您可以将一个列表中的多个元素逐个提取到另一个列表,并将特征向量展开回数据帧。