小编Vis*_*mdi的帖子

问题在Yarn Cluster上运行Spark Job

我想在Hadoop YARN集群模式下运行我的spark Job ,我使用以下命令:

spark-submit --master yarn-cluster 
             --driver-memory 1g 
             --executor-memory 1g
             --executor-cores 1 
             --class com.dc.analysis.jobs.AggregationJob
               sparkanalitic.jar param1 param2 param3
Run Code Online (Sandbox Code Playgroud)

我收到错误,请提出错误,命令是否正确.我正在使用CDH 5.3.1.

Diagnostics: Application application_1424284032717_0066 failed 2 times due 
to AM Container for appattempt_1424284032717_0066_000002 exited with  
exitCode: 15 due to: Exception from container-launch.

Container id: container_1424284032717_0066_02_000001
Exit code: 15
Stack trace: ExitCodeException exitCode=15: 
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
    at org.apache.hadoop.util.Shell.run(Shell.java:455)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:702)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:197)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:299)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:81)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)  

Container exited with a non-zero exit …
Run Code Online (Sandbox Code Playgroud)

hadoop hdfs cloudera hadoop-yarn apache-spark

18
推荐指数
2
解决办法
3万
查看次数

在纱线中启动/停止火花流工作的正确方法是什么?

我一直在试验和谷歌搜索几个小时,没有运气.

我有一个火花流应用程序,在本地火花群中运行良好.现在我需要在cloudera 5.4.4上部署它.我需要能够启动它,让它在后台持续运行,并能够阻止它.

我试过这个:

$ spark-submit --master yarn-cluster --class MyMain my.jar myArgs
Run Code Online (Sandbox Code Playgroud)

但它只是无休止地印刷这些线条.

15/07/28 17:58:18 INFO Client: Application report for application_1438092860895_0012 (state: RUNNING)
15/07/28 17:58:19 INFO Client: Application report for application_1438092860895_0012 (state: RUNNING)
Run Code Online (Sandbox Code Playgroud)

问题1:因为它是一个流媒体应用程序,它需要连续运行.那么我该如何在"后台"模式下运行呢?我发现在纱线上提交火花作业的所有例子似乎都假设应用程序会做一些工作并终止,因此你想要在前台运行它.但流媒体并非如此.

接下来......此时应用程序似乎无法正常运行.我认为这可能是我的错误或配置错误,所以我试着查看日志以查看发生了什么:

$ yarn logs -applicationId application_1438092860895_012
Run Code Online (Sandbox Code Playgroud)

但它告诉我:

/tmp/logs/hdfs/logs/application_1438092860895_0012does not have any log files.
Run Code Online (Sandbox Code Playgroud)

所以,问题编号为2:如果应用程序正在运行,为什么它没有日志文件?

所以最终我不得不杀了它:

$ yarn application -kill application_1438092860895_012
Run Code Online (Sandbox Code Playgroud)

这提出了问题3:假设我最终可以在后台启动并运行应用程序,"纱线应用程序 - 杀手"是阻止它的首选方式吗?

hadoop cloudera hadoop-yarn apache-spark spark-streaming

11
推荐指数
1
解决办法
9655
查看次数

如何在没有重新分区和copyMerge的情况下合并spark结果文件?

我用下一个代码:

csv.saveAsTextFile(pathToResults, classOf[GzipCodec])
Run Code Online (Sandbox Code Playgroud)

pathToResults目录有很多文件,如part-0000,part-0001等.我可以使用FileUtil.copyMerge(),但它真的很慢,它下载驱动程序上的所有文件,然后将它们上传到hadoop.但FileUtil.copyMerge()比以下更快:

csv.repartition(1).saveAsTextFile(pathToResults, classOf[GzipCodec])
Run Code Online (Sandbox Code Playgroud)

如何在没有重新分区和FileUtil.copyMerge()的情况下合并spark结果文件?

hadoop scala apache-spark

7
推荐指数
1
解决办法
2万
查看次数

更改DataFrame.write()的输出文件名前缀

通过Spark SQL DataFrame.write()方法生成的输出文件以"part"basename前缀开头.例如

DataFrame sample_07 = hiveContext.table("sample_07");
sample_07.write().parquet("sample_07_parquet");
Run Code Online (Sandbox Code Playgroud)

结果是:

hdfs dfs -ls sample_07_parquet/                                                                                                                                                             
Found 4 items
-rw-r--r--   1 rob rob          0 2016-03-19 16:40 sample_07_parquet/_SUCCESS
-rw-r--r--   1 rob rob        491 2016-03-19 16:40 sample_07_parquet/_common_metadata
-rw-r--r--   1 rob rob       1025 2016-03-19 16:40 sample_07_parquet/_metadata
-rw-r--r--   1 rob rob      17194 2016-03-19 16:40 sample_07_parquet/part-r-00000-cefb2ac6-9f44-4ce4-93d9-8e7de3f2cb92.gz.parquet
Run Code Online (Sandbox Code Playgroud)

我想更改使用Spark SQL DataFrame.write()创建文件时使用的输出文件名前缀.我尝试在Spark上下文的hadoop配置上设置"mapreduce.output.basename"属性.例如

public class MyJavaSparkSQL {

  public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setAppName("MyJavaSparkSQL");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    ctx.hadoopConfiguration().set("mapreduce.output.basename", "myprefix");
    HiveContext hiveContext = new org.apache.spark.sql.hive.HiveContext(ctx.sc()); …
Run Code Online (Sandbox Code Playgroud)

java mapreduce apache-spark apache-spark-sql

7
推荐指数
1
解决办法
8627
查看次数

Spark Streaming 中的背压属性是如何工作的?

我有一个接收单个事件(字符串)的 CustomReceiver。在火花应用程序的运行时使用接收到的单个事件从 nosql 读取数据并应用转换。当观察到每个批次的处理时间大于批次间隔时,我设置这个属性。

spark.streaming.backpressure.enabled=true

之后,我预计 CustomReceiver 不会在批处理处理时间长于批处理窗口时触发和接收事件,这并没有发生,并且仍然添加了一批积压的批次。我在这里错过了什么吗?

hadoop backpressure apache-spark spark-streaming

7
推荐指数
1
解决办法
6229
查看次数

Spark DStream的foreachDD函数中RDD的并发转换

在下面的代码中,似乎函数fn1和fn2以顺序方式应用于inRDD,正如我在Spark Web UI的Stages部分中看到的那样.

 DstreamRDD1.foreachRDD(new VoidFunction<JavaRDD<String>>()
 { 
     public void call(JavaRDD<String> inRDD)
        {
          inRDD.foreach(fn1)
          inRDD.foreach(fn2)
        }
 }
Run Code Online (Sandbox Code Playgroud)

流媒体作业以这种方式运行时有何不同.以下函数是否在输入Dstream上并行运行?

DStreamRDD1.foreachRDD(fn1)
DStreamRDD2.foreachRDD(fn2)
Run Code Online (Sandbox Code Playgroud)

java apache-spark spark-streaming rdd dstream

5
推荐指数
1
解决办法
228
查看次数

Spark排序RDD并加入他们的排名

我有一个RDD[(VertexId, Double)],我希望对它进行排序,_._2并使用此RDD加入索引(排名).因此,我可以获得一个元素及其排名filter.

目前我对RDD进行排序sortBy,但我不知道如何加入RDD及其排名.所以我把它作为一个序列收集并用它的索引压缩它.但这并不高效.我想知道是否有更优雅的方式来做到这一点.

我现在使用的代码是:

val tmpRes = graph.vertices.sortBy(_._2, ascending = false) // Sort all nodes by its PR score in descending order
      .collect() // collect to master, this may be very expensive

    tmpRes.zip(tmpRes.indices) // zip with index
Run Code Online (Sandbox Code Playgroud)

scala apache-spark rdd

2
推荐指数
1
解决办法
2806
查看次数

从spark变换函数中从HDFS动态读取文件

如何在不使用函数中的sparkContext的spark函数中读取HDFS中的文件.

例:

val filedata_rdd = rdd.map { x => ReadFromHDFS(x.getFilePath) }
Run Code Online (Sandbox Code Playgroud)

问题是如何实现ReadFromHDFS?通常从HDFS读取我们可以做sc.textFile但在这种情况下sc不能在函数中使用.

apache-spark spark-streaming

1
推荐指数
1
解决办法
1441
查看次数