Tags: java, bigdata, apache-spark
I am writing a program that uses spark-sql to join two files on a common parameter. I think my code is fine, but when I try to save the result as a text file I get an error. My code is below:
import java.io.Serializable;
import java.util.regex.Pattern;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;

public class JoinCSV {

    // Bean for the "complete sample" CSV; Spark SQL derives the table schema
    // from these getter/setter pairs.
    @SuppressWarnings("serial")
    public static class CompleteSample implements Serializable {
        private String ASSETNUM;
        private String ASSETTAG;
        private String CALNUM;

        public String getASSETNUM() {
            return ASSETNUM;
        }

        public void setASSETNUM(String aSSETNUM) {
            ASSETNUM = aSSETNUM;
        }

        public String getASSETTAG() {
            return ASSETTAG;
        }

        public void setASSETTAG(String aSSETTAG) {
            ASSETTAG = aSSETTAG;
        }

        public String getCALNUM() {
            return CALNUM;
        }

        public void setCALNUM(String cALNUM) {
            CALNUM = cALNUM;
        }
    }

    // Bean for the "extended sample" CSV.
    @SuppressWarnings("serial")
    public static class ExtendedSample implements Serializable {
        private String ASSETNUM;
        private String CHANGEBY;
        private String CHANGEDATE;

        public String getASSETNUM() {
            return ASSETNUM;
        }

        public void setASSETNUM(String aSSETNUM) {
            ASSETNUM = aSSETNUM;
        }

        public String getCHANGEBY() {
            return CHANGEBY;
        }

        public void setCHANGEBY(String cHANGEBY) {
            CHANGEBY = cHANGEBY;
        }

        public String getCHANGEDATE() {
            return CHANGEDATE;
        }

        public void setCHANGEDATE(String cHANGEDATE) {
            CHANGEDATE = cHANGEDATE;
        }
    }

    private static final Pattern comma = Pattern.compile(",");

    @SuppressWarnings("serial")
    public static void main(String[] args) throws Exception {
        String path = "C:/Users/cyg_server/Documents/bigDataExample/AssetsImportCompleteSample.csv";
        String path1 = "C:/Users/cyg_server/Documents/bigDataExample/AssetsImportExtendedSample.csv";

        JavaSparkContext ctx = new JavaSparkContext("local[2]", "JavaSparkSQL");
        JavaSQLContext sqlCtx = new JavaSQLContext(ctx);

        // Parse each line of the first CSV into a CompleteSample bean.
        JavaRDD<CompleteSample> cs = ctx.textFile(path).map(
                new Function<String, CompleteSample>() {
                    public CompleteSample call(String line) throws Exception {
                        String[] parts = comma.split(line);
                        CompleteSample cs = new CompleteSample();
                        cs.setASSETNUM(parts[0]);
                        cs.setASSETTAG(parts[1]);
                        cs.setCALNUM(parts[2]);
                        return cs;
                    }
                });

        // Parse each line of the second CSV into an ExtendedSample bean.
        JavaRDD<ExtendedSample> es = ctx.textFile(path1).map(
                new Function<String, ExtendedSample>() {
                    public ExtendedSample call(String line) throws Exception {
                        String[] parts = comma.split(line);
                        ExtendedSample es = new ExtendedSample();
                        es.setASSETNUM(parts[0]);
                        es.setCHANGEBY(parts[1]);
                        es.setCHANGEDATE(parts[2]);
                        return es;
                    }
                });

        // Register both RDDs as tables so they can be joined with SQL.
        JavaSchemaRDD complete = sqlCtx.applySchema(cs, CompleteSample.class);
        complete.registerAsTable("cs");
        JavaSchemaRDD extended = sqlCtx.applySchema(es, ExtendedSample.class);
        extended.registerAsTable("es");

        JavaSchemaRDD fs = sqlCtx.sql("SELECT ASSETTAG, CALNUM FROM cs INNER JOIN es ON cs.ASSETNUM=es.ASSETNUM;");
        fs.saveAsTextFile("result"); // Here I am getting the error
    }
}
My error is as follows:
14/07/19 00:40:13 INFO TaskSchedulerImpl: Cancelling stage 0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 1 times, most recent failure: Exception failure in TID 4 on host localhost: java.lang.NullPointerException
java.lang.ProcessBuilder.start(Unknown Source)
org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
org.apache.hadoop.util.Shell.run(Shell.java:379)
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
------------
------------
and
14/07/19 00:40:11 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:278)
at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:300)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:293)
at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:362)
at org.apache.spark.SparkContext$$anonfun$22.apply(SparkContext.scala:546)
at org.apache.spark.SparkContext$$anonfun$22.apply(SparkContext.scala:546)
-----------------
-----------------
This second error shows up whether I use Spark, Spark SQL, or Spark Streaming. I have no idea what it means, and it does not actually seem to affect the code: even after the error appears, the results come out fine. Still, seeing an unexplained error on every run is very annoying.
Can someone help me understand the problem? I am very confused. Thanks.
There is a workaround for the rdd.saveAsTextFile() error on Windows. It fixed both the SparkException and the IOException errors I was hitting with Spark v1.1.0 in local mode on Windows 8.1:
http://qnalist.com/questions/4994960/run-spark-unit-test-on-windows-7
Here are the steps from that link:
1) Download winutils.exe;
2) Put it somewhere like c:\winutil\bin;
3) Add this line to your code: System.setProperty("hadoop.home.dir", "c:\\winutil\\")
Hope this works for you.
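In context, the property has to be set before the JavaSparkContext (and therefore Hadoop's Shell class) is initialized. Here is a minimal sketch of where the call belongs in the program above, assuming winutils.exe was placed at c:\winutil\bin\winutils.exe as in step 2; everything else stays exactly as in the question:

import org.apache.spark.api.java.JavaSparkContext;

public class JoinCSV {
    public static void main(String[] args) throws Exception {
        // Hadoop's Shell class resolves %hadoop.home.dir%\bin\winutils.exe in a
        // static initializer; when neither the hadoop.home.dir property nor the
        // HADOOP_HOME environment variable is set, it probes "null\bin\winutils.exe",
        // which is exactly the IOException shown in the question.
        System.setProperty("hadoop.home.dir", "c:\\winutil\\");

        // Only create the context after the property is set.
        JavaSparkContext ctx = new JavaSparkContext("local[2]", "JavaSparkSQL");

        // ... CSV parsing, the join, and fs.saveAsTextFile("result") go here ...

        ctx.stop();
    }
}

Setting the HADOOP_HOME environment variable to c:\winutil has the same effect, since Hadoop falls back to that variable when the system property is unset.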