从JavaRDD <Row>创建的Spark DataFrame将所有列数据复制到第一列

Ume*_*cha 5 apache-spark apache-spark-sql

嗨,我有一个DataFrame,我需要转换为JavaRDD并返回到DataFrame我有以下代码

DataFrame sourceFrame = hiveContext.read().format("orc").load("/path/to/orc/file");
//I do order by in above sourceFrame and then I convert it into JavaRDD
JavaRDD<Row> modifiedRDD = sourceFrame.toJavaRDD().map(new Function<Row,Row>({
    public Row call(Row row) throws Exception {
       if(row != null) {
           //updated row by creating new Row
           return RowFactory.create(updateRow);
       }
      return null;
});
//now I convert above JavaRDD<Row> into DataFrame using the following
DataFrame modifiedFrame = sqlContext.createDataFrame(modifiedRDD,schema);
Run Code Online (Sandbox Code Playgroud)

sourceFramemodifiedFrame当我调用sourceFrame.show()输出时,模式是相同的我看到每个列都有相应的值而且没有列是空的但是当我调用时modifiedFrame.show()我看到所有的列值被合并到第一列值中,例如假设源DataFrame有3列,如下所示

_col1    _col2    _col3
 ABC       10      DEF
 GHI       20      JKL
Run Code Online (Sandbox Code Playgroud)

当我打印从JavaRDD转换的modifiedFrame时,它按以下顺序显示

_col1        _col2      _col3
ABC,10,DEF
GHI,20,JKL
Run Code Online (Sandbox Code Playgroud)

如上所示,_col1具有所有值,_col2和_col3为空.我不知道我做错了什么请指导我提前感谢Spark的新手.

İlk*_*kut 9

正如我在评论中提到的那样;

可能因为将列表作为一个参数而发生.

return RowFactory.create(updateRow);
Run Code Online (Sandbox Code Playgroud)

调查Apache Spark文档和源代码时; 在该指定模式示例中,它们分别为所有列分配参数.只是粗略地研究一些源代码RowFactory.java类和GenericRow类不分配那个参数.因此,尝试分别为行列提供参数.

return RowFactory.create(updateRow.get(0),updateRow.get(1),updateRow.get(2)); // List Example
Run Code Online (Sandbox Code Playgroud)

您可以尝试将列表转换为数组,然后作为参数传递.

YourObject[] updatedRowArray= new YourObject[updateRow.size()];
updateRow.toArray(updatedRowArray);
return RowFactory.create(updatedRowArray);
Run Code Online (Sandbox Code Playgroud)

顺便说一句,RowFactory.create()方法正在创建Row对象.在关于Row对象和RowFactory.create()方法的Apache Spark文档中 ;

表示关系运算符的一行输出.允许通过序数进行通用访问,这将导致基元的装箱开销,以及本机原语访问.使用本机基元接口检索null值是无效的,而在尝试检索可能为null的值之前,用户必须检查isNullAt.

要创建新Row,请在Java中使用RowFactory.create()或在Scala中使用Row.apply().

可以通过提供字段值来构造Row对象.例:

import org.apache.spark.sql._

//从值创建一行.

行(value1,value2,value3,...)

//从Seq值创建一行.

Row.fromSeq(Seq(value1,value2,...))

根据文件; 您还可以分别在创建Row对象时应用自己的必需算法来分隔行列.但我认为将列表转换为数组并将参数作为数组传递将对您有用(我无法尝试请发布您的反馈,谢谢).