Aza*_*nov 18 scala apache-spark
我是斯卡拉和火花的新手,请记住这一点:)
实际上,我有三个问题
提前谢谢你
zer*_*323 31
通常应该没有必要,最好使用UDF,但在这里你是:
我应该如何定义函数以将其传递到df.rdd.mapPartitions,如果我想创建新的Row,只需要很少的其他列
它应该采取Iterator[Row]和返回Iterator[T]所以在你的情况下你应该使用这样的东西
import org.apache.spark.sql.Row
def transformRows(iter: Iterator[Row]): Iterator[Row] = ???
Run Code Online (Sandbox Code Playgroud)
如何在Row对象中添加几列(或创建一个新列)
有访问的多种方式Row,包括值Row.get*的方法,Row.toSeq等新的Row可使用创建Row.apply,Row.fromSeq,Row.fromTuple或RowFactory.例如:
def transformRow(row: Row): Row = Row.fromSeq(row.toSeq ++ Array[Any](-1, 1))
Run Code Online (Sandbox Code Playgroud)
如何从创建的RDD创建DataFrame
如果您有,RDD[Row]您可以使用SQLContext.createDataFrame并提供架构.
把这一切放在一起:
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
val df = sc.parallelize(Seq(
(1.0, 2.0), (0.0, -1.0),
(3.0, 4.0), (6.0, -2.3))).toDF("x", "y")
def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)
val newSchema = StructType(df.schema.fields ++ Array(
StructField("z", IntegerType, false), StructField("v", IntegerType, false)))
sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show
// +---+----+---+---+
// | x| y| z| v|
// +---+----+---+---+
// |1.0| 2.0| -1| 1|
// |0.0|-1.0| -1| 1|
// |3.0| 4.0| -1| 1|
// |6.0|-2.3| -1| 1|
// +---+----+---+---+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
15979 次 |
| 最近记录: |