Tom*_*lek 16 java hadoop-yarn apache-spark spark-launcher
我试图通过Java代码将带有Spark作业的JAR提交到YARN集群中.我使用SparkLauncher提交SparkPi示例:
Process spark = new SparkLauncher()
.setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar")
.setMainClass("org.apache.spark.examples.SparkPi")
.setMaster("yarn-cluster")
.launch();
System.out.println("Waiting for finish...");
int exitCode = spark.waitFor();
System.out.println("Finished! Exit code:" + exitCode);
Run Code Online (Sandbox Code Playgroud)
有两个问题:
我尝试使用Oracle Java 7和8执行提交应用程序.
Tom*_*lek 17
我在Spark邮件列表中得到了帮助.关键是在Process上读取/清除getInputStream和getErrorStream().子进程可能会填满缓冲区并导致死锁 - 请参阅有关Process的Oracle文档.应该在不同的线程中读取流:
Process spark = new SparkLauncher()
.setSparkHome("C:\\spark-1.4.1-bin-hadoop2.6")
.setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar")
.setMainClass("org.apache.spark.examples.SparkPi").setMaster("yarn-cluster").launch();
InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(spark.getInputStream(), "input");
Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
inputThread.start();
InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(spark.getErrorStream(), "error");
Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
errorThread.start();
System.out.println("Waiting for finish...");
int exitCode = spark.waitFor();
System.out.println("Finished! Exit code:" + exitCode);
Run Code Online (Sandbox Code Playgroud)
其中InputStreamReaderRunnable类是:
public class InputStreamReaderRunnable implements Runnable {
private BufferedReader reader;
private String name;
public InputStreamReaderRunnable(InputStream is, String name) {
this.reader = new BufferedReader(new InputStreamReader(is));
this.name = name;
}
public void run() {
System.out.println("InputStream " + name + ":");
try {
String line = reader.readLine();
while (line != null) {
System.out.println(line);
line = reader.readLine();
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
Run Code Online (Sandbox Code Playgroud)
由于这是一个旧帖子,我想添加一个更新,可能有助于谁曾阅读过这篇文章.在spark 1.6.0中,SparkLauncher类中增加了一些函数.这是:
def startApplication(listeners: <repeated...>[Listener]): SparkAppHandle
Run Code Online (Sandbox Code Playgroud)
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.launcher.SparkLauncher
你可以运行应用程序而不需要额外的stdout线程和stderr处理plush有一个很好的状态报告正在运行的应用程序.使用此代码:
val env = Map(
"HADOOP_CONF_DIR" -> hadoopConfDir,
"YARN_CONF_DIR" -> yarnConfDir
)
val handler = new SparkLauncher(env.asJava)
.setSparkHome(sparkHome)
.setAppResource("Jar/location/.jar")
.setMainClass("path.to.the.main.class")
.setMaster("yarn-client")
.setConf("spark.app.id", "AppID if you have one")
.setConf("spark.driver.memory", "8g")
.setConf("spark.akka.frameSize", "200")
.setConf("spark.executor.memory", "2g")
.setConf("spark.executor.instances", "32")
.setConf("spark.executor.cores", "32")
.setConf("spark.default.parallelism", "100")
.setConf("spark.driver.allowMultipleContexts","true")
.setVerbose(true)
.startApplication()
println(handle.getAppId)
println(handle.getState)
Run Code Online (Sandbox Code Playgroud)
如果火花应用,你可以继续征服状态,直到它成功.有关Spark Launcher服务器如何在1.6.0中工作的信息.看到这个链接:https: //github.com/apache/spark/blob/v1.6.0/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
| 归档时间: |
|
| 查看次数: |
15182 次 |
| 最近记录: |