我正在使用带有java的spark,我有一个500万行的RDD.是否有一个sollution,允许我计算我的RDD的行数.我试过RDD.count()
但是需要很多时间.我见过我可以使用这个功能fold
.但我没有找到这个函数的java文档.你能告诉我如何使用它或给我看另一个解决方案来获取我的RDD的行数.
这是我的代码:
JavaPairRDD<String, String> lines = getAllCustomers(sc).cache();
JavaPairRDD<String,String> CFIDNotNull = lines.filter(notNull()).cache();
JavaPairRDD<String, Tuple2<String, String>> join =lines.join(CFIDNotNull).cache();
double count_ctid = (double)join.count(); // i want to get the count of these three RDD
double all = (double)lines.count();
double count_cfid = all - CFIDNotNull.count();
System.out.println("********** :"+count_cfid*100/all +"% and now : "+ count_ctid*100/all+"%");
Run Code Online (Sandbox Code Playgroud)
谢谢.
我正在使用带有cassandra的spark,而且我有一个JavaRDD<String>
客户.对于每个客户,我想从cassandra中选择他的交互,如下所示:
avaPairRDD<String, List<InteractionByMonthAndCustomer>> a = client.mapToPair(new PairFunction<String, String, List<InteractionByMonthAndCustomer>>() {
@Override
public Tuple2<String, List<InteractionByMonthAndCustomer>> call(String s) throws Exception {
List<InteractionByMonthAndCustomer> b = javaFunctions(sc)
.cassandraTable(CASSANDRA_SCHEMA, "interaction_by_month_customer")
.where("ctid =?", s)
.map(new Function<CassandraRow, InteractionByMonthAndCustomer>() {
@Override
public InteractionByMonthAndCustomer call(CassandraRow cassandraRow) throws Exception {
return new InteractionByMonthAndCustomer(cassandraRow.getString("channel"),
cassandraRow.getString("motif"),
cassandraRow.getDate("start"),
cassandraRow.getDate("end"),
cassandraRow.getString("ctid"),
cassandraRow.getString("month")
);
}
}).collect();
return new Tuple2<String, List<InteractionByMonthAndCustomer>>(s, b);
}
});
Run Code Online (Sandbox Code Playgroud)
为此,我使用一个JavaSparkContext sc
.但我得到了这个错误:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.map(RDD.scala:270)
at …
Run Code Online (Sandbox Code Playgroud) 我想记录一些 NiFi 处理器失败的原因。但我看到它可能有多种原因。
SplitAvroJson 处理器示例:“如果 FlowFile 由于任何原因处理失败(例如,FlowFile 不是有效的 Avro),它将被路由到此关系”
是否有可能获得我失败的确切原因以便将其保存在例如放置文件中?
先感谢您。
我用java来玩火花.我想对我的地图进行排序.事实上,我有这样的javaRDD:
JavaPairRDD<String, Integer> rebondCountURL = session_rebond_2.mapToPair(new PairFunction<Tuple2<String, String>, String, String>() {
@Override
public Tuple2<String, String> call(Tuple2<String, String> stringStringTuple2) throws Exception {
return new Tuple2<String, String>(stringStringTuple2._2, stringStringTuple2._1);
}
}).groupByKey().map(new PairFunction<Tuple2<String, Iterable<String>>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, Iterable<String>> stringIterableTuple2) throws Exception {
Iterable<String> strings = stringIterableTuple2._2;
List<String> b = new ArrayList<String>();
for (String s : strings) {
b.add(s);
}
return new Tuple2<String, Integer>(stringIterableTuple2._1, b.size());
}
});
Run Code Online (Sandbox Code Playgroud)
我想使用Sortby对此Java Rdd进行排序(以便使用Integer进行排序).
你能帮帮我吗?
先感谢您.
我正在使用Spark和cassandra.我正在从我的表中读取一些行,以便使用PrimaryKey删除主题.这是我的代码:
val lines = sc.cassandraTable[(String, String, String, String)](CASSANDRA_SCHEMA, table).
select("a","b","c","d").
where("d=?", d).cache()
lines.foreach(r => {
val session: Session = connector.openSession
val delete = s"DELETE FROM "+CASSANDRA_SCHEMA+"."+table+" where channel='"+r._1 +"' and ctid='"+r._2+"'and cvid='"+r._3+"';"
session.execute(delete)
session.close()
})
Run Code Online (Sandbox Code Playgroud)
但是这种方法为每一行创建一个会话,这需要很多时间.那么是否可以使用sc.CassandraTable或其他更好的方法删除我的行.
谢谢
我正在使用NiFi从Oracle数据库获取数据,并将一些数据放入Kafka(使用处理器PutKafka).示例:如果属性"id"包含"aaabb"
Apache NiFi有可能吗?我该怎么做?
我想创建一个 cron 来通过它的应用程序名称杀死一个纱线应用程序(Spark)。但我发现纱线应用程序 -kill 需要一个应用程序 ID。是否有解决方案可以通过应用程序名称杀死它,或者使用应用程序名称获取应用程序 ID。
谢谢
我正在使用Spark Job Server来运行Spark作业,它运行正常.但是当我尝试执行一项大工作(需要超过40秒)时,我收到此错误:
The server was not able to produce a timely response to your request.
Run Code Online (Sandbox Code Playgroud)
是否需要一些配置才能等待服务器应答?我该怎么办?
谢谢
我正在使用 java WebService 发送 curl 请求,这是我的请求:
curl -d "input = a b c d" 'localhost:8090/jobs?appName=Test40&classPath=fr.aid.cim.spark.JavaWord&sync=true&timeout=1000000000'
Run Code Online (Sandbox Code Playgroud)
我不知道我应该使用哪个库以及如何用 java 编写它。你能告诉我怎么做吗?
java ×5
apache-spark ×4
apache-nifi ×2
curl ×2
apache-kafka ×1
etl ×1
hadoop-yarn ×1
scala ×1
spray ×1