Spark java使用java.util.Map类型创建行的问题

nag*_*nag 9 java apache-spark spark-dataframe

使用火花2.1

我在里面创建了一个带有MapDataType的DataSet

StructType schema = new StructType(new StructField[]{
                new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("words", DataTypes.StringType, false, Metadata.empty()),
                new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("features", DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType), false, Metadata.empty())
        });

        Map<String,Integer> abc = new HashMap<String,Integer>();
        abc.put("abc", 1);
        Row r = RowFactory.create(0, "Hi these are words ", 1, abc);    
        List<Row> data = Arrays.asList(r);
        Dataset<Row> wordDataFrame = spark.createDataFrame(data, schema);
        wordDataFrame.show();
Run Code Online (Sandbox Code Playgroud)

以上代码工作正常.

但是当我尝试在这个DataSet上调用map函数(用新的HashMap替换Map DataType条目)时,我收到以下错误.

StructType schema = new StructType(new StructField[]{
                new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("words", DataTypes.StringType, false, Metadata.empty()),
                new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("featuresNew", DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType), false, Metadata.empty())
        });


        ExpressionEncoder<Row> encoder = RowEncoder.apply(schema);

        Dataset<Row> output = input.map(new MapFunction<Row, Row>() {
            @Override
            public Row call(Row row) throws Exception {
                Map<String, Integer> newMap = new HashMap<String, Integer>();
                newMap.put("Transformed string", 1);
                return RowFactory.create(row.getInt(0), row.getString(1), row.getInt(2), newMap);
            }
        }, encoder);

        return output;
Run Code Online (Sandbox Code Playgroud)

错误堆栈:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.RuntimeException: java.util.HashMap is not a valid external type for schema of map<string,int>
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(generated.java:410)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745) 
Run Code Online (Sandbox Code Playgroud)

我在这里错过了什么?为什么我得到" java.util.HashMap不是map的架构的有效外部类型 "错误

编辑:

我试过java.util.List数据类型

StructType schema = new StructType(new StructField[]{
                new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("words", DataTypes.StringType, false, Metadata.empty()),
                new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("featuresNew", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
        });

ExpressionEncoder<Row> encoder = RowEncoder.apply(schema);
Dataset<Row> output = input.map(new MapFunction<Row, Row>() {
            @Override
            public Row call(Row row) throws Exception {
            List<String> xyz = Arrays.asList("Hi", "how", "now");

                return RowFactory.create(row.getInt(0), row.getString(1), row.getInt(2), xyz);
            }
        }, encoder);
Run Code Online (Sandbox Code Playgroud)

我得到一个类似的错误消息

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.RuntimeException: java.util.Arrays$ArrayList is not a valid external type for schema of array<string>
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(generated.java:221)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

java.lang.String工作正常

StructType schema = new StructType(new StructField[]{
                new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("words", DataTypes.StringType, false, Metadata.empty()),
                new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("featuresNew", DataTypes.StringType, false, Metadata.empty())
        });


        ExpressionEncoder<Row> encoder = RowEncoder.apply(schema);
        Dataset<Row> output = input.map(new MapFunction<Row, Row>() {
            @Override
            public Row call(Row row) throws Exception {                  
                String xyz = Arrays.asList("Please", "work", "now").toString();    
                return RowFactory.create(row.getInt(0), row.getString(1), row.getInt(2), xyz);
            }
        }, encoder);
Run Code Online (Sandbox Code Playgroud)

看起来像原始DataTypes工作正常!

解决方案: 这对我有用

我使用将Java HashMap转换为Scala Map并更改了代码,如下所示

StructType schema = new StructType(new StructField[]{
                    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
                    new StructField("words", DataTypes.StringType, false, Metadata.empty()),
                    new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
                    new StructField("featuresNew", DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType), false, Metadata.empty())
            });


            ExpressionEncoder<Row> encoder = RowEncoder.apply(schema);

            Dataset<Row> output = input.map(new MapFunction<Row, Row>() {
                @Override
                public Row call(Row row) throws Exception {
                    HashMap<String, Integer> newMap = new HashMap<String,Integer();                    
                    newMap.put("Transformed string", 1);                    
                    return RowFactory.create(row.getInt(0), row.getString(1), row.getInt(2), ToScalaExample.toScalaMap(newMap));
                }
            }, encoder);

            return output;
Run Code Online (Sandbox Code Playgroud)

我认为对于原始数据类型,spark会隐式地将java数据类型转换为Scala数据类型.对于其他我们需要明确转换它们.

Den*_*din 0

以下内容实际上是提问者发现的,我从问题中提取了它,以便其他人可以在正确的位置找到答案:

解决方案: 这对我有用

我使用了[Converting Java HashMap to Scala Map][1]并将代码更改如下

StructType schema = new StructType(new StructField[]{
                    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
                    new StructField("words", DataTypes.StringType, false, Metadata.empty()),
                    new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
                    new StructField("featuresNew", DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType), false, Metadata.empty())
            });
    
    
            ExpressionEncoder<Row> encoder = RowEncoder.apply(schema);
    
            Dataset<Row> output = input.map(new MapFunction<Row, Row>() {
                @Override
                public Row call(Row row) throws Exception {
                    HashMap<String, Integer> newMap = new HashMap<String,Integer();                    
                    newMap.put("Transformed string", 1);                    
                    return RowFactory.create(row.getInt(0), row.getString(1), row.getInt(2), ToScalaExample.toScalaMap(newMap));
                }
            }, encoder);
    
            return output;
Run Code Online (Sandbox Code Playgroud)

我认为对于原始数据类型,spark 会隐式地将 java 数据类型转换为 Scala 数据类型。对于其他我们需要显式转换它们。