使用Spark将列转换为行

Rao*_*ouf 26 python transpose pivot apache-spark

我正在尝试将我的表的某些列转换为行.我正在使用Python和Spark 1.5.0.这是我的初始表:

+-----+-----+-----+-------+
|  A  |col_1|col_2|col_...|
+-----+-------------------+
|  1  |  0.0|  0.6|  ...  |
|  2  |  0.6|  0.7|  ...  |
|  3  |  0.5|  0.9|  ...  |
|  ...|  ...|  ...|  ...  |
Run Code Online (Sandbox Code Playgroud)

我想有这样的事情:

+-----+--------+-----------+
|  A  | col_id | col_value |
+-----+--------+-----------+
|  1  |   col_1|        0.0|
|  1  |   col_2|        0.6|   
|  ...|     ...|        ...|    
|  2  |   col_1|        0.6|
|  2  |   col_2|        0.7| 
|  ...|     ...|        ...|  
|  3  |   col_1|        0.5|
|  3  |   col_2|        0.9|
|  ...|     ...|        ...|
Run Code Online (Sandbox Code Playgroud)

有人知道我能做到吗?谢谢您的帮助.

zer*_*323 36

使用基本的Spark SQL函数相对简单.

蟒蛇

from pyspark.sql.functions import array, col, explode, struct, lit

df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"])

def to_long(df, by):

    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
      struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

to_long(df, ["A"])
Run Code Online (Sandbox Code Playgroud)

斯卡拉:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}

val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2")

def toLong(df: DataFrame, by: Seq[String]): DataFrame = {
  val (cols, types) = df.dtypes.filter{ case (c, _) => !by.contains(c)}.unzip
  require(types.distinct.size == 1, s"${types.distinct.toString}.length != 1")      

  val kvs = explode(array(
    cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _*
  ))

  val byExprs = by.map(col(_))

  df
    .select(byExprs :+ kvs.alias("_kvs"): _*)
    .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*)
}

toLong(df, Seq("A"))
Run Code Online (Sandbox Code Playgroud)

  • 我不认为那"相对"简单:) (33认同)
  • 如何做相反的事情。如何从第二个数据帧制作第一个数据帧? (2认同)

Vam*_*ala 7

一种方式解决与pyspark sql使用功能create_mapexplode

from pyspark.sql import functions as func
#Use `create_map` to create the map of columns with constant 
df = df.withColumn('mapCol', \
                    func.create_map(func.lit('col_1'),df.col_1,
                                    func.lit('col_2'),df.col_2,
                                    func.lit('col_3'),df.col_3
                                   ) 
                  )
#Use explode function to explode the map 
res = df.select('*',func.explode(df.mapCol).alias('col_id','col_value'))
res.show()
Run Code Online (Sandbox Code Playgroud)


Gon*_*tti 6

您可以使用堆栈函数:

例如:

df.selectExpr("stack(2, 'col_1', col_1, 'col_2', col_2) as (key, value)")
Run Code Online (Sandbox Code Playgroud)

在哪里:

  • 2 是要堆叠的列数(col_1 和 col_2)
  • 'col_1' 是键的字符串
  • col_1 是从中获取值的列

如果您有多个列,则可以构建整个堆栈字符串迭代列名并将其传递给selectExpr

  • df.selectExpr('column_names_to_keep', 'column_names_to_keep', "stack(2, 'col_1', col_1, 'col_2', col_2) as (key, value)") (2认同)

jav*_*dba 5

目前,Spark本地线性代数库非常薄弱:并且它们不包括上述基本操作。

有一个JIRA可以解决Spark 2.1的问题,但是今天对您没有帮助。

需要考虑的事情:执行转置可能需要完全改组数据。

现在,您将需要直接编写RDD代码。我已经用transposescala 写过-但不是用python写的。这是scala版本:

 def transpose(mat: DMatrix) = {
    val nCols = mat(0).length
    val matT = mat
      .flatten
      .zipWithIndex
      .groupBy {
      _._2 % nCols
    }
      .toSeq.sortBy {
      _._1
    }
      .map(_._2)
      .map(_.map(_._1))
      .toArray
    matT
  }
Run Code Online (Sandbox Code Playgroud)

因此,您可以将其转换为python以供使用。在这一特定时刻,我没有足够的带宽来编写/测试该消息:请告知您您是否无法进行该转换。

至少-以下内容很容易转换为python

  • zipWithIndex-> enumerate()(等效于python-归功于@ zero323)
  • map -> [someOperation(x) for x in ..]
  • groupBy -> itertools.groupBy()

这是flatten没有python等效项的实现:

  def flatten(L):
        for item in L:
            try:
                for i in flatten(item):
                    yield i
            except TypeError:
                yield item
Run Code Online (Sandbox Code Playgroud)

因此,您应该能够将它们组合在一起以寻求解决方案。