我有一个与pyspark repartitionBy()函数相关的问题,我最初在这个问题的评论中发布了这个问题.我被要求将其作为一个单独的问题发布,所以这里是:
据我所知,df.partitionBy(COL)将每个值写入所有行COL到他们自己的文件夹,并且每个文件夹将(假设行以前通过其他键分布在所有分区上)具有与之前在文件中大致相同的文件数.整张桌子.我发现这种行为很烦人.如果我有一个包含500个分区的大表,并且我partitionBy(COL)在一些属性列上使用,我现在有100个文件夹,每个文件夹包含500个(现在非常小)文件.
我想要的是partitionBy(COL)行为,但文件大小和文件数量大致相同.
作为演示,上一个问题共享一个玩具示例,其中有一个包含10个分区的表,partitionBy(dayOfWeek)并且现在有70个文件,因为每个文件夹中有10个.我想要~10个文件,每天一个,可能需要2或3天,有更多的数据.
这可以轻松完成吗?喜欢的东西,df.write().repartition(COL).partitionBy(COL)好像它可能工作,但我担心,(在一个非常大的表,该表将被划分为多个文件夹的情况下),其首先将它结合到一些小的分区数之前做的partitionBy(COL)似乎是一个坏主意.
任何建议都非常感谢!
考虑的方法(Spark 2.2.1):
DataFrame.repartition(带partitionExprs: Column*参数的两个实现)DataFrameWriter.partitionBy从文档的partitionBy:
如果指定,输出奠定了类似文件系统
Hive的分区方案.例如,当我们Dataset按年和月分区时,目录布局如下所示:
- 年= 2016 /月= 01 /
- 年= 2016 /月= 02 /
由此,我推断列参数的顺序将决定目录布局; 因此它是相关的.
从文档的repartition:
返回
Dataset由给定分区表达式分区的新分区,使用spark.sql.shuffle.partitions分区数.结果Dataset是散列分区.
根据我目前的理解,repartition决定处理时的并行度DataFrame.有了这个定义,行为repartition(numPartitions: Int)很简单,但是对于参数的另外两个实现也是repartition如此partitionExprs: Column*.
所有事情都说,我的疑虑如下:
partitionBy方法一样,列输入的顺序也 …在我的 R 脚本中,我有一个SparkDataFrame包含四个不同月份数据的两列(时间、值)。由于我需要将我的函数分别应用到每个月,我想我会将repartition它分成四个分区,每个分区将保存一个月的数据。
我创建了一个名为 partition 的附加列,具有一个整数值 0 - 3,然后repartition通过此特定列调用该方法。
可悲的是,正如本主题中所描述的那样:
Spark SQL - df.repartition 和 DataFrameWriter partitionBy 之间的区别?,使用该repartition方法我们只确定所有具有相同键的数据最终会在同一个分区中,但是具有不同键的数据也可以最终在同一个分区中。
就我而言,执行下面可见的代码会创建 4 个分区,但只用数据填充其中的 2 个。
我想我应该使用该partitionBy方法,但是在 SparkR 的情况下,我不知道该怎么做。官方文档指出,此方法适用于称为WindowSpec而不是DataFrame.
我真的很感激这方面的一些帮助,因为我不知道如何将此方法合并到我的代码中。
sparkR.session(
master="local[*]", sparkConfig = list(spark.sql.shuffle.partitions="4"))
df <- as.DataFrame(inputDat) # this is a dataframe with added partition column
repartitionedDf <- repartition(df, col = df$partition)
schema <- structType(
structField("time", "timestamp"),
structField("value", "double"),
structField("partition", "string"))
processedDf <- dapply(repartitionedDf,
function(x) { data.frame(produceHourlyResults(x), …Run Code Online (Sandbox Code Playgroud) 我想按键对RDD进行分区,并让每个分区仅包含单个键的值。例如,如果我有100个不同的键值和I repartition(102),则RDD应该有2个空分区和100个分区,每个分区包含一个键值。
我尝试过,groupByKey(k).repartition(102)但这不能保证每个分区中键的排他性,因为我看到有些分区包含一个键的更多值和两个以上的空值。
标准API中有没有办法做到这一点?
我已经阅读了很多有关如何在pyspark中进行有效联接的内容。我发现实现高效联接的方法基本上是:
最后一个是我想尝试的,但是我找不到在pyspark中实现它的方法。我试过了:
df.repartition(numberOfPartitions,['parition_col1','partition_col2'])
Run Code Online (Sandbox Code Playgroud)
但这无济于事,直到我停止它仍需要花费很长时间,因为在最后的几项工作中卡住了火花。
因此,如何在pyspark中使用相同的分区程序并加快连接速度,甚至摆脱永远需要的时间?我需要使用哪个代码?
PD:即使在stackoverflow上,我也查看了其他文章,但是我仍然看不到代码。