相关疑难解决方法(0)

如何为同等大小的分区的Spark RDD定义自定义分区程序,其中每个分区具有相同数量的元素?

我是Spark的新手.我有一个大的元素数据集[RDD],我想把它分成两个完全相同大小的分区,维护元素的顺序.我试着用RangePartitioner

var data = partitionedFile.partitionBy(new RangePartitioner(2, partitionedFile))
Run Code Online (Sandbox Code Playgroud)

这不能给出令人满意的结果,因为它大致分割但不完全相同的大小维持元素的顺序.例如,如果有64个元素,我们使用 Rangepartitioner,然后它分为31个元素和33个元素.

我需要一个分区器,以便我在一半中获得前32个元素,而另一半包含第二组32个元素.你能否通过建议如何使用自定义分区器来帮助我,这样我可以获得相同大小的两半,保持元素的顺序?

hadoop scala apache-spark

27
推荐指数
2
解决办法
4万
查看次数

Spark中的默认分区方案

当我执行以下命令时:

scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)),4).partitionBy(new HashPartitioner(10)).persist()
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[10] at partitionBy at <console>:22

scala> rdd.partitions.size
res9: Int = 10

scala> rdd.partitioner.isDefined
res10: Boolean = true


scala> rdd.partitioner.get
res11: org.apache.spark.Partitioner = org.apache.spark.HashPartitioner@a
Run Code Online (Sandbox Code Playgroud)

它说有10个分区,分区完成使用HashPartitioner.但是当我执行以下命令时:

scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)),4)
...
scala> rdd.partitions.size
res6: Int = 4
scala> rdd.partitioner.isDefined
res8: Boolean = false
Run Code Online (Sandbox Code Playgroud)

它说有4个分区,并且没有定义分区器.那么,什么是Spark中的默认分区方案?/如何在第二种情况下对数据进行分区?

partitioning apache-spark rdd

17
推荐指数
1
解决办法
5176
查看次数

Apache Spark中的Case类相等

为什么Spark中的模式匹配与Scala中的模式匹配不一样?请参阅下面的示例...函数f()尝试在类上进行模式匹配,它在Scala REPL中工作但在Spark中失败并导致所有"???". f2()是一种解决方法,可以在Spark中获得所需的结果.isInstanceOf(),但我知道这是Scala中的错误形式.

任何帮助模式匹配Spark中这种情况下的正确方法将不胜感激.

abstract class a extends Serializable {val a: Int}
case class b(a: Int) extends a 
case class bNull(a: Int=0) extends a 

val x: List[a] = List(b(0), b(1), bNull())
val xRdd = sc.parallelize(x)
Run Code Online (Sandbox Code Playgroud)

尝试模式匹配在Scala REPL中工作但在Spark中失败

def f(x: a) = x match {
    case b(n) => "b"
    case bNull(n) => "bnull"
    case _ => "???"
}
Run Code Online (Sandbox Code Playgroud)

在Spark中运行的解决方法,但是形式不好(我认为)

def f2(x: a) = {
    if (x.isInstanceOf[b]) {
        "b"
    } else if (x.isInstanceOf[bNull]) {
        "bnull"
    } else …
Run Code Online (Sandbox Code Playgroud)

scala pattern-matching case-class apache-spark rdd

15
推荐指数
1
解决办法
2231
查看次数

pyspark使用partitionby分区数据

我知道该partitionBy功能会分区我的数据.如果我使用rdd.partitionBy(100)它将按键将我的数据分成100个部分.即,与类似键相关联的数据将被组合在一起

  1. 我的理解是否正确?
  2. 是否建议将分区数等于可用内核数?这会使处理更有效吗?
  3. 如果我的数据不是键值格式怎么办?我还能使用这个功能吗?
  4. 假设我的数据是serial_number_of_student,student_name.在这种情况下,我可以通过student_name而不是serial_number对我的数据进行分区吗?

python partitioning apache-spark rdd pyspark

11
推荐指数
1
解决办法
2万
查看次数

什么是例外:字符串哈希的随机性应该通过pyspark中的PYTHONHASHSEED平均值来禁用?

我正在尝试从pyspark中的列表创建一个字典.我有以下列表清单:

rawPositions
Run Code Online (Sandbox Code Playgroud)

[[1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3904.125, 390412.5],
 [1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3900.75, 390075.0],
 [1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3882.5625, 388256.25],
 [1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3926.25, 392625.0],
 [2766232,
  'CDX IG CDSI S25 V1 5Y CBBT CORP',
  'BC85',
  'Enterprise',
  30000000.0,
  -16323.2439825,
  30000000.0],
 [2766232,
  'CDX IG CDSI S25 V1 5Y CBBT CORP',
  'BC85',
  'Enterprise',
  30000000.0,
  -16928.620101900004,
  30000000.0],
 [1009804, 'LPM6 Comdty', 'BC29', 'Jet', 105.0, 129596.25, 12959625.0],
 [1009804, 'LPM6 Comdty', 'BC29', 'Jet', 128.0, 162112.0, 16211200.0],
 [1009804, …
Run Code Online (Sandbox Code Playgroud)

python-3.x apache-spark pyspark

9
推荐指数
2
解决办法
4051
查看次数

列表作为PySpark的reduceByKey的键

我试图在格式的数据上调用pyspark的reduceByKey函数 (([a,b,c], 1), ([a,b,c], 1), ([a,d,b,e], 1), ...

似乎pyspark不接受数组作为普通键中的键,通过简单地应用.reduceByKey(add)来降低值.

我已经尝试过首先将数组转换为字符串,.map((x,y): (str(x),y))但这不起作用,因为将字符串后处理回数组太慢了.

有没有办法让pyspark使用数组作为键或使用另一个函数快速将字符串转换回数组?

这是相关的错误代码

  File "/home/jan/Documents/spark-1.4.0/python/lib/pyspark.zip/pyspark/shuffle.py", line 268, in mergeValues
    d[k] = comb(d[k], v) if k in d else creator(v)
TypeError: unhashable type: 'list'
    enter code here
Run Code Online (Sandbox Code Playgroud)

摘要:

输入:x =[([a,b,c], 1), ([a,b,c], 1), ([a,d,b,e], 1), ...]

所需的输出:y =[([a,b,c], 2), ([a,d,b,e], 1),...] 使得我可以访问ay[0][0][0]2y[0][1]

python apache-spark rdd pyspark

7
推荐指数
1
解决办法
3876
查看次数

为什么Spark不允许映射端与数组键合并?

我正在使用Spark 1.3.1,我很好奇为什么Spark不允许在地图侧组合上使用数组键.一块combineByKey function:

if (keyClass.isArray) {
  if (mapSideCombine) {
    throw new SparkException("Cannot use map-side combining with array keys.")
  }
}
Run Code Online (Sandbox Code Playgroud)

scala mapreduce apache-spark rdd

7
推荐指数
1
解决办法
3050
查看次数

在Spark中为RDD分配相同的分区程序

我们正在尝试为RDD分配相同的执行程序和相同的分区程序以避免任何网络流量,并且像cogroup和join这样的shuffle操作没有任何阶段边界,并且所有转换都在一个阶段完成.

因此,为了实现这一点,我们使用Java中的自定义RDD类(ExtendRDD.class)包装RDD,该类具有来自RDD.class(在scala中)的覆盖getPreferredLocation函数,如下所示:

 public Seq<String> getPreferredLocations(Partition split){
        listString.add("11.113.57.142");
        listString.add("11.113.57.163");
        listString.add("11.113.57.150");
        List<String> finalList = new ArrayList<String>();
        finalList.add(listString.get(split.index() % listString.size()));               

        Seq<String> toReturnListString = scala.collection.JavaConversions.asScalaBuffer(finalList).toSeq();

        return toReturnListString;
    } 
Run Code Online (Sandbox Code Playgroud)

有了这个,我们就可以控制spark的行为,看看它将RDD放在集群中的哪个节点.但现在的问题是,由于这些RDD的分区器不同,spark认为它们与shuffle有关,并且再次为这些shuffle操作创建了多个阶段.我们尝试在同一个自定义RDD中覆盖相同RDD.class的分区方法:

public Option<Partitioner> partitioner() {
        Option<Partitioner> optionPartitioner = new Some<Partitioner>(this.getPartitioner());
        return optionPartitioner;
    } 
Run Code Online (Sandbox Code Playgroud)

为了让它们处于同一阶段的火花,必须考虑这些RDD来自同一个分区.我们的分区方法似乎不起作用,因为spark为2个RDD提供了不同的分区,并为shuffle操作创建了多个阶段.

我们使用我们的自定义RDD包装scala RDD:

ClassTag<String> tag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
RDD<String> distFile1 = jsc.textFile("SomePath/data.txt",1);
ExtendRDD<String> extendRDD = new ExtendRDD<String>(distFile1, tag);
Run Code Online (Sandbox Code Playgroud)

我们以类似的方式创建另一个自定义RDD,并从该RDD中获取PairRDD(pairRDD2).然后我们尝试使用partitionBy函数将与extendRDD对象相同的分区器应用于PairRDDFunction对象,然后将cogroup应用于:

RDD<Tuple2<String, String>> pairRDD = extendRDD.keyBy(new KeyByImpl());
PairRDDFunctions<String, String> pair = new PairRDDFunctions<String, String>(pairRDD, tag, tag, null);
pair.partitionBy(extendRDD2.getPartitioner());
pair.cogroup(pairRDD2);
Run Code Online (Sandbox Code Playgroud)

所有这些似乎都不起作用,因为当遇到cogroup转换时,spark会产生多个阶段.

有关如何将相同的分区器应用于RDD的任何建议?

apache-spark

7
推荐指数
0
解决办法
430
查看次数

Spark:重新分区与partitionBy中的列参数顺序

考虑的方法(Spark 2.2.1):

  1. DataFrame.repartition(带partitionExprs: Column*参数的两个实现)
  2. DataFrameWriter.partitionBy

注意:这个问题不会问这些方法之间的区别

文档partitionBy:

如果指定,输出奠定了类似文件系统Hive分区方案.例如,当我们Dataset按年和月分区时,目录布局如下所示:

  • 年= 2016 /月= 01 /
  • 年= 2016 /月= 02 /

由此,我推断列参数顺序将决定目录布局; 因此它是相关的.

文档repartition:

返回Dataset由给定分区表达式分区的新分区,使用spark.sql.shuffle.partitions分区数.结果Dataset散列分区.

根据我目前的理解,repartition决定处理时的并行度DataFrame.有了这个定义,行为repartition(numPartitions: Int)很简单,但是对于参数的另外两个实现也是repartition如此partitionExprs: Column*.


所有事情都说,我的疑虑如下:

  • partitionBy方法一样,输入的顺序也 …

partitioning dataframe apache-spark apache-spark-sql

6
推荐指数
1
解决办法
4808
查看次数

SparkR DataFrame 分区问题

在我的 R 脚本中,我有一个SparkDataFrame包含四个不同月份数据的两列(时间、值)。由于我需要将我的函数分别应用到每个月,我想我会将repartition它分成四个分区,每个分区将保存一个月的数据。

我创建了一个名为 partition 的附加列,具有一个整数值 0 - 3,然后repartition通过此特定列调用该方法。

可悲的是,正如本主题中所描述的那样: Spark SQL - df.repartition 和 DataFrameWriter partitionBy 之间的区别?,使用该repartition方法我们只确定所有具有相同键的数据最终会在同一个分区中,但是具有不同键的数据也可以最终在同一个分区中。

就我而言,执行下面可见的代码会创建 4 个分区,但只用数据填充其中的 2 个。

我想我应该使用该partitionBy方法,但是在 SparkR 的情况下,我不知道该怎么做。官方文档指出,此方法适用于称为WindowSpec而不是DataFrame.

我真的很感激这方面的一些帮助,因为我不知道如何将此方法合并到我的代码中。

sparkR.session(
   master="local[*]",  sparkConfig = list(spark.sql.shuffle.partitions="4"))
df <- as.DataFrame(inputDat) # this is a dataframe with added partition column
repartitionedDf <- repartition(df, col = df$partition)

schema <- structType(
  structField("time", "timestamp"), 
  structField("value", "double"), 
  structField("partition", "string"))

processedDf <- dapply(repartitionedDf, 
  function(x) { data.frame(produceHourlyResults(x), …
Run Code Online (Sandbox Code Playgroud)

r apache-spark sparkr

6
推荐指数
1
解决办法
773
查看次数