相关疑难解决方法(0)

pyspark:有效地使partitionBy写入与原始表相同数量的总分区

我有一个与pyspark repartitionBy()函数相关的问题,我最初在这个问题的评论中发布了这个问题.我被要求将其作为一个单独的问题发布,所以这里是:

据我所知,df.partitionBy(COL)将每个值写入所有行COL到他们自己的文件夹,并且每个文件夹将(假设行以前通过其他键分布在所有分区上)具有与之前在文件中大致相同的文件数.整张桌子.我发现这种行为很烦人.如果我有一个包含500个分区的大表,并且我partitionBy(COL)在一些属性列上使用,我现在有100个文件夹,每个文件夹包含500个(现在非常小)文件.

我想要的是partitionBy(COL)行为,但文件大小和文件数量大致相同.

作为演示,上一个问题共享一个玩具示例,其中有一个包含10个分区的表,partitionBy(dayOfWeek)并且现在有70个文件,因为每个文件夹中有10个.我想要~10个文件,每天一个,可能需要2或3天,有更多的数据.

这可以轻松完成吗?喜欢的东西,df.write().repartition(COL).partitionBy(COL)好像它可能工作,但我担心,(在一个非常大的表,该表将被划分为多个文件夹的情况下),其首先将它结合到一些小的分区数之前做的partitionBy(COL)似乎是一个坏主意.

任何建议都非常感谢!

apache-spark pyspark

22
推荐指数
1
解决办法
5787
查看次数

Spark:重新分区与partitionBy中的列参数顺序

考虑的方法(Spark 2.2.1):

  1. DataFrame.repartition(带partitionExprs: Column*参数的两个实现)
  2. DataFrameWriter.partitionBy

注意:这个问题不会问这些方法之间的区别

文档partitionBy:

如果指定,输出奠定了类似文件系统Hive分区方案.例如,当我们Dataset按年和月分区时,目录布局如下所示:

  • 年= 2016 /月= 01 /
  • 年= 2016 /月= 02 /

由此,我推断列参数顺序将决定目录布局; 因此它是相关的.

文档repartition:

返回Dataset由给定分区表达式分区的新分区,使用spark.sql.shuffle.partitions分区数.结果Dataset散列分区.

根据我目前的理解,repartition决定处理时的并行度DataFrame.有了这个定义,行为repartition(numPartitions: Int)很简单,但是对于参数的另外两个实现也是repartition如此partitionExprs: Column*.


所有事情都说,我的疑虑如下:

  • partitionBy方法一样,输入的顺序也 …

partitioning dataframe apache-spark apache-spark-sql

6
推荐指数
1
解决办法
4808
查看次数

SparkR DataFrame 分区问题

在我的 R 脚本中,我有一个SparkDataFrame包含四个不同月份数据的两列(时间、值)。由于我需要将我的函数分别应用到每个月,我想我会将repartition它分成四个分区,每个分区将保存一个月的数据。

我创建了一个名为 partition 的附加列,具有一个整数值 0 - 3,然后repartition通过此特定列调用该方法。

可悲的是,正如本主题中所描述的那样: Spark SQL - df.repartition 和 DataFrameWriter partitionBy 之间的区别?,使用该repartition方法我们只确定所有具有相同键的数据最终会在同一个分区中,但是具有不同键的数据也可以最终在同一个分区中。

就我而言,执行下面可见的代码会创建 4 个分区,但只用数据填充其中的 2 个。

我想我应该使用该partitionBy方法,但是在 SparkR 的情况下,我不知道该怎么做。官方文档指出,此方法适用于称为WindowSpec而不是DataFrame.

我真的很感激这方面的一些帮助,因为我不知道如何将此方法合并到我的代码中。

sparkR.session(
   master="local[*]",  sparkConfig = list(spark.sql.shuffle.partitions="4"))
df <- as.DataFrame(inputDat) # this is a dataframe with added partition column
repartitionedDf <- repartition(df, col = df$partition)

schema <- structType(
  structField("time", "timestamp"), 
  structField("value", "double"), 
  structField("partition", "string"))

processedDf <- dapply(repartitionedDf, 
  function(x) { data.frame(produceHourlyResults(x), …
Run Code Online (Sandbox Code Playgroud)

r apache-spark sparkr

6
推荐指数
1
解决办法
773
查看次数

以独占方式按键火花RDD分区

我想按键对RDD进行分区,并让每个分区仅包含单个键的值。例如,如果我有100个不同的键值和I repartition(102),则RDD应该有2个空分区和100个分区,每个分区包含一个键值。

我尝试过,groupByKey(k).repartition(102)但这不能保证每个分区中键的排他性,因为我看到有些分区包含一个键的更多值和两个以上的空值。

标准API中有没有办法做到这一点?

apache-spark rdd pyspark

3
推荐指数
1
解决办法
2574
查看次数

高效的pyspark加入

我已经阅读了很多有关如何在pyspark中进行有效联接的内容。我发现实现高效联接的方法基本上是:

  • 如果可以,请使用广播加入。(我通常不能,因为数据帧太大)
  • 考虑使用非常大的群集。(我宁愿不是因为$$$)。
  • 使用相同的分区程序

最后一个是我想尝试的,但是我找不到在pyspark中实现它的方法。我试过了:

df.repartition(numberOfPartitions,['parition_col1','partition_col2'])
Run Code Online (Sandbox Code Playgroud)

但这无济于事,直到我停止它仍需要花费很长时间,因为在最后的几项工作中卡住了火花。

因此,如何在pyspark中使用相同的分区程序并加快连接速度,甚至摆脱永远需要的时间?我需要使用哪个代码?

PD:即使在stackoverflow上,我也查看了其他文章,但是我仍然看不到代码。

apache-spark pyspark

3
推荐指数
1
解决办法
1419
查看次数