ver*_*nik 5 python apache-spark pyspark pyspark-sql
在PySpark中,重新分区模块具有一个可选的column参数,该参数当然将通过该键对您的数据框进行重新分区。
我的问题是-没有钥匙时,Spark如何重新分区?我无法进一步深入研究源代码,以找到通过Spark本身进行处理的地方。
def repartition(self, numPartitions, *cols):
"""
Returns a new :class:`DataFrame` partitioned by the given partitioning expressions. The
resulting DataFrame is hash partitioned.
:param numPartitions:
can be an int to specify the target number of partitions or a Column.
If it is a Column, it will be used as the first partitioning column. If not specified,
the default number of partitions is used.
.. versionchanged:: 1.6
Added optional arguments to specify the partitioning columns. Also made numPartitions
optional if partitioning columns are specified.
>>> df.repartition(10).rdd.getNumPartitions()
10
>>> data = df.union(df).repartition("age")
>>> data.show()
+---+-----+
|age| name|
+---+-----+
| 5| Bob|
| 5| Bob|
| 2|Alice|
| 2|Alice|
+---+-----+
>>> data = data.repartition(7, "age")
>>> data.show()
+---+-----+
|age| name|
+---+-----+
| 2|Alice|
| 5| Bob|
| 2|Alice|
| 5| Bob|
+---+-----+
>>> data.rdd.getNumPartitions()
7
"""
if isinstance(numPartitions, int):
if len(cols) == 0:
return DataFrame(self._jdf.repartition(numPartitions), self.sql_ctx)
else:
return DataFrame(
self._jdf.repartition(numPartitions, self._jcols(*cols)), self.sql_ctx)
elif isinstance(numPartitions, (basestring, Column)):
cols = (numPartitions, ) + cols
return DataFrame(self._jdf.repartition(self._jcols(*cols)), self.sql_ctx)
else:
raise TypeError("numPartitions should be an int or Column")
Run Code Online (Sandbox Code Playgroud)
例如:调用这些行完全可以,但是我不知道它实际上在做什么。它是整个行的哈希吗?也许数据框中的第一列?
df_2 = df_1\
.where(sf.col('some_column') == 1)\
.repartition(32)\
.alias('df_2')
Run Code Online (Sandbox Code Playgroud)
默认情况下,如果没有指定分区器,则分区不是基于数据的特征,而是以随机和统一的方式跨节点分布。
背后的重分区算法 df.repartition进行完整的数据洗牌,并在分区之间平均分配数据。为了减少洗牌,最好使用df.coalesce
这是如何使用https://medium.com/@mrpowers/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4重新分区的一些很好的解释DataFrame
| 归档时间: |
|
| 查看次数: |
854 次 |
| 最近记录: |