标签: rdd

如何使用Spark查找中值和分位数

如何RDD使用分布式方法,IPython和Spark 找到整数的中位数?的RDD是约700 000元,因此过大,以收集和发现中位数.

这个问题与这个问题类似.但是,问题的答案是使用Scala,我不知道.

如何使用Apache Spark计算精确中位数?

使用Scala答案的思考,我试图在Python中编写类似的答案.

我知道我首先要排序RDD.我不知道怎么.我看到sortBy(按给定的方式对此RDD进行排序keyfunc)和sortByKey(对此进行排序RDD,假设它由(键,值)对组成.)方法.我认为两者都使用键值,而我RDD只有整数元素.

  1. 首先,我在考虑做什么myrdd.sortBy(lambda x: x)
  2. 接下来我将找到rdd(rdd.count())的长度.
  3. 最后,我想在rdd的中心找到元素或2个元素.我也需要这个方法的帮助.

编辑:

我有个主意.也许我可以索引我的RDD然后key = index和value = element.然后我可以尝试按价值排序?我不知道这是否可行,因为只有一种sortByKey方法.

python median apache-spark rdd pyspark

55
推荐指数
3
解决办法
6万
查看次数

reduceByKey:它在内部如何工作?

我是Spark和Scala的新手.我对reduceByKey函数在Spark中的工作方式感到困惑.假设我们有以下代码:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
Run Code Online (Sandbox Code Playgroud)

map函数是明确的:s是键,它指向行,data.txt而1是值.

但是,我没有得到reduceByKey如何在内部工作?"a"指向钥匙吗?或者,"a"指向"s"吗?那么什么代表a + b?它们是如何填满的?

scala apache-spark rdd

54
推荐指数
3
解决办法
3万
查看次数

解释Spark中的聚合功能

我正在寻找一些更好的解释python中通过spark提供的聚合功能.

我的例子如下(使用Spark 1.2.0版本的pyspark)

sc.parallelize([1,2,3,4]).aggregate(
  (0, 0),
  (lambda acc, value: (acc[0] + value, acc[1] + 1)),
  (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
Run Code Online (Sandbox Code Playgroud)

输出:

(10, 4)
Run Code Online (Sandbox Code Playgroud)

我得到的预期结果(10,4)1+2+3+44个元素的总和.如果我改变传递给聚合函数初始值(1,0)(0,0) 我得到以下结果

sc.parallelize([1,2,3,4]).aggregate(
    (1, 0),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
Run Code Online (Sandbox Code Playgroud)

输出:

(19, 4)
Run Code Online (Sandbox Code Playgroud)

该值增加9.如果我将其更改为(2,0),则值将转到(28,4)依此类推.

有人可以向我解释这个值的计算方法吗?我预计价值会上升1而不是9,(11,4)我预计会看到(19,4).

python lambda aggregate apache-spark rdd

50
推荐指数
3
解决办法
3万
查看次数

DataSet API和DataFrame API之间的区别

有没有人可以通过示例帮助我理解DataSet API和DataFrame API之间的区别?为什么需要在Spark中引入DataSet API?

apache-spark rdd apache-spark-sql apache-spark-dataset

49
推荐指数
0
解决办法
3万
查看次数

哪些操作保留了RDD顺序?

RDD具有一个有意义的(与存储模型强加的一些随机顺序相反),如果它被处理sortBy(),则如本回复中所解释的那样.

现在,哪些操作保留了该订单?

例如,是否保证(之后a.sortBy())

a.map(f).zip(a) === 
a.map(x => (f(x),x))
Run Code Online (Sandbox Code Playgroud)

怎么样

a.filter(f).map(g) === 
a.map(x => (x,g(x))).filter(f(_._1)).map(_._2)
Run Code Online (Sandbox Code Playgroud)

关于什么

a.filter(f).flatMap(g) === 
a.flatMap(x => g(x).map((x,_))).filter(f(_._1)).map(_._2)
Run Code Online (Sandbox Code Playgroud)

这里"平等" ===被理解为"功能等同",即,没有办法使用用户级操作来区分结果(即,没有读取日志和c).

apache-spark rdd

48
推荐指数
2
解决办法
2万
查看次数

使用sc.textFile("s3n:// ...)从S3读取Spark文件

尝试使用spark-shell读取位于S3中的文件:

scala> val myRdd = sc.textFile("s3n://myBucket/myFile1.log")
lyrics: org.apache.spark.rdd.RDD[String] = s3n://myBucket/myFile1.log MappedRDD[55] at textFile at <console>:12

scala> myRdd.count
java.io.IOException: No FileSystem for scheme: s3n
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2607)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2614)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    ... etc ...
Run Code Online (Sandbox Code Playgroud)

IOException异常:没有文件系统的方案:S3N与发生错误:

  • 开发机器上的Spark 1.31或1.40(没有Hadoop库)
  • Hortonworks Sandbox HDP v2.2.4(Hadoop 2.60)运行,它集成了Spark 1.2.1开箱即用
  • 使用s3://或s3n:// scheme

这个错误的原因是什么?缺少依赖,缺少配置或误用sc.textFile()

或者可能是因为这个帖子似乎暗示了影响Hadoop 2.60特有的Spark构建的错误.我将尝试Spark for Hadoop 2.40,看看这是否解决了这个问题.

hortonworks-data-platform apache-spark rdd

45
推荐指数
7
解决办法
9万
查看次数

'PipelinedRDD'对象在PySpark中没有属性'toDF'

我正在尝试加载SVM文件并将其转换为一个,DataFrame因此我可以使用PipelineSpark 的ML模块(ML).我刚刚在Ubuntu 14.04上安装了一个新的Spark 1.5.0(没有spark-env.sh配置).

my_script.py是:

from pyspark.mllib.util import MLUtils
from pyspark import SparkContext

sc = SparkContext("local", "Teste Original")
data = MLUtils.loadLibSVMFile(sc, "/home/svm_capture").toDF()
Run Code Online (Sandbox Code Playgroud)

我正在使用: ./spark-submit my_script.py

我收到错误:

Traceback (most recent call last):
File "/home/fred-spark/spark-1.5.0-bin-hadoop2.6/pipeline_teste_original.py", line 34, in <module>
data = MLUtils.loadLibSVMFile(sc, "/home/fred-spark/svm_capture").toDF()
AttributeError: 'PipelinedRDD' object has no attribute 'toDF'
Run Code Online (Sandbox Code Playgroud)

我无法理解的是,如果我跑:

data = MLUtils.loadLibSVMFile(sc, "/home/svm_capture").toDF()
Run Code Online (Sandbox Code Playgroud)

直接在PySpark shell中,它的工作原理.

python apache-spark rdd apache-spark-sql pyspark

45
推荐指数
1
解决办法
4万
查看次数

如何使用spark从hbase读取

下面的代码将从hbase读取,然后将其转换为json结构并转换为schemaRDD,但问题是我using List要存储json字符串然后传递给javaRDD,对于大约100 GB的数据,master将被加载内存中的数据.从hbase加载数据然后执行操作然后转换为JavaRDD的正确方法是什么.

package hbase_reader;


import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;

import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;

import com.google.common.collect.Lists;

public class hbase_reader {

    public static void main(String[] args) throws IOException, ParseException {

        List<String> jars = Lists.newArrayList("");

        SparkConf spconf = new …
Run Code Online (Sandbox Code Playgroud)

hbase apache-spark rdd

44
推荐指数
4
解决办法
7万
查看次数

Spark为数据帧连接指定多个列条件

如何在连接两个数据帧时提供更多列条件.例如,我想运行以下内容:

val Lead_all = Leads.join(Utm_Master,  
    Leaddetails.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign") ==
    Utm_Master.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
"left")
Run Code Online (Sandbox Code Playgroud)

我想只在这些列匹配时才加入.但是上面的语法无效,因为cols只需要一个字符串.那我怎么得到我想要的东西.

apache-spark rdd apache-spark-sql

44
推荐指数
6
解决办法
8万
查看次数

Spark:减去两个DataFrame

在Spark版本1.2.0中,可以使用subtract2 SchemRDD秒来结束与第一个不同的内容

val onlyNewData = todaySchemaRDD.subtract(yesterdaySchemaRDD)
Run Code Online (Sandbox Code Playgroud)

onlyNewData包含todaySchemRDD不存在的行yesterdaySchemaRDD.

如何DataFrames在Spark 1.3.0版本中实现这一目标?

dataframe apache-spark rdd

42
推荐指数
4
解决办法
7万
查看次数