小编Ami*_*IFI的帖子

计算RDD中的行数

我正在使用带有java的spark,我有一个500万行的RDD.是否有一个sollution,允许我计算我的RDD的行数.我试过RDD.count()但是需要很多时间.我见过我可以使用这个功能fold.但我没有找到这个函数的java文档.你能告诉我如何使用它或给我看另一个解决方案来获取我的RDD的行数.

这是我的代码:

JavaPairRDD<String, String> lines = getAllCustomers(sc).cache();
JavaPairRDD<String,String> CFIDNotNull = lines.filter(notNull()).cache();
JavaPairRDD<String, Tuple2<String, String>> join =lines.join(CFIDNotNull).cache();

double count_ctid = (double)join.count(); // i want to get the count of these three RDD
double all = (double)lines.count();
double count_cfid = all - CFIDNotNull.count();
System.out.println("********** :"+count_cfid*100/all +"% and now : "+ count_ctid*100/all+"%");
Run Code Online (Sandbox Code Playgroud)

谢谢.

java apache-spark

19
推荐指数
2
解决办法
4万
查看次数

JavaSparkContext不可序列化

我正在使用带有cassandra的spark,而且我有一个JavaRDD<String>客户.对于每个客户,我想从cassandra中选择他的交互,如下所示:

avaPairRDD<String, List<InteractionByMonthAndCustomer>> a = client.mapToPair(new PairFunction<String, String, List<InteractionByMonthAndCustomer>>() {
        @Override
        public Tuple2<String, List<InteractionByMonthAndCustomer>> call(String s) throws Exception {               
            List<InteractionByMonthAndCustomer> b = javaFunctions(sc)
                    .cassandraTable(CASSANDRA_SCHEMA, "interaction_by_month_customer")
                    .where("ctid =?", s)
                    .map(new Function<CassandraRow, InteractionByMonthAndCustomer>() {
                        @Override
                        public InteractionByMonthAndCustomer call(CassandraRow cassandraRow) throws Exception {
                            return new InteractionByMonthAndCustomer(cassandraRow.getString("channel"),
                                    cassandraRow.getString("motif"),
                                    cassandraRow.getDate("start"),
                                    cassandraRow.getDate("end"),
                                    cassandraRow.getString("ctid"),
                                    cassandraRow.getString("month")
                            );
                        }
                    }).collect();
            return new Tuple2<String, List<InteractionByMonthAndCustomer>>(s, b);
        }
    });
Run Code Online (Sandbox Code Playgroud)

为此,我使用一个JavaSparkContext sc.但我得到了这个错误:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.map(RDD.scala:270)
at …
Run Code Online (Sandbox Code Playgroud)

java serialization cassandra-2.0 apache-spark

7
推荐指数
1
解决办法
5784
查看次数

在 Apache NiFi 中获取失败原因

我想记录一些 NiFi 处理器失败的原因。但我看到它可能有多种原因。

SplitAvroJson 处理器示例:“如果 FlowFile 由于任何原因处理失败(例如,FlowFile 不是有效的 Avro),它将被路由到此关系

是否有可能获得我失败的确切原因以便将其保存在例如放置文件中?

先感谢您。

apache-nifi

6
推荐指数
1
解决办法
6573
查看次数

在Javardd排序

我用java来玩火花.我想对我的地图进行排序.事实上,我有这样的javaRDD:

JavaPairRDD<String, Integer> rebondCountURL = session_rebond_2.mapToPair(new PairFunction<Tuple2<String, String>, String, String>() {
        @Override
        public Tuple2<String, String> call(Tuple2<String, String> stringStringTuple2) throws Exception {
            return new Tuple2<String, String>(stringStringTuple2._2, stringStringTuple2._1);
        }
    }).groupByKey().map(new PairFunction<Tuple2<String, Iterable<String>>, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> call(Tuple2<String, Iterable<String>> stringIterableTuple2) throws Exception {
            Iterable<String> strings = stringIterableTuple2._2;
            List<String> b = new ArrayList<String>();
            for (String s : strings) {
                b.add(s);
            }
            return new Tuple2<String, Integer>(stringIterableTuple2._1, b.size());
        }
    });
Run Code Online (Sandbox Code Playgroud)

我想使用Sortby对此Java Rdd进行排序(以便使用Integer进行排序).

你能帮帮我吗?

先感谢您.

java apache-spark

4
推荐指数
1
解决办法
7735
查看次数

从Spark中的cassandra表中删除

我正在使用Spark和cassandra.我正在从我的表中读取一些行,以便使用PrimaryKey删除主题.这是我的代码:

val lines = sc.cassandraTable[(String, String, String, String)](CASSANDRA_SCHEMA, table).
  select("a","b","c","d").
  where("d=?", d).cache()

lines.foreach(r => {
    val session: Session = connector.openSession
    val delete = s"DELETE FROM "+CASSANDRA_SCHEMA+"."+table+" where channel='"+r._1 +"' and ctid='"+r._2+"'and cvid='"+r._3+"';"
    session.execute(delete)
    session.close()
})
Run Code Online (Sandbox Code Playgroud)

但是这种方法为每一行创建一个会话,这需要很多时间.那么是否可以使用sc.CassandraTable或其他更好的方法删除我的行.

谢谢

scala cassandra-2.0 apache-spark

4
推荐指数
1
解决办法
5832
查看次数

Apache NiFi中的条件路由

我正在使用NiFi从Oracle数据库获取数据,并将一些数据放入Kafka(使用处理器PutKafka).示例:如果属性"id"包含"aaabb"

Apache NiFi有可能吗?我该怎么做?

etl apache-kafka apache-nifi

4
推荐指数
1
解决办法
3300
查看次数

按应用程序名称杀死纱线应用程序

我想创建一个 cron 来通过它的应用程序名称杀死一个纱线应用程序(Spark)。但我发现纱线应用程序 -kill 需要一个应用程序 ID。是否有解决方案可以通过应用程序名称杀死它,或者使用应用程序名称获取应用程序 ID。

谢谢

hadoop-yarn

4
推荐指数
3
解决办法
3909
查看次数

Spark Job Server:"服务器无法及时响应您的请求"

我正在使用Spark Job Server来运行Spark作业,它运行正常.但是当我尝试执行一项大工作(需要超过40秒)时,我收到此错误:

The server was not able to produce a timely response to your request.
Run Code Online (Sandbox Code Playgroud)

是否需要一些配置才能等待服务器应答?我该怎么办?

谢谢

java curl spray spark-jobserver

3
推荐指数
1
解决办法
2655
查看次数

如何在java中发送curl请求

我正在使用 java WebService 发送 curl 请求,这是我的请求:

curl -d "input = a b c d" 'localhost:8090/jobs?appName=Test40&classPath=fr.aid.cim.spark.JavaWord&sync=true&timeout=1000000000'
Run Code Online (Sandbox Code Playgroud)

我不知道我应该使用哪个库以及如何用 java 编写它。你能告诉我怎么做吗?

java curl

0
推荐指数
1
解决办法
2万
查看次数