小编Nic*_*aro的帖子

Spark执行程序登录YARN

我正在Cloudera集群上以YARN客户端模式启动分布式Spark应用程序.过了一段时间,我在Cloudera Manager上看到了一些错误.一些执行器断开连接,系统地发生这种情况.我想调试该问题,但YARN没有报告内部异常.

Exception from container-launch with container ID: container_1417503665765_0193_01_000003 and exit code: 1
ExitCodeException exitCode=1: 
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
    at org.apache.hadoop.util.Shell.run(Shell.java:455)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:702)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:196)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:299)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:81)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

如何查看异常的堆栈跟踪?似乎YARN仅报告应用程序异常退出.有没有办法在YARN配置中查看spark executor日志?

cloudera hadoop-yarn cloudera-manager apache-spark

14
推荐指数
1
解决办法
7963
查看次数

惰性字段的序列化如何工作?

当出于某些原因需要推迟评估值时,我知道惰性字段的好处.我想知道在序列化方面懒惰字段的行为是什么.

考虑以下课程.

class MyClass {
  lazy val myLazyVal = {...}
  ...
}
Run Code Online (Sandbox Code Playgroud)

问题:

  • 如果MyClass的一个实例被序列化,那么惰性字段是否也被序列化了?
  • 如果在序列化之前访问了字段,序列化的行为是否会发生变化?我的意思是,如果我不对该字段进行评估,它是否被认为是空的
  • 序列化机制是否会引发对惰性字段的隐式评估?
  • 是否有一种简单的方法可以避免变量的序列化并在反序列化后懒惰再次重新计算值?这应该独立于该领域的评估.

serialization scala

14
推荐指数
1
解决办法
3704
查看次数

Spark使用数据局部性吗?

我正在尝试了解Apache Spark的内部结构.我想知道Spark是否在从InputFormat读取或写入OutputFormat(或Spark本身支持的其他格式而不是从MapReduce派生)时使用某些机制来确保数据局部性.

在第一种情况下(在读),我的理解是,使用InputFormat时,劈叉获得与主机,以便星火试图将任务分配给执行者,以减少网络传输尽可能多的相关的(或主机??)包含数据尽可能.

在写作的情况下,这种机制将如何运作?我知道从技术上讲,HDFS中的文件可以保存在本地的任何节点中并复制到其他两个(因此您可以将网络用于3个副本中的两个),但是,如果您考虑写入其他系统,例如NoSQL数据库( Cassandra,HBase,其他......),这类系统有自己的分发数据的方式.是否有一种方法来告诉火花分区在于优化由输出水槽预期数据的分布的基础上,数据局部性的方式的RDD(目标的NoSQL数据库,看到天然或通过OUTPUTFORMAT)?

我指的是一个环境,其中Spark节点和NoSQL节点存在于相同的phisical机器中.

hadoop hbase cassandra apache-spark

13
推荐指数
1
解决办法
2957
查看次数

导入依赖管理与排除

我有一个漂亮的BOM,其dependencyManagement部分有很多依赖项,我想创建另一个BOM,导入除了一个以外的所有依赖项.我试过这样做:

... in my dependencyManagement section
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-dependencies</artifactId>
    <version>${spring-boot-version}</version>
    <type>pom</type>
    <scope>import</scope>

    <exclusions>
        <exclusion>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        </exclusion>
    </exclusions>
</dependency>
...
Run Code Online (Sandbox Code Playgroud)

POM正式正确,一切都在编译.但排除被忽略了.我错过了什么?这种方法是否正确?

我正在使用Maven 3+.

maven

13
推荐指数
2
解决办法
5175
查看次数

在Cassandra中,异步写入似乎被打破了

我在向12节点cassandra(2.1.2)集群写入9百万行批处理时遇到了spark-cassandra-connector(1.0.4,1.1.0)的问题.我写的是一致性ALL并且读取一致性ONE,但读取的行数每次都不同于900万(8.865.753,8.753.213等).

我检查了连接器的代码,发现没有问题.然后,我决定编写自己的应用程序,独立于spark和连接器,来调查问题(唯一的依赖是datastax-driver-code version 2.1.3).

现在可以在github找到完整代码,启动脚本和配置文件.

在伪代码中,我写了两个不同版本的应用程序,同步一个:

try (Session session = cluster.connect()) {

    String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
    PreparedStatement pstm = session.prepare(cql);

    for(String partitionKey : keySource) {
        // keySource is an Iterable<String> of partition keys

        BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
        bound.setConsistencyLevel(ConsistencyLevel.ALL);

        session.execute(bound);
    }

}
Run Code Online (Sandbox Code Playgroud)

而异步的一个:

try (Session session = cluster.connect()) {

    List<ResultSetFuture> futures = new LinkedList<ResultSetFuture>();

    String cql = "insert …
Run Code Online (Sandbox Code Playgroud)

java cassandra

11
推荐指数
2
解决办法
4305
查看次数

如何计算前k个单词

我想在Spark-Streaming应用程序中计算前k个单词,在时间窗口中收集文本行.

我最终得到以下代码:

...
val window = stream.window(Seconds(30))

val wc = window
  .flatMap(line => line.split(" "))
  .map(w => (w, 1))
  .reduceByKey(_ + _)

wc.foreachRDD(rdd => {
  println("---------------------------------------------------")
  rdd.top(10)(Ordering.by(_._2)).zipWithIndex.foreach(println)
})
...
Run Code Online (Sandbox Code Playgroud)

它似乎工作.

问题:前k字图表是使用在(变量)返回的foreachRDD每个上执行top + print函数的函数计算的.RDDreduceByKeywc

事实证明,reduceByKey返回一个DStream单一的RDD,所以上面的代码工作,但规格不保证正确的行为.

我错了,它适用于所有情况吗?

为什么在spark-streaming中没有一种方法可以将a DStream作为单个RDD而不是RDD对象集合来考虑,以便执行更复杂的转换?

我的意思是这样的功能:dstream.withUnionRDD(rdd => ...)允许您对单个/联合进行转换和操作RDD.有没有相同的方法来做这样的事情?

spark-streaming

11
推荐指数
1
解决办法
888
查看次数

在负面情况下,未记录和记录的Cassandra批次之间的差异

我理解Cassandra中LOGGED和UNLOGGED批次在原子性方面的基本区别.从本质上讲,LOGGED批次是原子的,而UNLOGGED则不是.这意味着LOGGED批处理中的所有语句都可以一起执行(或不执行).

在UNLOGGED批处理的情况下,如果在编写语句的写操作期间出现问题,我知道已经执行的语句没有回滚,但Cassandra是否通知驱动程序整批的失败?

cassandra

10
推荐指数
1
解决办法
1577
查看次数

为什么Java文件只以其规范形式存在?

我正面临JVM的奇怪行为.我想更改用户目录,即查找文件的目录,通常对应于java运行命令的路径.

所以我写了下面的代码:

System.setProperty("user.dir", "/tmp/");
File f = new File("myfile");
System.out.println(f.exists());
System.out.println(f.getCanonicalFile().exists());
Run Code Online (Sandbox Code Playgroud)

该文件/tmp/myfile存在并且可由JVM读取,但如果我在运行该代码时不在/tmp/,则结果为:

false
true

它们是相同的文件,Java能够检索它的正确规范形式,但相对的不存在,而规范的存在.

这是一个错误吗?有没有办法可靠地更改JVM用户目录?

更改代码不是一个选项,因为我正在尝试运行外部库.

java

9
推荐指数
1
解决办法
219
查看次数

检查Cassandra中是否启用了JNA

我从其他帖子中了解到,如果JNA配置不正确,Cassandra会在日志文件中写一些警告.我的日志文件(cassandra.log和system.log)不包含单词"jna"(所有案例都已选中).我可以假设为我的Cassandra实例正确配置了JNA吗?

有没有办法检查JNA配置是否正确?

cassandra

8
推荐指数
1
解决办法
3190
查看次数

Spark/YARN允许多线程吗?

由于YARN负责管理Hadoop应用程序(如内核和进程)的资源,是否允许YARN应用程序创建新线程?

我主要指的是在YARN资源管理器上运行的Spark应用程序.我知道如果你创建一个新线程没有抛出错误,但它是否安全?

multithreading hadoop-yarn apache-spark

7
推荐指数
1
解决办法
1112
查看次数