I have a schema like the one below. How can I parse the nested objects?
root
|-- apps: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- appName: string (nullable = true)
| | |-- appPackage: string (nullable = true)
| | |-- Ratings: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- date: string (nullable = true)
| | | | |-- rating: long (nullable = true)
|-- id: string (nullable = true)
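A minimal PySpark sketch of one way to flatten this, assuming the schema belongs to a DataFrame named df (the name, and the use of the DataFrame API, are assumptions):

    from pyspark.sql.functions import col, explode

    # Explode the outer array of app structs, one row per app.
    exploded_apps = df.select(col("id"), explode(col("apps")).alias("app"))

    # Pull out the scalar fields, then explode the nested Ratings array
    # so each (id, app, rating) combination becomes its own flat row.
    flat = (exploded_apps
            .select("id",
                    col("app.appName").alias("appName"),
                    col("app.appPackage").alias("appPackage"),
                    explode(col("app.Ratings")).alias("r"))
            .select("id", "appName", "appPackage",
                    col("r.date").alias("date"),
                    col("r.rating").alias("rating")))

    flat.show()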
How can we get the top 10 recommended products per user in PySpark? I know there are methods such as recommendProducts, which recommends products for a single user, and predictAll, which predicts the rating for every (user, item) pair. But is there an efficient way to output the top 10 items for every user at once?
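If your Spark version exposes it, MatrixFactorizationModel.recommendProductsForUsers computes the top N products for every user in a single call (it was added after the original recommendProducts/predictAll methods, so check your release). A hedged sketch, assuming ratings is an RDD of Rating(user, product, rating):

    from pyspark.mllib.recommendation import ALS, Rating

    # Train an ALS model; rank and iterations here are placeholder values.
    model = ALS.train(ratings, rank=10, iterations=10)

    # One call for the whole model: an RDD of (userId, [Rating, ...]) holding
    # the ten highest-scored products for every user.
    top10_per_user = model.recommendProductsForUsers(10)
    print(top10_per_user.take(3))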
I have a simple file filter that basically selects files from a specific date. In Hadoop, I would set the PathFilter class on the InputFormat via setInputPathFilter. How can I do this in Spark?
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class FilesFilter extends Configured implements PathFilter {

    private FileSystem fs; // assumed to be initialized elsewhere, e.g. FileSystem.get(getConf())

    @Override
    public boolean accept(Path path) {
        // Always accept directories so the input format can recurse into them.
        try {
            if (fs.isDirectory(path))
                return true;
        } catch (IOException e1) {
            e1.printStackTrace();
            return false;
        }
        // The target date, converted to whole days since the epoch.
        String file_date = "01.30.2015";
        SimpleDateFormat sdf = new SimpleDateFormat("MM.dd.yyyy");
        Date date = null;
        try {
            date = sdf.parse(file_date);
        } catch (ParseException e1) {
            e1.printStackTrace();
        }
        long dt = date.getTime() / (1000 * 3600 * 24);
        // Compare against the file's modification time (also in days).
        try {
            FileStatus file = fs.getFileStatus(path);
            long time = file.getModificationTime() / ...
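I am not aware of a direct PySpark hook for setInputPathFilter (in Scala/Java one can put the filter class on the Hadoop configuration and read through newAPIHadoopFile). A driver-side workaround sketch, assuming an hdfs:///data input directory and the same day-since-epoch comparison as the filter above:

    import datetime

    # List candidate files on the driver through the Hadoop FileSystem API
    # exposed via py4j, keep only those modified on the target day, and pass
    # the survivors to textFile as a comma-separated list of paths.
    hadoop_fs = sc._jvm.org.apache.hadoop.fs
    fs = hadoop_fs.FileSystem.get(sc._jsc.hadoopConfiguration())

    target_day = (datetime.date(2015, 1, 30) - datetime.date(1970, 1, 1)).days
    statuses = fs.listStatus(hadoop_fs.Path("hdfs:///data"))
    wanted = [s.getPath().toString() for s in statuses
              if s.getModificationTime() // (1000 * 3600 * 24) == target_day]

    rdd = sc.textFile(",".join(wanted))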
I have a job that processes a large amount of data. The job usually runs without any errors, but occasionally it throws the error below. I am using the Kryo serializer.
I am running Spark 1.2.0 on a YARN cluster.
Full stack trace:
org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:89)
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
at org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135)
at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:92)
at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58) ...
I have thousands of gzipped files, each 2 GB in size, sitting in HDFS, and I am using Spark to process them. I load the files from HDFS with Spark's textFile() method. My question is how to repartition the data so that I can process each file in parallel. Currently each .gz file is handled in a single task, so if I process 1000 files only 1000 tasks are executed. As far as I know, compressed files are not splittable. Is there any other way to make my job finish faster?
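One common mitigation sketch, assuming the expensive work happens after the read: let each gzip file still be read by its single task, then repartition before the heavy transformation so the downstream work runs in many smaller tasks. The path, partition count, and process_line function below are placeholders:

    # One partition (and one task) per .gz file, since gzip is not splittable.
    raw = sc.textFile("hdfs:///logs/*.gz")

    # Shuffle the records into many more, smaller partitions so the
    # per-record processing that follows is spread across the cluster.
    spread = raw.repartition(4000)
    result = spread.map(process_line)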
The YARN ResourceManager does not show the total number of cores for a Spark application. Say a Spark job is submitted with 300 executors and 3 cores per executor: the total number of cores used by the job is 900, but the YARN ResourceManager shows only 300.
So is this just a display issue, or does YARN not see the remaining 600 cores?
Environment: HDP 2.2; Scheduler: Capacity Scheduler; Spark: 1.4.1
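This is often a reporting artifact of the Capacity Scheduler's DefaultResourceCalculator, which accounts only for memory and effectively counts one vcore per container. One way to check what YARN actually granted, rather than what the UI renders, is the ResourceManager REST API. A diagnostic sketch with a hypothetical ResourceManager host:

    import requests

    # /ws/v1/cluster/apps reports the resources YARN has allocated to each
    # running application, including allocated vcores and memory.
    rm = "http://resourcemanager.example.com:8088"
    resp = requests.get(rm + "/ws/v1/cluster/apps", params={"states": "RUNNING"}).json()

    for app in resp["apps"]["app"]:
        print(app["name"], app["allocatedVCores"], "vcores", app["allocatedMB"], "MB")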