spark-sql中的NullPointerException

Ami*_*jan 5 java bigdata apache-spark

我正在编写一个程序,使用spark-sql在一个公共参数上连接两个文件.我认为我的代码很好但是当我试图将其保存为文本文件时,我收到错误.我把我的代码如下: -

import java.util.regex.Pattern;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;



import java.io.Serializable;


public class JoinCSV {
    @SuppressWarnings("serial")
    public static class CompleteSample implements Serializable {
        private String ASSETNUM;
        private String ASSETTAG;
        private String CALNUM;



        public String getASSETNUM() {
            return ASSETNUM;
        }
        public void setASSETNUM(String aSSETNUM) {
            ASSETNUM = aSSETNUM;
        }
        public String getASSETTAG() {
            return ASSETTAG;
        }
        public void setASSETTAG(String aSSETTAG) {
            ASSETTAG = aSSETTAG;
        }
        public String getCALNUM() {
            return CALNUM;
        }
        public void setCALNUM(String cALNUM) {
            CALNUM = cALNUM;
        }


      }

    @SuppressWarnings("serial")
    public static class ExtendedSample implements Serializable {

        private String ASSETNUM;
        private String CHANGEBY;
        private String CHANGEDATE;


        public String getASSETNUM() {
            return ASSETNUM;
        }
        public void setASSETNUM(String aSSETNUM) {
            ASSETNUM = aSSETNUM;
        }
        public String getCHANGEBY() {
            return CHANGEBY;
        }
        public void setCHANGEBY(String cHANGEBY) {
            CHANGEBY = cHANGEBY;
        }
        public String getCHANGEDATE() {
            return CHANGEDATE;
        }
        public void setCHANGEDATE(String cHANGEDATE) {
            CHANGEDATE = cHANGEDATE;
        }
    }

    private static final Pattern comma = Pattern.compile(",");
    @SuppressWarnings("serial")
    public static void main(String[] args) throws Exception {
        String path="C:/Users/cyg_server/Documents/bigDataExample/AssetsImportCompleteSample.csv";
        String path1="C:/Users/cyg_server/Documents/bigDataExample/AssetsImportExtendedSample.csv";

          JavaSparkContext ctx = new JavaSparkContext("local[2]", "JavaSparkSQL");
          JavaSQLContext sqlCtx = new JavaSQLContext(ctx);

          JavaRDD<CompleteSample> cs = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportCompleteSample.csv").map(
                  new Function<String, CompleteSample>() {
                    public CompleteSample call(String line) throws Exception {
                      String[] parts = line.split(",");

                      CompleteSample cs = new CompleteSample();
                      cs.setASSETNUM(parts[0]);
                      cs.setASSETTAG(parts[1]);
                      cs.setCALNUM(parts[2]);

                      return cs;
                    }
                  });

          JavaRDD<ExtendedSample> es = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportExtendedSample.csv").map(
                  new Function<String, ExtendedSample>() {
                    public ExtendedSample call(String line) throws Exception {
                      String[] parts = line.split(",");

                      ExtendedSample es = new ExtendedSample();
                      es.setASSETNUM(parts[0]);
                      es.setCHANGEBY(parts[1]);
                      es.setCHANGEDATE(parts[2]);

                      return es;
                    }
                  });

          JavaSchemaRDD complete = sqlCtx.applySchema(cs, CompleteSample.class);
            complete.registerAsTable("cs");

          JavaSchemaRDD extended = sqlCtx.applySchema(es, ExtendedSample.class);
          extended.registerAsTable("es");

          JavaSchemaRDD fs= sqlCtx.sql("SELECT ASSETTAG, CALNUM FROM cs INNER JOIN es ON cs.ASSETNUM=es.ASSETNUM;");
          fs.saveAsTextFile("result");                   //Here I am getting error
    }

}
Run Code Online (Sandbox Code Playgroud)

我的错误如下: -

    14/07/19 00:40:13 INFO TaskSchedulerImpl: Cancelling stage 0
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 1 times, most recent failure: Exception failure in TID 4 on host localhost: java.lang.NullPointerException
            java.lang.ProcessBuilder.start(Unknown Source)
            org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
            org.apache.hadoop.util.Shell.run(Shell.java:379)
            org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
            org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
------------
------------
Run Code Online (Sandbox Code Playgroud)

 14/07/19 00:40:11 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
    java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
        at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:278)
        at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:300)
        at org.apache.hadoop.util.Shell.<clinit>(Shell.java:293)
        at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
        at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:362)
        at org.apache.spark.SparkContext$$anonfun$22.apply(SparkContext.scala:546)
        at org.apache.spark.SparkContext$$anonfun$22.apply(SparkContext.scala:546)
-----------------
-----------------
Run Code Online (Sandbox Code Playgroud)

无论我使用spark,spark-sql还是spark-streaming,第二个错误都会出现.我不知道这个错误是什么.但似乎第二个错误对代码没有影响,因为即使在此错误之后,结果也会变得很好.但是每次运行程序时看到一个未知错误仍然非常恼人.

有人可以帮我理解这个问题吗?我非常困惑.谢谢

Dyl*_*ogg 7

有一个解决rdd.saveAsTextFile()Windows错误的方法.它修复了我在Windows 8.1本地模式下使用Spark v1.1.0时遇到的错误SparkExceptionIOException错误.

http://qnalist.com/questions/4994960/run-spark-unit-test-on-windows-7

以下是该链接的步骤:

1)下载编译的winutils.exe ;

2)把它放在某个地方c:\winutil\bin;

3)将此行添加到您的代码中: System.setProperty("hadoop.home.dir", "c:\\winutil\\")

希望这对你有用.