如何将列添加到mapPartitions中的org.apache.spark.sql.Row中

Aza*_*nov 18 scala apache-spark

我是斯卡拉和火花的新手,请记住这一点:)

实际上,我有三个问题

  1. 我应该如何定义函数以将其传递到df.rdd.mapPartitions,如果我想创建新的Row,只需要很少的其他列
  2. 如何在Row对象中添加几列(或创建一个新列)
  3. 如何从创建的RDD创建DataFrame

提前谢谢你

zer*_*323 31

通常应该没有必要,最好使用UDF,但在这里你是:

我应该如何定义函数以将其传递到df.rdd.mapPartitions,如果我想创建新的Row,只需要很少的其他列

它应该采取Iterator[Row]和返回Iterator[T]所以在你的情况下你应该使用这样的东西

import org.apache.spark.sql.Row

def transformRows(iter: Iterator[Row]): Iterator[Row] = ???
Run Code Online (Sandbox Code Playgroud)

如何在Row对象中添加几列(或创建一个新列)

有访问的多种方式Row,包括值Row.get*的方法,Row.toSeq等新的Row可使用创建Row.apply,Row.fromSeq,Row.fromTupleRowFactory.例如:

def transformRow(row: Row): Row =  Row.fromSeq(row.toSeq ++ Array[Any](-1, 1))
Run Code Online (Sandbox Code Playgroud)

如何从创建的RDD创建DataFrame

如果您有,RDD[Row]您可以使用SQLContext.createDataFrame并提供架构.

把这一切放在一起:

import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val  df = sc.parallelize(Seq(
    (1.0, 2.0), (0.0, -1.0),
    (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")

def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)

val newSchema = StructType(df.schema.fields ++ Array(
  StructField("z", IntegerType, false), StructField("v", IntegerType, false)))

sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show

// +---+----+---+---+
// |  x|   y|  z|  v|
// +---+----+---+---+
// |1.0| 2.0| -1|  1|
// |0.0|-1.0| -1|  1|
// |3.0| 4.0| -1|  1|
// |6.0|-2.3| -1|  1|
// +---+----+---+---+
Run Code Online (Sandbox Code Playgroud)

  • @zero323 您还可以提供一个使用 Pyspark 执行此操作的示例吗?我需要一个示例来向 mapPartitions 中的 Row 添加新列。 (2认同)