What is the meaning of "Locality Level" and the 5 statuses Data local --> process local --> node local --> rack local --> Any?

We are using Redis on Spark to cache our key-value pairs. Here is the code:
import com.redis.RedisClient

val r = new RedisClient("192.168.1.101", 6379)
val perhit = perhitFile.map(x => {
  val arr = x.split(" ")
  val readId = arr(0).toInt
  val refId = arr(1).toInt
  val start = arr(2).toInt
  val end = arr(3).toInt
  val refStr = r.hmget("refStr", refId).get(refId).split(",")(1)
  val readStr = r.hmget("readStr", readId).get(readId)
  val realend = if (end > refStr.length - 1) refStr.length - 1 else end
  val refOneStr = refStr.substring(start, realend)
  (readStr, refOneStr, refId, start, realend, readId)
})
But when I run it, Spark gives me this error:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable …
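The exception happens because RedisClient wraps a live network connection, so the closure passed to map, which captures r, cannot be serialized and shipped to the executors. A minimal sketch of a common workaround, assuming the same com.redis.RedisClient and record layout as above: build the client inside mapPartitions so it is constructed on the executor rather than serialized from the driver:

import com.redis.RedisClient

val perhit = perhitFile.mapPartitions { iter =>
  // The client is created here, on the executor, once per partition,
  // so it is never captured by a closure that must be serialized.
  val r = new RedisClient("192.168.1.101", 6379)
  iter.map { x =>
    val arr = x.split(" ")
    val readId = arr(0).toInt
    val refId = arr(1).toInt
    val start = arr(2).toInt
    val end = arr(3).toInt
    val refStr = r.hmget("refStr", refId).get(refId).split(",")(1)
    val readStr = r.hmget("readStr", readId).get(readId)
    val realend = if (end > refStr.length - 1) refStr.length - 1 else end
    (readStr, refStr.substring(start, realend), refId, start, realend, readId)
  }
}

Constructing the client once per partition instead of once per record also keeps the number of Redis connections down.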
I have an RDD named index: RDD[(String, String)], and I want to use index to process my file. Here is the code:

val get = file.map({ x =>
  val tmp = index.lookup(x).head
  tmp
})
The problem is that I cannot use index inside the file.map function. When I ran the program, it gave me this feedback:
14/12/11 16:22:27 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 602, spark2): scala.MatchError: null
org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:770)
com.ynu.App$$anonfun$12.apply(App.scala:270)
com.ynu.App$$anonfun$12.apply(App.scala:265)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
scala.collection.AbstractIterator.to(Iterator.scala:1157)
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1080)
org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1080)
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
I don't know why. What should I do if I want to implement this functionality? Thanks.
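The scala.MatchError: null comes from calling index.lookup inside file.map: RDD operations can only be invoked on the driver, so an RDD captured in an executor-side closure is effectively null there. A minimal sketch of one workaround, assuming index is small enough to fit in driver memory and sc is the SparkContext: collect it into a plain Map and broadcast it:

// Collect the small RDD on the driver and ship it to every executor once.
val indexMap = sc.broadcast(index.collectAsMap())

val get = file.map { x =>
  // An ordinary Map lookup inside a closure is fine;
  // getOrElse avoids an exception for keys that are missing.
  indexMap.value.getOrElse(x, "")
}

If index is too large to collect, the same result can be obtained with a join: file.map(x => (x, ())).join(index).map(_._2._2).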
Now I have two RDDs to join, but when I type ".join" my IDE can't find this symbol; it tells me "cannot resolve symbol join". I have used Maven to build my project and re-imported to handle my dependencies. Am I missing a dependency? Can anyone tell me how to fix it? Thanks!
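join is not a method on RDD itself; it is defined in PairRDDFunctions and reaches an RDD[(K, V)] through an implicit conversion. On Spark versions of the era of the stack trace above (before 1.3), that conversion must be imported explicitly. A minimal sketch, with rdd1 and rdd2 as hypothetical pair RDDs:

import org.apache.spark.SparkContext._ // brings the RDD -> PairRDDFunctions implicits into scope

// Hypothetical pair RDDs; join requires both sides to be key-value tuples.
val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2)))
val rdd2 = sc.parallelize(Seq(("a", "x"), ("b", "y")))

val joined = rdd1.join(rdd2) // RDD[(String, (Int, String))]

If the import is already present, check that the spark-core artifact is on the compile classpath in the pom; since Spark 1.3 the implicits live on the RDD companion object and no import is needed.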
Input:
BC
BD
BC
BC
BD
CD
Output:
BC 3
BD 2
CD 1
If I use the char type as the key, it works. But it seems Thrust does not support strings as keys.
#include <thrust/device_vector.h>
#include <thrust/iterator/constant_iterator.h>
#include <thrust/reduce.h>
#include <string>

int main(void)
{
    std::string data = "aaabbbbbcddeeeeeeeeeff";
    size_t N = data.size();
    thrust::device_vector<char> input(data.begin(), data.end());
    thrust::device_vector<char> output(N);
    thrust::device_vector<int> lengths(N);
    size_t num_runs =
        thrust::reduce_by_key(input.begin(), input.end(),
                              thrust::constant_iterator<int>(1),
                              output.begin(),
                              lengths.begin()).first - output.begin();
    return 0;
}
How can I implement this with Thrust?
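Thrust kernels cannot touch std::string on the device, but fixed-length keys like the two-character pairs above can be packed into integers first. A minimal sketch of that approach (the packing scheme and variable names are my own, not from the question): encode each pair as an int, sort so equal keys become adjacent, then count them with reduce_by_key:

#include <thrust/device_vector.h>
#include <thrust/host_vector.h>
#include <thrust/iterator/constant_iterator.h>
#include <thrust/sort.h>
#include <thrust/reduce.h>
#include <iostream>
#include <string>
#include <vector>

int main(void)
{
    std::vector<std::string> pairs = {"BC", "BD", "BC", "BC", "BD", "CD"};

    // Pack each two-character string into one int on the host;
    // device code cannot work with std::string directly.
    thrust::host_vector<int> h_keys(pairs.size());
    for (size_t i = 0; i < pairs.size(); ++i)
        h_keys[i] = (pairs[i][0] << 8) | pairs[i][1];

    thrust::device_vector<int> keys = h_keys;

    // reduce_by_key only merges *adjacent* equal keys, so sort first.
    thrust::sort(keys.begin(), keys.end());

    thrust::device_vector<int> out_keys(keys.size());
    thrust::device_vector<int> counts(keys.size());
    size_t num_runs =
        thrust::reduce_by_key(keys.begin(), keys.end(),
                              thrust::constant_iterator<int>(1),
                              out_keys.begin(),
                              counts.begin()).first - out_keys.begin();

    // Unpack the keys and print "<pair> <count>", e.g. "BC 3".
    for (size_t i = 0; i < num_runs; ++i) {
        int k = out_keys[i];
        int c = counts[i];
        std::cout << char(k >> 8) << char(k & 0xff) << " " << c << std::endl;
    }
    return 0;
}

For variable-length strings the same idea still applies: map each string to an integer key on the host (for example with a hash or a dictionary), run reduce_by_key on the integers, and translate back afterwards.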