How can I join two Spark Datasets into one with Java objects?

vit*_*iti 5 java apache-spark apache-spark-dataset apache-spark-encoders

I have a small problem joining two Datasets in Spark. I have this:

SparkConf conf = new SparkConf()
    .setAppName("MyFunnyApp")
    .setMaster("local[*]");

SparkSession spark = SparkSession
    .builder()
    .config(conf)
    .config("spark.debug.maxToStringFields", 150)
    .getOrCreate();
//...
//Do stuff
//...
Encoder<MyOwnObject1> encoderObject1 = Encoders.bean(MyOwnObject1.class);
Encoder<MyOwnObject2> encoderObject2 = Encoders.bean(MyOwnObject2.class);

Dataset<MyOwnObject1> object1DS = spark.read()
    .option("header","true")
    .option("delimiter",";")
    .option("inferSchema","true")
    .csv(pathToFile1)
    .as(encoderObject1);

Dataset<MyOwnObject2> object2DS = spark.read()
    .option("header","true")
    .option("delimiter",";")
    .option("inferSchema","true")
    .csv(pathToFile2)
    .as(encoderObject2);

I can print the schemas and they show up correctly.

//Here start the problem
Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS = 
    object1DS.join(object2DS, object1DS.col("column01")
    .equalTo(object2DS.col("column01")))
    .as(Encoders.tuple(encoderObject1, encoderObject2));

The last line fails to join and gives me this error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to map struct<"LIST WITH ALL VARS FROM TWO OBJECT"> to Tuple2, but failed as the number of fields does not line up.;

And that's true: the plain join flattens both schemas into a single row, so the result has more fields than a Tuple2 of the two objects can hold...

Then I tried this:

 Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS = object1DS
    .joinWith(object2DS, object1DS
        .col("column01")
        .equalTo(object2DS.col("column01")));

And it works fine! But I need a new Dataset without the tuple: I have an object3 that takes some variables from object1 and object2. So then I hit this problem:

Encoder<MyOwnObject3> encoderObject3 = Encoders.bean(MyOwnObject3.class);
Dataset<MyOwnObject3> object3DS = joinObjectDS.map(tupleObject1Object2 -> {
    MyOwnObject1 myOwnObject1 = tupleObject1Object2._1();
    MyOwnObject2 myOwnObject2 = tupleObject1Object2._2();
    MyOwnObject3 myOwnObject3 = new MyOwnObject3(); //Sets all vars with start values
    //...
    //Sets data from object 1 and 2 to 3.
    //...
    return myOwnObject3;
}, encoderObject3);

It fails!... This is the error:

17/05/10 12:17:43 ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 593, Column 72: A method named "toString" is not declared in any enclosing class nor any supertype, nor through a static import

Followed by more than a thousand lines of errors...

What can I do? I have tried:

  • Building my objects with only String, int (or Integer) and double (or Double) fields (no luck)
  • Using different encoders, such as kryo or javaSerialization
  • Using JavaRDD (works, but very slowly) and using DataFrames with Rows (works, but I would need to change many objects)
  • Making all my Java objects serializable
  • Using Spark 2.1.0 and 2.1.1; right now I have 2.1.1 in my pom.xml
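One thing worth double-checking in cases like this: Encoders.bean requires each class to follow the JavaBean convention, i.e. a public no-argument constructor plus a getter/setter pair for every field; a field without accessors is dropped from the schema, and unsupported member types can surface as generated-code compile errors like the toString one above. A minimal sketch of the expected shape (this MyOwnObject3 is hypothetical, reduced to two fields):

```java
// Minimal JavaBean shape that Encoders.bean(...) can map:
// public no-arg constructor + public getter/setter per field.
class MyOwnObject3 {
    private String id;
    private double amount;

    public MyOwnObject3() {} // required no-arg constructor

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public double getAmount() { return amount; }
    public void setAmount(double amount) { this.amount = amount; }
}
```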

I want to use Datasets, to get the speed of DataFrames and the object syntax of JavaRDD...

Help?

Thanks

vit*_*iti 1

Finally I found a solution.

The problem was the inferSchema option when my code created the Datasets. I have a String column that inferSchema turned into an Integer column, because all its values are "numeric" — but I need to use them as strings (like "0001", "0002"...). So I needed an explicit schema; but since I have a lot of variables, I wrote this using all my classes:

List<StructField> fieldsObject1 = new ArrayList<>();
for (Field field : MyOwnObject1.class.getDeclaredFields()) {
    fieldsObject1.add(DataTypes.createStructField(
        field.getName(),
        CatalystSqlParser.parseDataType(field.getType().getSimpleName()),
        true)
    );
}
StructType schemaObject1 = DataTypes.createStructType(fieldsObject1);

Dataset<MyOwnObject1> object1DS = spark.read()
    .option("header","true")
    .option("delimiter",";")
    .schema(schemaObject1)
    .csv(pathToFile1)
    .as(encoderObject1);

It works fine.
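Note that this loop leans on field.getType().getSimpleName() yielding names that CatalystSqlParser happens to accept: the SQL type keywords parse case-insensitively, so "String", "int", "Integer" and "double" all resolve. A plain-Java sketch of what the reflection step produces, without the Spark dependency (the Sample bean is hypothetical):

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

class SchemaSketch {
    // Hypothetical bean standing in for MyOwnObject1.
    static class Sample {
        private String code;
        private int count;
        private double price;
    }

    // Collect "name:typeName" pairs the way the answer's loop does,
    // lower-casing the simple name so it matches the SQL type keywords.
    static List<String> fieldTypes(Class<?> cls) {
        List<String> out = new ArrayList<>();
        for (Field f : cls.getDeclaredFields()) {
            out.add(f.getName() + ":" + f.getType().getSimpleName().toLowerCase());
        }
        return out;
    }
}
```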

The "best" solution would be this:

  Dataset<MyOwnObject1> object1DS = spark.read()
    .option("header","true")
    .option("delimiter",";")
    .schema(encoderObject1.schema())
    .csv(pathToFile1)
    .as(encoderObject1);

But encoderObject1.schema() returns a schema with the vars in alphabetical order, not in their original declaration order, so this option fails when reading the CSV. Maybe encoders should return a schema with the vars in their original order instead of alphabetical order.
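This matches how Encoders.bean builds its schema: it works from bean properties, which the JavaBeans introspection machinery returns sorted by name rather than in declaration order. A stdlib-only sketch of the mismatch (column names are hypothetical):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class OrderSketch {
    // Columns in the order they appear in the CSV header.
    static final List<String> CSV_ORDER = Arrays.asList("zip", "amount", "code");

    // What an encoder-derived schema yields: the same names, alphabetically,
    // which no longer lines up with the positional columns of the CSV.
    static List<String> encoderOrder(List<String> declared) {
        return declared.stream().sorted().collect(Collectors.toList());
    }
}
```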