为了测试spark中的序列化异常,我用2种方式编写了一个任务.
第一种方式:
package examples
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object dd {
def main(args: Array[String]):Unit = {
val sparkConf = new SparkConf
val sc = new SparkContext(sparkConf)
val data = List(1,2,3,4,5)
val rdd = sc.makeRDD(data)
val result = rdd.map(elem => {
funcs.func_1(elem)
})
println(result.count())
}
}
object funcs{
def func_1(i:Int): Int = {
i + 1
}
}
Run Code Online (Sandbox Code Playgroud)
这种方式火花效果很好.
当我将其更改为以下方式时,它不起作用并抛出NotSerializableException.
第二种方式:
package examples
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object dd {
def main(args: Array[String]):Unit = {
val sparkConf = new SparkConf
val …Run Code Online (Sandbox Code Playgroud) 我在hdp中使用spark 1.5.2,hadoop的版本是2.7.1.2.3.4.7-4.当我尝试在像这样的maven pom文件中添加jar时
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.5.2</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
我不知道在哪里可以找到spark-core的版本.有2.11和2.10.任何帮助表示赞赏.
我正在尝试使用JAVA API连接到HBase.我的代码如下所示:
public class Test {
public static void main(String[] args) throws IOException{
TableName tableName = TableName.valueOf("TABLE2");
Configuration conf = HBaseConfiguration.create();
conf.set("zookeeper.znode.parent", "/hbase-secure");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("hbase.zookeeper.quorum", "xxxxxxxxxxxxxx");
conf.set("hbase.master", "xxxxxxxxxxxxx");
Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin();
System.out.println(admin.toString());
if(!admin.tableExists(tableName)){
admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor("cf")));
}
Table table = conn.getTable(tableName);
Put p = new Put(Bytes.toBytes("AAPL10232015"));
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("close"), Bytes.toBytes(119));
table.put(p);
Result r = table.get(new Get(Bytes.toBytes("AAPL10232015")));
System.out.println(r);
}
}
Run Code Online (Sandbox Code Playgroud)
当我在我的集群中运行这个程序时,我遇到了异常:我运行了这个并得到以下错误:
Exception in thread "main" org.apache.hadoop.hbase.client.RetriesExhaustedException: Can't get the locations
at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:312)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:151)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:59) …Run Code Online (Sandbox Code Playgroud) 我正在尝试用Java编写异步编程,我正在使用ExecutorService创建一个由多个线程支持的池来提交多个可调用任务,但我对如何关闭ExecutorService几乎没有疑问.
这是我的原始代码:
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> f = executorService.submit(() -> {/*do something*/});
executorService.shutdown();
String result = f.get();
System.out.println(result);
Run Code Online (Sandbox Code Playgroud)
这很好用,执行程序在线程完成后关闭.但我很担心如果写出错误的话,可调用任务中的代码 f.get()将永远占用,程序将永远停止并永不退出.
有了担心,这是我的第二次尝试:
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> f = executorService.submit(() -> {/*do something*/});
executorService.shutdown();
if(!executorService.awaitTermination(10, TimeUnit.SECONDS)){
executorService.shutdownNow();
}
String result = f.get();
System.out.println(result);
Run Code Online (Sandbox Code Playgroud)
使用上面的代码,我可以确保线程在10秒后关闭.但实际上程序被阻止了10秒,线程可能只用了5秒钟.
我的问题是如何设置强制关闭池中的线程的时间,以便我不需要显式使用awaitTermination来阻止程序.
当spark作业中需要有jar文件时,需要通过两种方式添加到spark作业中:
1.--jar path命令中的选项。
2 SparkContext.addJar("path")..
谁能告诉我这两种方式之间的区别?
从这个问题来看,答案是它们是相同的,只是优先级不同,但我认为这不是真的。--jars如果我在纱线集群模式下提交 Spark 作业,根据官方网站,如果命令中的选项中未包含 jar 文件,则 addJar() 将无法工作。
如果您将 SparkContext.addJar 函数与本地文件一起使用并在纱线集群模式下运行,则 --jars 选项允许 SparkContext.addJar 函数工作。如果您将其与 HDFS、HTTP、HTTPS 或 FTP 文件一起使用,则不需要使用它。
原因是驱动程序与客户端运行在不同的机器上。因此,--jars命令中的选项似乎来自客户端,而函数addJar()只能在驱动程序中的 jar 上工作。
然后我在本地模式下进行了测试。
1.spark-shell --master local --jars path/to/jar
如果我以这种方式启动spark-shell,则jar中的对象可以在spark-shell中使用
2.spark-shell --master local
如果我以这种方式启动spark-shell并使用sc.addJar("path/to/jar"),则jar文件中的对象无法导入到spark-shell中,并且出现class cannot be found错误。
我的问题是:
为什么该方法SparkContext.addJar()在本地模式下不起作用?
SparkContext.addJar()和 和有什么区别--jars?
我的环境:hortonworks 2.5集群,spark版本是1.6.2。如果有人能对此有所了解,我将不胜感激。
我试图在RDD中创建更多记录:
现在,我有一个RDD[(String, List(String))],内容是:
("str_1", List("sub_str_1", "sub_str_2"))
("str_2", List("sub_str_3", "sub_str_4"))
("str_3", List("sub_str_5", "sub_str_6"))
Run Code Online (Sandbox Code Playgroud)
我想把它转换为RDD[(String, String)]flatting list[String].
转换后,内容应该是
("str_1", "sub_str_1")
("str_1", "sub_str_2")
("str_2", "sub_str_3")
("str_2", "sub_str_4")
("str_3", "sub_str_5")
("str_3", "sub_str_6")
Run Code Online (Sandbox Code Playgroud)
似乎可以应用于RDD的所有方法都无法增加记录数.我能做的就是将当前的转换为具有相同记录数的新RDD.
我的问题:有没有办法增加RDD中的记录数量?
关于在map函数中使用下划线的快速问题,假设我有一个RDD如下:
val R_1 = sc.parallelize(List((1, 2), (3, 4), (5, 6)))
R_1.map(x => x._1 + x._2)
Run Code Online (Sandbox Code Playgroud)
结果是(3,7,11)
我用R_1.map(_._1 + _._2)这个时遇到错误
.
我不太了解scala lambda表达式中的下划线魔法.所以我的问题是R_1.map(x => x._1 + x._2)和之间有什么区别R_1.map(_._1 + _._2).有没有其他的写作方式R_1.map(x => x._1 + x._2)?任何帮助表示赞赏.