apache-spark dataframe rdd
I am trying to convert a typed RDD into an RDD of Rows and then create a DataFrame from it. The code throws an exception when I execute it.
Code:
JavaRDD<Counter> rdd = sc.parallelize(counters);
JavaRDD<Row> rowRDD = rdd.map((Function<Counter, Row>) RowFactory::create);
// I am using a schema here based on the Counter class
DataFrame df = sqlContext.createDataFrame(rowRDD, getSchema());
df.show(); // throws an exception
Does the conversion from a typed RDD to an RDD of Rows via RowFactory preserve the field order? If not, how can I ensure it?
The classes:
class Counter {
    long vid;
    byte[] bytes;
    List<B> blist;
}

class B {
    String id;
    long count;
}
The schema:
private StructType getSchema() {
    List<StructField> fields = new ArrayList<>();
    fields.add(DataTypes.createStructField("vid", DataTypes.LongType, false));
    fields.add(DataTypes.createStructField("bytes", DataTypes.createArrayType(DataTypes.ByteType), false));

    List<StructField> bFields = new ArrayList<>();
    bFields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
    bFields.add(DataTypes.createStructField("count", DataTypes.LongType, false));
    StructType bclasSchema = DataTypes.createStructType(bFields);

    fields.add(DataTypes.createStructField("blist", DataTypes.createArrayType(bclasSchema, false), false));
    return DataTypes.createStructType(fields);
}
It fails with the following exception:
java.lang.ClassCastException: test.spark.SampleTest$A cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getLong(rows.scala:42)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:221)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$LongConverter$.toScalaImpl(CatalystTypeConverters.scala:367)
The problem is that there is no conversion happening here. When you create a Row, it can take arbitrary Objects, and they are stored as-is. So it is not equivalent to DataFrame creation:
spark.createDataFrame(rdd, Counter.class);
or Dataset<Counter> creation:
Encoder<Counter> encoder = Encoders.bean(Counter.class);
spark.createDataset(rdd, encoder);
both of which work with a bean class.
So RowFactory::create simply does not apply here. If you want to pass an RDD<Row>, its values should already be in a shape that can be used directly with the required schema of the DataFrame. That means you have to explicitly map each Counter to a Row of the following shape:
Row(vid, bytes, List(Row(id1, count1), ..., Row(idN, countN)))
and your code should be equivalent to:
JavaRDD<Row> rows = rdd.map((Function<Counter, Row>) cnt ->
    RowFactory.create(
        cnt.vid, cnt.bytes,
        cnt.blist.stream().map(b -> RowFactory.create(b.id, b.count)).toArray()
    )
);
Dataset<Row> df = sqlContext.createDataFrame(rows, getSchema());
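As for the ordering question: RowFactory.create is a plain varargs factory, so the field order in the resulting Row is exactly the argument order, which is why it must match the StructType by hand. This can be illustrated without a Spark cluster; the sketch below uses a hypothetical SimpleRow stand-in for Spark's Row (plain Java, no Spark dependency):

```java
import java.util.Arrays;
import java.util.List;

public class RowShapeSketch {
    // Hypothetical stand-in for Spark's Row: a varargs container
    // whose field order is simply the argument order.
    static class SimpleRow {
        final Object[] values;
        SimpleRow(Object... values) { this.values = values; }
        Object get(int i) { return values[i]; }
    }

    // Same shape as the B class in the question.
    static class B {
        final String id;
        final long count;
        B(String id, long count) { this.id = id; this.count = count; }
    }

    // Mirrors the mapping in the answer: values are passed in the exact
    // order the schema declares them -- vid, bytes, blist -- and each B
    // becomes a nested (id, count) row.
    static SimpleRow toRow(long vid, byte[] bytes, List<B> blist) {
        Object[] nested = blist.stream()
                .map(b -> new SimpleRow(b.id, b.count))
                .toArray();
        return new SimpleRow(vid, bytes, nested);
    }

    public static void main(String[] args) {
        SimpleRow row = toRow(42L, new byte[] {1, 2},
                Arrays.asList(new B("a", 1L), new B("b", 2L)));
        System.out.println(row.get(0));                     // field 0 -> vid: 42
        Object[] nested = (Object[]) row.get(2);
        System.out.println(nested.length);                  // field 2 -> blist: 2 entries
        System.out.println(((SimpleRow) nested[0]).get(0)); // nested field 0 -> id: a
    }
}
```

If you swap the order of fields (or of id and count), nothing fails at construction time; the mismatch only surfaces later, when Spark interprets each position according to the schema, which is exactly the ClassCastException seen above.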
Viewed: 5761 times