如何在Pyspark中加入多个列?

use*_*714 31 python join apache-spark apache-spark-sql pyspark

我正在使用Spark 1.3,并希望使用python接口(SparkSQL)加入多个列

以下作品:

我首先将它们注册为临时表.

numeric.registerTempTable("numeric")
Ref.registerTempTable("Ref")

test  = numeric.join(Ref, numeric.ID == Ref.ID, joinType='inner')
Run Code Online (Sandbox Code Playgroud)

我现在想基于多个列加入它们.

我得到SyntaxError:语法无效:

test  = numeric.join(Ref,
   numeric.ID == Ref.ID AND numeric.TYPE == Ref.TYPE AND
   numeric.STATUS == Ref.STATUS ,  joinType='inner')
Run Code Online (Sandbox Code Playgroud)

zer*_*323 58

您应该使用&/ |运算符并注意运算符优先级(==优先级低于按位ANDOR):

df1 = sqlContext.createDataFrame(
    [(1, "a", 2.0), (2, "b", 3.0), (3, "c", 3.0)],
    ("x1", "x2", "x3"))

df2 = sqlContext.createDataFrame(
    [(1, "f", -1.0), (2, "b", 0.0)], ("x1", "x2", "x3"))

df = df1.join(df2, (df1.x1 == df2.x1) & (df1.x2 == df2.x2))
df.show()

## +---+---+---+---+---+---+
## | x1| x2| x3| x1| x2| x3|
## +---+---+---+---+---+---+
## |  2|  b|3.0|  2|  b|0.0|
## +---+---+---+---+---+---+
Run Code Online (Sandbox Code Playgroud)

  • @Chogg,他的意思是,如果你不小心使用括号,短语`df1.x1 == df2.x1 & df1.x2 == df2.x2`,(括号被删除)将由Python解释器计算如 `df1.x1 == (df2.x1 & df1.x2) == df2.x2`,这可能会引发令人困惑且非描述性的错误。 (5认同)
  • 当您说“注意运算符优先级”时,您的意思是什么?您的意思是我应该将括号放在正确的位置以将正确的表格放在一起吗? (2认同)
  • 为什么它会生成两次“x1”和“x2”列? (2认同)

Flo*_*ian 33

另一种方法是:

df1 = sqlContext.createDataFrame(
    [(1, "a", 2.0), (2, "b", 3.0), (3, "c", 3.0)],
    ("x1", "x2", "x3"))

df2 = sqlContext.createDataFrame(
    [(1, "f", -1.0), (2, "b", 0.0)], ("x1", "x2", "x4"))

df = df1.join(df2, ['x1','x2'])
df.show()
Run Code Online (Sandbox Code Playgroud)

哪个输出:

+---+---+---+---+
| x1| x2| x3| x4|
+---+---+---+---+
|  2|  b|3.0|0.0|
+---+---+---+---+
Run Code Online (Sandbox Code Playgroud)

主要优点是表连接的列不会在输出中重复,从而降低遇到错误的风险,例如:org.apache.spark.sql.AnalysisException: Reference 'x1' is ambiguous, could be: x1#50L, x1#57L.


每当两个表中的列具有不同的名称,(让我们在上面的例子说,df2有列y1,y2y4),可以使用下面的语法:

df = df1.join(df2.withColumnRenamed('y1','x1').withColumnRenamed('y2','x2'), ['x1','x2'])
Run Code Online (Sandbox Code Playgroud)

  • 这可能是我最不喜欢的 pyspark 错误:`引用'x1'不明确,可能是:x1#50L,x1#57L。`我不明白为什么它让你做类似`df = df1.join(df2, df1.x1 == df2.x1)`,然后当您尝试对生成的“df”执行几乎任何操作时就会出错。这只是一个小抱怨,但是有什么理由让你想要结果“df”具有重复的名称吗? (3认同)

小智 13

test = numeric.join(Ref, 
   on=[
     numeric.ID == Ref.ID, 
     numeric.TYPE == Ref.TYPE,
     numeric.STATUS == Ref.STATUS 
   ], how='inner')
Run Code Online (Sandbox Code Playgroud)

  • 答案很棒。但对于最佳实践,请提供解释。你只发布代码会让OP和未来的商业者复制并粘贴你的答案,而不理解答案背后的逻辑。请提供答案并进行一些解释。谢谢你! (2认同)