Spark: merge 2 DataFrames by adding a row index/number to both DataFrames

MrG*_*rts 12 apache-spark apache-spark-sql pyspark

Q: In PySpark, is there a way to merge two DataFrames, or to copy a column from one DataFrame into another?

For example, I have two DataFrames:

DF1              
C1                    C2                                                        
23397414             20875.7353   
5213970              20497.5582   
41323308             20935.7956   
123276113            18884.0477   
76456078             18389.9269 

and a second DataFrame:

DF2
C3                       C4
2008-02-04               262.00                 
2008-02-05               257.25                 
2008-02-06               262.75                 
2008-02-07               237.00                 
2008-02-08               231.00 

I then want to add C3 from DF2 to DF1, like this:

New DF              
    C1                    C2          C3                                              
    23397414             20875.7353   2008-02-04
    5213970              20497.5582   2008-02-05
    41323308             20935.7956   2008-02-06
    123276113            18884.0477   2008-02-07
    76456078             18389.9269   2008-02-08

I hope this example is clear.
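(For reference, a minimal sketch that recreates the two example DataFrames, assuming an active SparkSession named spark; the variable and column names simply follow the question:)

df1 = spark.createDataFrame(
    [(23397414, 20875.7353), (5213970, 20497.5582), (41323308, 20935.7956),
     (123276113, 18884.0477), (76456078, 18389.9269)],
    ["C1", "C2"])

df2 = spark.createDataFrame(
    [("2008-02-04", 262.00), ("2008-02-05", 257.25), ("2008-02-06", 262.75),
     ("2008-02-07", 237.00), ("2008-02-08", 231.00)],
    ["C3", "C4"])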

Ram*_*ram 13

In this case, either zipWithIndex plus a map, or row_number with a window function, should help; see Solution 1 and Solution 2 below.

Solution 1: you can use a window function to achieve this.

First add a row number as an extra column, say columnindex, to each DataFrame, starting with df1:

  DF1              
    C1                    C2                 columnindex                                             
    23397414             20875.7353            1
    5213970              20497.5582            2
    41323308             20935.7956            3
    123276113            18884.0477            4
    76456078             18389.9269            5

And the second DataFrame:

DF2
C3                       C4             columnindex
2008-02-04               262.00            1        
2008-02-05               257.25            2      
2008-02-06               262.75            3      
2008-02-07               237.00            4          
2008-02-08               231.00            5

Now do an inner join of df1 and df2 on columnindex, keep the columns you need, and that's it; you will get the output below.

Something like this:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, lit

# A window with no partitioning moves all rows into a single partition; ordering
# by a constant assigns row numbers in the order the rows arrive.
# (row_number was called rowNumber in Spark 1.x.)
w = Window.orderBy(lit(1))

df1 = ....  # df1 as shown above

df2 = ....  # df2 as shown above

df11 = df1.withColumn("columnindex", row_number().over(w))
df22 = df2.withColumn("columnindex", row_number().over(w))

newDF = df11.join(df22, df11.columnindex == df22.columnindex, 'inner').drop(df22.columnindex)
newDF.show()



New DF              
    C1                    C2          C3                                              
    23397414             20875.7353   2008-02-04
    5213970              20497.5582   2008-02-05
    41323308             20935.7956   2008-02-06
    123276113            18884.0477   2008-02-07
    76456078             18389.9269   2008-02-08

Solution 2: another good way (probably the best :)) is in Scala, which you can translate to PySpark:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

/**
 * Add a column index to a DataFrame
 */
def addColumnIndex(df: DataFrame) = sqlContext.createDataFrame(
  // Append the zipWithIndex index as a last field on each row
  df.rdd.zipWithIndex.map { case (row, columnindex) => Row.fromSeq(row.toSeq :+ columnindex) },
  // Extend the original schema with the new column
  StructType(df.schema.fields :+ StructField("columnindex", LongType, false))
)

// Add the index now...
val df1WithIndex = addColumnIndex(df1)
val df2WithIndex = addColumnIndex(df2)

// Now time to join...
val newone = df1WithIndex
  .join(df2WithIndex, Seq("columnindex"))
  .drop("columnindex")


Jed*_*Jed 8

I thought I'd share the Python (PySpark) translation of solution #2 from @Ram Ghadiyaram's answer above:

# Python 2 syntax (tuple-unpacking lambda, xrange, built-in reduce);
# see the next answer for a Python 3 version.
def addColumnIndex(df):
  # New column names: the original ones plus "columnindex"
  oldColumns = df.schema.names
  newColumns = oldColumns + ["columnindex"]

  # Append the zipWithIndex index to each row; toDF() auto-generates
  # column names (_1, _2, ...) because the rows become plain tuples
  df_indexed = df.rdd.zipWithIndex().map(lambda (row, columnindex): \
                                         row + (columnindex,)).toDF()

  # Rename the auto-generated columns back to the intended names
  new_df = reduce(lambda data, idx: data.withColumnRenamed(data.columns[idx],
                  newColumns[idx]), xrange(len(newColumns)), df_indexed)
  return new_df

# Add index now...
df1WithIndex = addColumnIndex(df1)
df2WithIndex = addColumnIndex(df2)

# Now time to join ...
newone = df1WithIndex.join(df2WithIndex, "columnindex", 'inner') \
                     .drop("columnindex")


Dyn*_* Fu 5

For a Python 3 version:

from pyspark.sql.types import StructType, StructField, LongType

def with_column_index(sdf):
    # Extend the original schema with a non-nullable ColumnIndex column
    new_schema = StructType(sdf.schema.fields + [StructField("ColumnIndex", LongType(), False)])
    # Append the zipWithIndex index to each row and rebuild the DataFrame
    return sdf.rdd.zipWithIndex().map(lambda row: row[0] + (row[1],)).toDF(schema=new_schema)

df1_ci = with_column_index(df1)
df2_ci = with_column_index(df2)
join_on_index = df1_ci.join(df2_ci, df1_ci.ColumnIndex == df2_ci.ColumnIndex, 'inner').drop("ColumnIndex")
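A quick check on the sample data (assuming df1 and df2 were built as in the sketch near the top of the question; the select is only there to match the desired layout):

join_on_index.select("C1", "C2", "C3").show()
# five rows: each C1/C2 pair from df1 matched with the C3 date at the same row position in df2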