我是Spark的新手.我有一个大的元素数据集[RDD],我想把它分成两个完全相同大小的分区,维护元素的顺序.我试着用RangePartitioner像
var data = partitionedFile.partitionBy(new RangePartitioner(2, partitionedFile))
Run Code Online (Sandbox Code Playgroud)
这不能给出令人满意的结果,因为它大致分割但不完全相同的大小维持元素的顺序.例如,如果有64个元素,我们使用
Rangepartitioner,然后它分为31个元素和33个元素.
我需要一个分区器,以便我在一半中获得前32个元素,而另一半包含第二组32个元素.你能否通过建议如何使用自定义分区器来帮助我,这样我可以获得相同大小的两半,保持元素的顺序?
当我执行以下命令时:
scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)),4).partitionBy(new HashPartitioner(10)).persist()
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[10] at partitionBy at <console>:22
scala> rdd.partitions.size
res9: Int = 10
scala> rdd.partitioner.isDefined
res10: Boolean = true
scala> rdd.partitioner.get
res11: org.apache.spark.Partitioner = org.apache.spark.HashPartitioner@a
Run Code Online (Sandbox Code Playgroud)
它说有10个分区,分区完成使用HashPartitioner.但是当我执行以下命令时:
scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)),4)
...
scala> rdd.partitions.size
res6: Int = 4
scala> rdd.partitioner.isDefined
res8: Boolean = false
Run Code Online (Sandbox Code Playgroud)
它说有4个分区,并且没有定义分区器.那么,什么是Spark中的默认分区方案?/如何在第二种情况下对数据进行分区?
为什么Spark中的模式匹配与Scala中的模式匹配不一样?请参阅下面的示例...函数f()尝试在类上进行模式匹配,它在Scala REPL中工作但在Spark中失败并导致所有"???". f2()是一种解决方法,可以在Spark中获得所需的结果.isInstanceOf(),但我知道这是Scala中的错误形式.
任何帮助模式匹配Spark中这种情况下的正确方法将不胜感激.
abstract class a extends Serializable {val a: Int}
case class b(a: Int) extends a
case class bNull(a: Int=0) extends a
val x: List[a] = List(b(0), b(1), bNull())
val xRdd = sc.parallelize(x)
Run Code Online (Sandbox Code Playgroud)
尝试模式匹配在Scala REPL中工作但在Spark中失败
def f(x: a) = x match {
case b(n) => "b"
case bNull(n) => "bnull"
case _ => "???"
}
Run Code Online (Sandbox Code Playgroud)
在Spark中运行的解决方法,但是形式不好(我认为)
def f2(x: a) = {
if (x.isInstanceOf[b]) {
"b"
} else if (x.isInstanceOf[bNull]) {
"bnull"
} else …Run Code Online (Sandbox Code Playgroud) 我知道该partitionBy功能会分区我的数据.如果我使用rdd.partitionBy(100)它将按键将我的数据分成100个部分.即,与类似键相关联的数据将被组合在一起
我正在尝试从pyspark中的列表创建一个字典.我有以下列表清单:
rawPositions
Run Code Online (Sandbox Code Playgroud)
给
[[1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3904.125, 390412.5],
[1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3900.75, 390075.0],
[1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3882.5625, 388256.25],
[1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3926.25, 392625.0],
[2766232,
'CDX IG CDSI S25 V1 5Y CBBT CORP',
'BC85',
'Enterprise',
30000000.0,
-16323.2439825,
30000000.0],
[2766232,
'CDX IG CDSI S25 V1 5Y CBBT CORP',
'BC85',
'Enterprise',
30000000.0,
-16928.620101900004,
30000000.0],
[1009804, 'LPM6 Comdty', 'BC29', 'Jet', 105.0, 129596.25, 12959625.0],
[1009804, 'LPM6 Comdty', 'BC29', 'Jet', 128.0, 162112.0, 16211200.0],
[1009804, …Run Code Online (Sandbox Code Playgroud) 我试图在格式的数据上调用pyspark的reduceByKey函数 (([a,b,c], 1), ([a,b,c], 1), ([a,d,b,e], 1), ...
似乎pyspark不接受数组作为普通键中的键,通过简单地应用.reduceByKey(add)来降低值.
我已经尝试过首先将数组转换为字符串,.map((x,y): (str(x),y))但这不起作用,因为将字符串后处理回数组太慢了.
有没有办法让pyspark使用数组作为键或使用另一个函数快速将字符串转换回数组?
这是相关的错误代码
File "/home/jan/Documents/spark-1.4.0/python/lib/pyspark.zip/pyspark/shuffle.py", line 268, in mergeValues
d[k] = comb(d[k], v) if k in d else creator(v)
TypeError: unhashable type: 'list'
enter code here
Run Code Online (Sandbox Code Playgroud)
摘要:
输入:x =[([a,b,c], 1), ([a,b,c], 1), ([a,d,b,e], 1), ...]
所需的输出:y =[([a,b,c], 2), ([a,d,b,e], 1),...]
使得我可以访问a由y[0][0][0]和2由y[0][1]
我正在使用Spark 1.3.1,我很好奇为什么Spark不允许在地图侧组合上使用数组键.一块combineByKey function:
if (keyClass.isArray) {
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
}
}
Run Code Online (Sandbox Code Playgroud) 我们正在尝试为RDD分配相同的执行程序和相同的分区程序以避免任何网络流量,并且像cogroup和join这样的shuffle操作没有任何阶段边界,并且所有转换都在一个阶段完成.
因此,为了实现这一点,我们使用Java中的自定义RDD类(ExtendRDD.class)包装RDD,该类具有来自RDD.class(在scala中)的覆盖getPreferredLocation函数,如下所示:
public Seq<String> getPreferredLocations(Partition split){
listString.add("11.113.57.142");
listString.add("11.113.57.163");
listString.add("11.113.57.150");
List<String> finalList = new ArrayList<String>();
finalList.add(listString.get(split.index() % listString.size()));
Seq<String> toReturnListString = scala.collection.JavaConversions.asScalaBuffer(finalList).toSeq();
return toReturnListString;
}
Run Code Online (Sandbox Code Playgroud)
有了这个,我们就可以控制spark的行为,看看它将RDD放在集群中的哪个节点.但现在的问题是,由于这些RDD的分区器不同,spark认为它们与shuffle有关,并且再次为这些shuffle操作创建了多个阶段.我们尝试在同一个自定义RDD中覆盖相同RDD.class的分区方法:
public Option<Partitioner> partitioner() {
Option<Partitioner> optionPartitioner = new Some<Partitioner>(this.getPartitioner());
return optionPartitioner;
}
Run Code Online (Sandbox Code Playgroud)
为了让它们处于同一阶段的火花,必须考虑这些RDD来自同一个分区.我们的分区方法似乎不起作用,因为spark为2个RDD提供了不同的分区,并为shuffle操作创建了多个阶段.
我们使用我们的自定义RDD包装scala RDD:
ClassTag<String> tag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
RDD<String> distFile1 = jsc.textFile("SomePath/data.txt",1);
ExtendRDD<String> extendRDD = new ExtendRDD<String>(distFile1, tag);
Run Code Online (Sandbox Code Playgroud)
我们以类似的方式创建另一个自定义RDD,并从该RDD中获取PairRDD(pairRDD2).然后我们尝试使用partitionBy函数将与extendRDD对象相同的分区器应用于PairRDDFunction对象,然后将cogroup应用于:
RDD<Tuple2<String, String>> pairRDD = extendRDD.keyBy(new KeyByImpl());
PairRDDFunctions<String, String> pair = new PairRDDFunctions<String, String>(pairRDD, tag, tag, null);
pair.partitionBy(extendRDD2.getPartitioner());
pair.cogroup(pairRDD2);
Run Code Online (Sandbox Code Playgroud)
所有这些似乎都不起作用,因为当遇到cogroup转换时,spark会产生多个阶段.
有关如何将相同的分区器应用于RDD的任何建议?
考虑的方法(Spark 2.2.1):
DataFrame.repartition(带partitionExprs: Column*参数的两个实现)DataFrameWriter.partitionBy从文档的partitionBy:
如果指定,输出奠定了类似文件系统
Hive的分区方案.例如,当我们Dataset按年和月分区时,目录布局如下所示:
- 年= 2016 /月= 01 /
- 年= 2016 /月= 02 /
由此,我推断列参数的顺序将决定目录布局; 因此它是相关的.
从文档的repartition:
返回
Dataset由给定分区表达式分区的新分区,使用spark.sql.shuffle.partitions分区数.结果Dataset是散列分区.
根据我目前的理解,repartition决定处理时的并行度DataFrame.有了这个定义,行为repartition(numPartitions: Int)很简单,但是对于参数的另外两个实现也是repartition如此partitionExprs: Column*.
所有事情都说,我的疑虑如下:
partitionBy方法一样,列输入的顺序也 …在我的 R 脚本中,我有一个SparkDataFrame包含四个不同月份数据的两列(时间、值)。由于我需要将我的函数分别应用到每个月,我想我会将repartition它分成四个分区,每个分区将保存一个月的数据。
我创建了一个名为 partition 的附加列,具有一个整数值 0 - 3,然后repartition通过此特定列调用该方法。
可悲的是,正如本主题中所描述的那样:
Spark SQL - df.repartition 和 DataFrameWriter partitionBy 之间的区别?,使用该repartition方法我们只确定所有具有相同键的数据最终会在同一个分区中,但是具有不同键的数据也可以最终在同一个分区中。
就我而言,执行下面可见的代码会创建 4 个分区,但只用数据填充其中的 2 个。
我想我应该使用该partitionBy方法,但是在 SparkR 的情况下,我不知道该怎么做。官方文档指出,此方法适用于称为WindowSpec而不是DataFrame.
我真的很感激这方面的一些帮助,因为我不知道如何将此方法合并到我的代码中。
sparkR.session(
master="local[*]", sparkConfig = list(spark.sql.shuffle.partitions="4"))
df <- as.DataFrame(inputDat) # this is a dataframe with added partition column
repartitionedDf <- repartition(df, col = df$partition)
schema <- structType(
structField("time", "timestamp"),
structField("value", "double"),
structField("partition", "string"))
processedDf <- dapply(repartitionedDf,
function(x) { data.frame(produceHourlyResults(x), …Run Code Online (Sandbox Code Playgroud) apache-spark ×10
rdd ×5
partitioning ×3
pyspark ×3
scala ×3
python ×2
case-class ×1
dataframe ×1
hadoop ×1
mapreduce ×1
python-3.x ×1
r ×1
sparkr ×1