小编Fra*_*kie的帖子

火花如何处理物体

为了测试spark中的序列化异常,我用2种方式编写了一个任务.
第一种方式:

package examples
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object dd {
  def main(args: Array[String]):Unit = {
    val sparkConf = new SparkConf
    val sc = new SparkContext(sparkConf)

    val data = List(1,2,3,4,5)
    val rdd = sc.makeRDD(data)
    val result = rdd.map(elem => {
      funcs.func_1(elem)
    })        
    println(result.count())
  }
}

object funcs{
  def func_1(i:Int): Int = {
    i + 1
  }
}
Run Code Online (Sandbox Code Playgroud)

这种方式火花效果很好.
当我将其更改为以下方式时,它不起作用并抛出NotSerializableException.
第二种方式:

package examples
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object dd {
  def main(args: Array[String]):Unit = {
    val sparkConf = new SparkConf
    val …
Run Code Online (Sandbox Code Playgroud)

serialization apache-spark rdd

7
推荐指数
2
解决办法
2856
查看次数

我怎么知道spark-core版本?

我在hdp中使用spark 1.5.2,hadoop的版本是2.7.1.2.3.4.7-4.当我尝试在像这样的maven pom文件中添加jar时

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.5.2</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)

我不知道在哪里可以找到spark-core的版本.有2.11和2.10.任何帮助表示赞赏.

hadoop hortonworks-data-platform apache-spark

6
推荐指数
1
解决办法
2162
查看次数

Java API到HBase异常:无法获取位置

我正在尝试使用JAVA API连接到HBase.我的代码如下所示:

public class Test {
    public static void main(String[] args) throws IOException{
        TableName tableName = TableName.valueOf("TABLE2");

        Configuration conf = HBaseConfiguration.create();
        conf.set("zookeeper.znode.parent", "/hbase-secure");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "xxxxxxxxxxxxxx");
        conf.set("hbase.master", "xxxxxxxxxxxxx");

        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        System.out.println(admin.toString());

        if(!admin.tableExists(tableName)){
            admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor("cf")));
        }

        Table table = conn.getTable(tableName);
        Put p = new Put(Bytes.toBytes("AAPL10232015"));
        p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("close"), Bytes.toBytes(119));
        table.put(p);

        Result r = table.get(new Get(Bytes.toBytes("AAPL10232015")));
        System.out.println(r);
    }
}
Run Code Online (Sandbox Code Playgroud)

当我在我的集​​群中运行这个程序时,我遇到了异常:我运行了这个并得到以下错误:

Exception in thread "main" org.apache.hadoop.hbase.client.RetriesExhaustedException: Can't get the locations
        at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:312)
        at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:151)
        at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:59) …
Run Code Online (Sandbox Code Playgroud)

java hbase apache-zookeeper

6
推荐指数
1
解决办法
8193
查看次数

在Java中关闭ExecutorService的最佳方法

我正在尝试用Java编写异步编程,我正在使用ExecutorService创建一个由多个线程支持的池来提交多个可调用任务,但我对如何关闭ExecutorService几乎没有疑问.

这是我的原始代码:

ExecutorService executorService = Executors.newFixedThreadPool(10);

Future<String> f = executorService.submit(() -> {/*do something*/});

executorService.shutdown();

String result = f.get();
System.out.println(result);
Run Code Online (Sandbox Code Playgroud)

这很好用,执行程序在线程完成后关闭.但我很担心如果写出错误的话,可调用任务中的代码 f.get()将永远占用,程序将永远停止并永不退出.

有了担心,这是我的第二次尝试:

ExecutorService executorService = Executors.newFixedThreadPool(10);

Future<String> f = executorService.submit(() -> {/*do something*/});

executorService.shutdown();

if(!executorService.awaitTermination(10, TimeUnit.SECONDS)){
    executorService.shutdownNow();
}

String result = f.get();
System.out.println(result);
Run Code Online (Sandbox Code Playgroud)

使用上面的代码,我可以确保线程在10秒后关闭.但实际上程序被阻止了10秒,线程可能只用了5秒钟.

我的问题是如何设置强制关闭池中的线程的时间,以便我不需要显式使用awaitTermination来阻止程序.

java multithreading

5
推荐指数
1
解决办法
803
查看次数

SparkContext.addJar 在本地模式下不起作用

当spark作业中需要有jar文件时,需要通过两种方式添加到spark作业中:
1.--jar path命令中的选项。
2 SparkContext.addJar("path")..
谁能告诉我这两种方式之间的区别?
这个问题来看,答案是它们是相同的,只是优先级不同,但我认为这不是真的。--jars如果我在纱线集群模式下提交 Spark 作业,根据官方网站,如果命令中的选项中未包含 jar 文件,则 addJar() 将无法工作。

如果您将 SparkContext.addJar 函数与本地文件一起使用并在纱线集群模式下运行,则 --jars 选项允许 SparkContext.addJar 函数工作。如果您将其与 HDFS、HTTP、HTTPS 或 FTP 文件一起使用,则不需要使用它。

原因是驱动程序与客户端运行在不同的机器上。因此,--jars命令中的选项似乎来自客户端,而函数addJar()只能在驱动程序中的 jar 上工作。

然后我在本地模式下进行了测试。

1.spark-shell --master local --jars path/to/jar

如果我以这种方式启动spark-shell,则jar中的对象可以在spark-shell中使用

2.spark-shell --master local

如果我以这种方式启动spark-shell并使用sc.addJar("path/to/jar"),则jar文件中的对象无法导入到spark-shell中,并且出现class cannot be found错误。

我的问题是:

为什么该方法SparkContext.addJar()在本地模式下不起作用?

SparkContext.addJar()和 和有什么区别--jars

我的环境:hortonworks 2.5集群,spark版本是1.6.2。如果有人能对此有所了解,我将不胜感激。

apache-spark

3
推荐指数
1
解决办法
3311
查看次数

在现有RDD中创建新记录

我试图在RDD中创建更多记录:

现在,我有一个RDD[(String, List(String))],内容是:

("str_1", List("sub_str_1", "sub_str_2"))  
("str_2", List("sub_str_3", "sub_str_4")) 
("str_3", List("sub_str_5", "sub_str_6"))
Run Code Online (Sandbox Code Playgroud)

我想把它转换为RDD[(String, String)]flatting list[String].
转换后,内容应该是

("str_1", "sub_str_1")
("str_1", "sub_str_2")
("str_2", "sub_str_3")
("str_2", "sub_str_4")
("str_3", "sub_str_5")
("str_3", "sub_str_6")
Run Code Online (Sandbox Code Playgroud)

似乎可以应用于RDD的所有方法都无法增加记录数.我能做的就是将当前的转换为具有相同记录数的新RDD.

我的问题:有没有办法增加RDD中的记录数量?

apache-spark rdd

2
推荐指数
1
解决办法
95
查看次数

在scala中的Map函数中加下下划线

关于在map函数中使用下划线的快速问题,假设我有一个RDD如下:

val R_1 = sc.parallelize(List((1, 2), (3, 4), (5, 6)))
R_1.map(x => x._1 + x._2)
Run Code Online (Sandbox Code Playgroud)

结果是(3,7,11)

我用R_1.map(_._1 + _._2)这个时遇到错误 .

我不太了解scala lambda表达式中的下划线魔法.所以我的问题是R_1.map(x => x._1 + x._2)和之间有什么区别R_1.map(_._1 + _._2).有没有其他的写作方式R_1.map(x => x._1 + x._2)?任何帮助表示赞赏.

scala

1
推荐指数
1
解决办法
1455
查看次数