How to perform a union on two DataFrames with different numbers of columns in Spark?

All*_*iph 40 apache-spark apache-spark-sql

I have 2 DataFrames as follows:

[image: source DataFrames]

I need a union like this:

[image: desired union result]

The unionAll function doesn't work, because the number and the names of the columns differ.

How can I achieve this?

Alb*_*nto 40

In Scala, you can simply append all the missing columns as nulls.

import org.apache.spark.sql.functions._

// let df1 and df2 be the DataFrames to merge
val df1 = sc.parallelize(List(
  (50, 2),
  (34, 4)
)).toDF("age", "children")

val df2 = sc.parallelize(List(
  (26, true, 60000.00),
  (32, false, 35000.00)
)).toDF("age", "education", "income")

val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union

def expr(myCols: Set[String], allCols: Set[String]) = {
  allCols.toList.map(x => x match {
    case x if myCols.contains(x) => col(x)
    case _ => lit(null).as(x)
  })
}

df1.select(expr(cols1, total):_*).unionAll(df2.select(expr(cols2, total):_*)).show()

+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 50|       2|     null|   null|
| 34|       4|     null|   null|
| 26|    null|     true|60000.0|
| 32|    null|    false|35000.0|
+---+--------+---------+-------+

Update

Both DataFrames will have the same column order, because we map over total in both cases.

df1.select(expr(cols1, total):_*).show()
df2.select(expr(cols2, total):_*).show()

+---+--------+---------+------+
|age|children|education|income|
+---+--------+---------+------+
| 50|       2|     null|  null|
| 34|       4|     null|  null|
+---+--------+---------+------+

+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 26|    null|     true|60000.0|
| 32|    null|    false|35000.0|
+---+--------+---------+-------+

  • `unionAll()` has been deprecated since 2.0.0 in favor of `union()` (4 upvotes)
  • Column order matters. See https://issues.apache.org/jira/browse/SPARK-20660 (3 upvotes)
  • You should use unionByName to match columns by name (2 upvotes)

小智 20

Here is Python 3 code using pyspark:

from pyspark.sql.functions import lit


def __order_df_and_add_missing_cols(df, columns_order_list, df_missing_fields):
    """ return ordered dataFrame by the columns order list with null in missing columns """
    if not df_missing_fields:  # no missing fields for the df
        return df.select(columns_order_list)
    else:
        columns = []
        for colName in columns_order_list:
            if colName not in df_missing_fields:
                columns.append(colName)
            else:
                columns.append(lit(None).alias(colName))
        return df.select(columns)


def __add_missing_columns(df, missing_column_names):
    """ Add missing columns as null in the end of the columns list """
    list_missing_columns = []
    for col in missing_column_names:
        list_missing_columns.append(lit(None).alias(col))

    return df.select(df.schema.names + list_missing_columns)


def __order_and_union_d_fs(left_df, right_df, left_list_miss_cols, right_list_miss_cols):
    """ return union of data frames with ordered columns by left_df. """
    left_df_all_cols = __add_missing_columns(left_df, left_list_miss_cols)
    right_df_all_cols = __order_df_and_add_missing_cols(right_df, left_df_all_cols.schema.names,
                                                        right_list_miss_cols)
    return left_df_all_cols.union(right_df_all_cols)


def union_d_fs(left_df, right_df):
    """ Union between two dataFrames, if there is a gap of column fields,
     it will append all missing columns as nulls """
    # Check for None input
    if left_df is None:
        raise ValueError('left_df parameter should not be None')
    if right_df is None:
        raise ValueError('right_df parameter should not be None')
    # For data frames with equal columns and order - regular union
    if left_df.schema.names == right_df.schema.names:
        return left_df.union(right_df)
    else:  # Different columns
        # Save dataFrame columns name list as set
        left_df_col_list = set(left_df.schema.names)
        right_df_col_list = set(right_df.schema.names)
        # Diff columns between left_df and right_df
        right_list_miss_cols = list(left_df_col_list - right_df_col_list)
        left_list_miss_cols = list(right_df_col_list - left_df_col_list)
        return __order_and_union_d_fs(left_df, right_df, left_list_miss_cols, right_list_miss_cols)
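A quick usage sketch (assuming an active spark session; the data and column names are illustrative, not from the question):

df_left = spark.createDataFrame([(50, 2)], ['age', 'children'])
df_right = spark.createDataFrame([(26, 60000.0)], ['age', 'income'])
union_d_fs(df_left, df_right).show()

This should print something like:

+---+--------+-------+
|age|children| income|
+---+--------+-------+
| 50|       2|   null|
| 26|    null|60000.0|
+---+--------+-------+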

  • Ah, here we go again: zero clue about Python, Glue, or Spark, just copy-pasting stuff to make things work. (2 upvotes)

小智 17

Here is my Python version:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql import Row

def customUnion(df1, df2):
    cols1 = df1.columns
    cols2 = df2.columns
    total_cols = sorted(cols1 + list(set(cols2) - set(cols1)))
    def expr(mycols, allcols):
        def processCols(colname):
            if colname in mycols:
                return colname
            else:
                return lit(None).alias(colname)
        cols = map(processCols, allcols)
        return list(cols)
    appended = df1.select(expr(cols1, total_cols)).union(df2.select(expr(cols2, total_cols)))
    return appended

Here is an example usage:

data = [
    Row(zip_code=58542, dma='MIN'),
    Row(zip_code=58701, dma='MIN'),
    Row(zip_code=57632, dma='MIN'),
    Row(zip_code=58734, dma='MIN')
]

firstDF = spark.createDataFrame(data)

data = [
    Row(zip_code='534', name='MIN'),
    Row(zip_code='353', name='MIN'),
    Row(zip_code='134', name='MIN'),
    Row(zip_code='245', name='MIN')
]

secondDF = spark.createDataFrame(data)

customUnion(firstDF,secondDF).show()


Zyg*_*ygD 16

Spark 3.1+

df = df1.unionByName(df2, allowMissingColumns=True)

Test result:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data1=[
(1 , '2016-08-29', 1 , 2, 3),
(2 , '2016-08-29', 1 , 2, 3),
(3 , '2016-08-29', 1 , 2, 3)]
df1 = spark.createDataFrame(data1, ['code' , 'date' , 'A' , 'B', 'C'])
data2=[
(5 , '2016-08-29', 1, 2, 3, 4),
(6 , '2016-08-29', 1, 2, 3, 4),
(7 , '2016-08-29', 1, 2, 3, 4)]
df2 = spark.createDataFrame(data2, ['code' , 'date' , 'B', 'C', 'D', 'E'])

df = df1.unionByName(df2, allowMissingColumns=True)
df.show()

#     +----+----------+----+---+---+----+----+
#     |code|      date|   A|  B|  C|   D|   E|
#     +----+----------+----+---+---+----+----+
#     |   1|2016-08-29|   1|  2|  3|null|null|
#     |   2|2016-08-29|   1|  2|  3|null|null|
#     |   3|2016-08-29|   1|  2|  3|null|null|
#     |   5|2016-08-29|null|  1|  2|   3|   4|
#     |   6|2016-08-29|null|  1|  2|   3|   4|
#     |   7|2016-08-29|null|  1|  2|   3|   4|
#     +----+----------+----+---+---+----+----+


Rag*_*ags 10

A very simple way to do it: select the columns in the same order from both DataFrames and use unionAll.

from pyspark.sql.functions import lit

df1.select('code', 'date', 'A', 'B', 'C', lit(None).alias('D'), lit(None).alias('E'))\
   .unionAll(df2.select('code', 'date', lit(None).alias('A'), 'B', 'C', 'D', 'E'))

  • unionAll() has been deprecated since version 2.0.0 in favor of union(). (4 upvotes)

Mar*_*kus 10

I find most of the Python answers here a bit too clunky in their writing, given that they just go with the simple lit(None) workaround (which is also the only way I know). As an alternative, this might be useful:

from pyspark.sql.functions import lit

# df1 and df2 are assumed to be the given dataFrames from the question

# Get the lacking columns for each dataframe and set them to null in the respective dataFrame.
# First do so for df1...
for column in [column for column in df1.columns if column not in df2.columns]:
    df1 = df1.withColumn(column, lit(None))

# ... and then for df2
for column in [column for column in df2.columns if column not in df1.columns]:
    df2 = df2.withColumn(column, lit(None))


After that, just do the union() you wanted to do.
Caution: if the column order differs between df1 and df2, use unionByName()!

result = df1.unionByName(df2)


con*_*lee 8

Here is a pyspark solution.

It assumes that if a field in df1 is missing from df2, the missing field is added to df2 with null values, and vice versa. However, it also assumes that if a field exists in both DataFrames but its type or nullability differs, then the two DataFrames conflict and cannot be combined; in that case, a TypeError is raised.

from pyspark.sql.functions import lit

def harmonize_schemas_and_combine(df_left, df_right):
    left_types = {f.name: f.dataType for f in df_left.schema}
    right_types = {f.name: f.dataType for f in df_right.schema}
    left_fields = set((f.name, f.dataType, f.nullable) for f in df_left.schema)
    right_fields = set((f.name, f.dataType, f.nullable) for f in df_right.schema)

    # First go over left-unique fields
    for l_name, l_type, l_nullable in left_fields.difference(right_fields):
        if l_name in right_types:
            r_type = right_types[l_name]
            if l_type != r_type:
                raise TypeError("Union failed. Type conflict on field %s. left type %s, right type %s" % (l_name, l_type, r_type))
            else:
                raise TypeError("Union failed. Nullability conflict on field %s. left nullable %s, right nullable %s" % (l_name, l_nullable, not l_nullable))
        df_right = df_right.withColumn(l_name, lit(None).cast(l_type))

    # Now go over right-unique fields
    for r_name, r_type, r_nullable in right_fields.difference(left_fields):
        if r_name in left_types:
            l_type = left_types[r_name]
            if r_type != l_type:
                raise TypeError("Union failed. Type conflict on field %s. right type %s, left type %s" % (r_name, r_type, l_type))
            else:
                raise TypeError("Union failed. Nullability conflict on field %s. right nullable %s, left nullable %s" % (r_name, r_nullable, not r_nullable))
        df_left = df_left.withColumn(r_name, lit(None).cast(r_type))    

    # Make sure columns are in the same order
    df_left = df_left.select(df_right.columns)

    return df_left.union(df_right)
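A quick usage sketch (assuming an active spark session; data and column names are illustrative). Note that after harmonization the column order follows df_right:

df_left = spark.createDataFrame([(50, 2)], ['age', 'children'])
df_right = spark.createDataFrame([(26, 60000.0)], ['age', 'income'])
harmonize_schemas_and_combine(df_left, df_right).show()

+---+-------+--------+
|age| income|children|
+---+-------+--------+
| 50|   null|       2|
| 26|60000.0|    null|
+---+-------+--------+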


swd*_*dev 7

Modified Alberto Bonsanto's version to preserve the original column order (the OP implied the order should match that of the original tables). Also, the match part caused an IntelliJ warning.

Here is my version:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

def unionDifferentTables(df1: DataFrame, df2: DataFrame): DataFrame = {

  val cols1 = df1.columns.toSet
  val cols2 = df2.columns.toSet
  val total = cols1 ++ cols2 // union

  val order = df1.columns ++  df2.columns
  val sorted = total.toList.sortWith((a,b)=> order.indexOf(a) < order.indexOf(b))

  def expr(myCols: Set[String], allCols: List[String]) = {
      allCols.map( {
        case x if myCols.contains(x) => col(x)
        case y => lit(null).as(y)
      })
  }

  df1.select(expr(cols1, sorted): _*).unionAll(df2.select(expr(cols2, sorted): _*))
}


eha*_*nom 5

In pyspark:

df = df1.join(df2, ['each', 'shared', 'col'], how='full')
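Note that this is a full outer join rather than a true union: rows that match on all the shared join columns get merged into a single row instead of being stacked. A minimal sketch (assuming, for illustration, that age is the only shared column):

df1 = spark.createDataFrame([(50, 2)], ['age', 'children'])
df2 = spark.createDataFrame([(26, 60000.0)], ['age', 'income'])
df1.join(df2, on=['age'], how='full').show()

Here the ages differ, so both rows survive with nulls in the missing columns; if both DataFrames contained the same age, the two rows would be merged into one, unlike union/unionByName, which always stacks rows.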