如何RDD使用分布式方法,IPython和Spark 找到整数的中位数?的RDD是约700 000元,因此过大,以收集和发现中位数.
这个问题与这个问题类似.但是,问题的答案是使用Scala,我不知道.
使用Scala答案的思考,我试图在Python中编写类似的答案.
我知道我首先要排序RDD.我不知道怎么.我看到sortBy(按给定的方式对此RDD进行排序keyfunc)和sortByKey(对此进行排序RDD,假设它由(键,值)对组成.)方法.我认为两者都使用键值,而我RDD只有整数元素.
myrdd.sortBy(lambda x: x)?rdd.count())的长度.编辑:
我有个主意.也许我可以索引我的RDD然后key = index和value = element.然后我可以尝试按价值排序?我不知道这是否可行,因为只有一种sortByKey方法.
我是Spark和Scala的新手.我对reduceByKey函数在Spark中的工作方式感到困惑.假设我们有以下代码:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
Run Code Online (Sandbox Code Playgroud)
map函数是明确的:s是键,它指向行,data.txt而1是值.
但是,我没有得到reduceByKey如何在内部工作?"a"指向钥匙吗?或者,"a"指向"s"吗?那么什么代表a + b?它们是如何填满的?
我正在寻找一些更好的解释python中通过spark提供的聚合功能.
我的例子如下(使用Spark 1.2.0版本的pyspark)
sc.parallelize([1,2,3,4]).aggregate(
(0, 0),
(lambda acc, value: (acc[0] + value, acc[1] + 1)),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
Run Code Online (Sandbox Code Playgroud)
输出:
(10, 4)
Run Code Online (Sandbox Code Playgroud)
我得到的预期结果(10,4)是1+2+3+44个元素的总和.如果我改变传递给聚合函数初始值(1,0)从(0,0) 我得到以下结果
sc.parallelize([1,2,3,4]).aggregate(
(1, 0),
(lambda acc, value: (acc[0] + value, acc[1] + 1)),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
Run Code Online (Sandbox Code Playgroud)
输出:
(19, 4)
Run Code Online (Sandbox Code Playgroud)
该值增加9.如果我将其更改为(2,0),则值将转到(28,4)依此类推.
有人可以向我解释这个值的计算方法吗?我预计价值会上升1而不是9,(11,4)我预计会看到(19,4).
有没有人可以通过示例帮助我理解DataSet API和DataFrame API之间的区别?为什么需要在Spark中引入DataSet API?
RDD具有一个有意义的(与存储模型强加的一些随机顺序相反),如果它被处理sortBy(),则如本回复中所解释的那样.
现在,哪些操作保留了该订单?
例如,是否保证(之后a.sortBy())
a.map(f).zip(a) ===
a.map(x => (f(x),x))
Run Code Online (Sandbox Code Playgroud)
怎么样
a.filter(f).map(g) ===
a.map(x => (x,g(x))).filter(f(_._1)).map(_._2)
Run Code Online (Sandbox Code Playgroud)
关于什么
a.filter(f).flatMap(g) ===
a.flatMap(x => g(x).map((x,_))).filter(f(_._1)).map(_._2)
Run Code Online (Sandbox Code Playgroud)
这里"平等" ===被理解为"功能等同",即,没有办法使用用户级操作来区分结果(即,没有读取日志和c).
尝试使用spark-shell读取位于S3中的文件:
scala> val myRdd = sc.textFile("s3n://myBucket/myFile1.log")
lyrics: org.apache.spark.rdd.RDD[String] = s3n://myBucket/myFile1.log MappedRDD[55] at textFile at <console>:12
scala> myRdd.count
java.io.IOException: No FileSystem for scheme: s3n
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2607)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2614)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
... etc ...
Run Code Online (Sandbox Code Playgroud)
该IOException异常:没有文件系统的方案:S3N与发生错误:
这个错误的原因是什么?缺少依赖,缺少配置或误用sc.textFile()?
或者可能是因为这个帖子似乎暗示了影响Hadoop 2.60特有的Spark构建的错误.我将尝试Spark for Hadoop 2.40,看看这是否解决了这个问题.
我正在尝试加载SVM文件并将其转换为一个,DataFrame因此我可以使用PipelineSpark 的ML模块(ML).我刚刚在Ubuntu 14.04上安装了一个新的Spark 1.5.0(没有spark-env.sh配置).
我my_script.py是:
from pyspark.mllib.util import MLUtils
from pyspark import SparkContext
sc = SparkContext("local", "Teste Original")
data = MLUtils.loadLibSVMFile(sc, "/home/svm_capture").toDF()
Run Code Online (Sandbox Code Playgroud)
我正在使用: ./spark-submit my_script.py
我收到错误:
Traceback (most recent call last):
File "/home/fred-spark/spark-1.5.0-bin-hadoop2.6/pipeline_teste_original.py", line 34, in <module>
data = MLUtils.loadLibSVMFile(sc, "/home/fred-spark/svm_capture").toDF()
AttributeError: 'PipelinedRDD' object has no attribute 'toDF'
Run Code Online (Sandbox Code Playgroud)
我无法理解的是,如果我跑:
data = MLUtils.loadLibSVMFile(sc, "/home/svm_capture").toDF()
Run Code Online (Sandbox Code Playgroud)
直接在PySpark shell中,它的工作原理.
下面的代码将从hbase读取,然后将其转换为json结构并转换为schemaRDD,但问题是我using List要存储json字符串然后传递给javaRDD,对于大约100 GB的数据,master将被加载内存中的数据.从hbase加载数据然后执行操作然后转换为JavaRDD的正确方法是什么.
package hbase_reader;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
import com.google.common.collect.Lists;
public class hbase_reader {
public static void main(String[] args) throws IOException, ParseException {
List<String> jars = Lists.newArrayList("");
SparkConf spconf = new …Run Code Online (Sandbox Code Playgroud) 如何在连接两个数据帧时提供更多列条件.例如,我想运行以下内容:
val Lead_all = Leads.join(Utm_Master,
Leaddetails.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign") ==
Utm_Master.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
"left")
Run Code Online (Sandbox Code Playgroud)
我想只在这些列匹配时才加入.但是上面的语法无效,因为cols只需要一个字符串.那我怎么得到我想要的东西.
在Spark版本1.2.0中,可以使用subtract2 SchemRDD秒来结束与第一个不同的内容
val onlyNewData = todaySchemaRDD.subtract(yesterdaySchemaRDD)
Run Code Online (Sandbox Code Playgroud)
onlyNewData包含todaySchemRDD不存在的行yesterdaySchemaRDD.
如何DataFrames在Spark 1.3.0版本中实现这一目标?