Posts by Non*_*one

How to parse nested JSON objects in Spark SQL?

I have a schema as shown below. How do I parse the nested objects?

root
 |-- apps: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- appName: string (nullable = true)
 |    |    |-- appPackage: string (nullable = true)
 |    |    |-- Ratings: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- date: string (nullable = true)
 |    |    |    |    |-- rating: long (nullable = true)
 |-- id: string (nullable = true)

json apache-spark apache-spark-sql

22 votes · 2 answers · 40,000 views

How to recommend the top 10 products from Spark ALS for all users?

How can we get the top 10 recommended products for every user in PySpark? I know there are methods such as recommendProducts, which recommends products for a single user, and predictAll, which predicts the rating for {user, item} pairs. But is there an efficient way to output the top 10 items per user, for all users?

apache-spark pyspark

11 votes · 1 answer · 4,148 views

How to use a PathFilter in Apache Spark?

I have a simple file filter that basically selects files from a particular date. In Hadoop, I would set the PathFilter class on the InputFormat via setInputPathFilter. How can I do this in Spark?

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class FilesFilter extends Configured implements PathFilter {

    // FileSystem handle used to stat candidate paths (must be initialized
    // before the filter is applied).
    private FileSystem fs;

    @Override
    public boolean accept(Path path) {

        try {
            // Always descend into directories.
            if (fs.isDirectory(path))
                return true;
        } catch (IOException e1) {
            e1.printStackTrace();
            return false;
        }

        String file_date = "01.30.2015";
        SimpleDateFormat sdf = new SimpleDateFormat("MM.dd.yyyy");
        Date date = null;

        try {
            date = sdf.parse(file_date);
        } catch (ParseException e1) {
            e1.printStackTrace();
        }

        // Target date expressed as whole days since the epoch.
        long dt = date.getTime() / (1000 * 3600 * 24);

        try {
            FileStatus file = fs.getFileStatus(path);
            long time = file.getModificationTime() / …

java hadoop scala apache-spark

5 votes · 1 answer · 1,568 views

Spark shuffle error org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5)

I have a job that processes a large amount of data. The job usually runs without any errors, but occasionally throws this one. I am using the Kryo serializer.

I am running Spark 1.2.0 on a YARN cluster.

Full stack trace:

org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:89)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
    at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
    at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
    at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
    at org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
    at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135)
    at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:92)
    at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58) …
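FAILED_TO_UNCOMPRESS comes from the Snappy codec failing on a shuffle block it cannot decode. A workaround often tried for intermittent failures of this kind (a hedged suggestion, not a confirmed fix for this particular job) is to switch the compression codec away from Snappy, either in spark-defaults.conf or via `--conf` on spark-submit:

```properties
# Hypothetical spark-defaults.conf fragment: use LZ4 (lzf also works)
# instead of the default Snappy codec for shuffle/broadcast compression.
spark.io.compression.codec    lz4
```

If the error persists across codecs, the usual next suspects are failing executors or disks rather than the codec itself.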

apache-spark

5 votes · 1 answer · 3,646 views

How to repartition compressed files in Apache Spark?

I have thousands of compressed files, each 2 GB in size, sitting in HDFS. I am using Spark to process these files, loading them from HDFS with the textFile() method. My question is how to repartition the data so that I can process each file in parallel. Currently, each .gz file is processed in a single task, so if I process 1,000 files, only 1,000 tasks run. I understand that the compressed files are not splittable, but is there some other approach I can use to make my job faster?

hadoop apache-spark

5 votes · 1 answer · 424 views

Spark executor cores not shown in the YARN ResourceManager

The YARN ResourceManager does not show the total number of cores for a Spark application. Say I submit a Spark job with 300 executors and 3 cores per executor. The total number of cores used by the job is then 900, but the ResourceManager shows only 300.

So is this just a display bug, or does YARN simply not see the remaining 600 cores?

Environment: HDP 2.2; Scheduler: Capacity Scheduler; Spark: 1.4.1
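This symptom is usually the ResourceManager accounting with the DefaultResourceCalculator, which considers only memory and reports one vcore per container (300 containers, hence 300 cores). With the Capacity Scheduler, the standard remedy is to switch to the DominantResourceCalculator in capacity-scheduler.xml so vcores are counted as well (a sketch of the stock Hadoop setting, not verified against HDP 2.2 specifically):

```xml
<property>
  <name>yarn.scheduler.capacity.resource-calculator</name>
  <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>
```

After restarting the ResourceManager, the vcore totals in the UI should reflect the executor-cores actually requested.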

hadoop-yarn apache-spark

1 vote · 1 answer · 1,260 views