小编Val*_*ale的帖子

Java 8可选仅在optional.isPresent时添加返回结果

我有一段代码,其中一个接口有一个Optional返回方法,一些类实现它返回一些东西,其他没有.

为了拥抱这个辉煌的"空杀手",这是我尝试过的:

public interface Gun {
    public Optional<Bullet> shoot();
}

public class Pistol implements Gun{
    @Override
    public Optional<Bullet> shoot(){
        return Optional.of(this.magazine.remove(0)); 
    }//never mind the check of magazine content
}

public class Bow implements Gun{
    @Override
    public Optional<Bullet> shoot(){
        quill--;
        return Optional.empty();
    }
}

public class BallisticGelPuddy{
    private Gun[] guns = new Gun[]{new Pistol(),new Bow()};
    private List<Bullet> bullets = new ArrayList<>();
    public void collectBullets(){
        //here is the problem
        for(Gun gun : guns)
            gun.shoot.ifPresent(bullets.add( <the return I got with the method>)
}} …
Run Code Online (Sandbox Code Playgroud)

java optional java-8 null-check

10
推荐指数
2
解决办法
1万
查看次数

在RDD方法/闭包中使用SparkContext hadoop配置,例如foreachPartition

我正在使用Spark来读取一堆文件,详细说明它们,然后将它们全部保存为Sequence文件.我想要的是每个分区有1个序列文件,所以我这样做了:

SparkConf sparkConf = new SparkConf().setAppName("writingHDFS")
                .setMaster("local[2]")
                .set("spark.streaming.stopGracefullyOnShutdown", "true");
        final JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        jsc.hadoopConfiguration().addResource(hdfsConfPath + "hdfs-site.xml");
        jsc.hadoopConfiguration().addResource(hdfsConfPath + "core-site.xml");
        //JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5*1000));

        JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(sourcePath);
        if(!imageByteRDD.isEmpty())
            imageByteRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String,PortableDataStream>>>() {

                @Override
                public void call(Iterator<Tuple2<String, PortableDataStream>> arg0){
                        throws Exception {
                  [°°°SOME STUFF°°°]
                  SequenceFile.Writer writer = SequenceFile.createWriter(
                                     jsc.hadoopConfiguration(), 
//here lies the problem: how to pass the hadoopConfiguration I have put inside the Spark Context? 
Previously, I created a Configuration for each partition, and it works, …
Run Code Online (Sandbox Code Playgroud)

java hadoop apache-spark rdd

6
推荐指数
4
解决办法
6168
查看次数

Spark程序结构:类中的广播变量vs最终静态与外部静态属性

我有一个应用程序应该从文件读取一些行,并使它们成为一个最终变量,将用作参考.
截至目前,在spark上下文开始之前,我在一个类(称为People)类中启动了一个静态方法

reads the file;
fill a final static HashTable;
static{ hashTable.put(eachline);}
Run Code Online (Sandbox Code Playgroud)

在我的转换代码中,例如:

JavaRDD<String> filteredRDD = anotherRDD.filter( new Function<String,Boolean>(){
    public Boolean call(String s){
        People.hashTable.containsKey(s);
    }
});
Run Code Online (Sandbox Code Playgroud)

释疑:

  1. 我应该在SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaStream").setMaster("local[2]");声明后立即使用广播变量吗?
  2. 为什么我要在最后一个选择广播变量?据我所知,决赛在转型流程中顺利通过.
  3. 在SparkStreaming计算开始之前加载文件内容的某个位置是正确/优雅的过程吗?
  4. 如果我有外部类来处理一些计算(主要是为了可读性),我是否更愿意以静态方式访问这些方法或者在一个rdd.foreachPartition(....?中实例化classe ?

java architecture apache-spark spark-streaming

1
推荐指数
1
解决办法
3273
查看次数