使用自定义分区器在 Pyspark 中对数据帧进行分区

vik*_*ana 5 apache-spark-sql pyspark

寻找有关在 Pyspark 中使用自定义分区器的一些信息。我有一个包含各个国家/地区的国家/地区数据的数据框。因此,如果我对国家/地区列进行重新分区,它会将我的数据分布到 n 个分区中,并将类似的国家/地区数据保留到特定分区。当我看到 usingglom()方法时,这是在创建偏斜分区数据。

美国和中国等一些国家/地区在特定数据帧中拥有大量数据。我想重新分区我的数据帧,如果国家是美国和中国,那么它将进一步分成大约 10 个分区,其他国家的分区保持不变,如 IND、THA、AUS 等。我们可以在 Pyspark 代码中扩展分区器类吗?

我在下面的链接中读到了这个,我们可以在 scala Spark 应用程序中扩展 scala partitioner 类,并且可以修改 partitioner 类以使用自定义逻辑根据需求重新分区我们的数据。就像我所拥有的.. 请帮助在 Pyspark 中实现此解决方案.. 请参阅下面的链接按列分区但保持固定分区数的有效方法是什么?


我使用的是 Spark 版本 2.3.0.2,以下是我的 Dataframe 结构:

datadf= spark.sql("""
    SELECT    
        ID_NUMBER ,SENDER_NAME ,SENDER_ADDRESS ,REGION_CODE ,COUNTRY_CODE
    from udb.sometable
""");
Run Code Online (Sandbox Code Playgroud)

输入数据有六个国家,如数据AUSINDTHARUSCHNUSACHN并且USA有偏斜数据。

所以,如果我做repartitionCOUNTRY_CODE,两个分区中含有大量的数据,而其他人都很好。我使用glom()方法检查了这个。

newdf = datadf.repartition("COUNTRY_CODE")

from pyspark.sql import SparkSession
from pyspark.sql import  HiveContext, DataFrameWriter, DataFrame

newDF = datadf.repartitionByRange(3,"COUNTRY_CODE","USA")
Run Code Online (Sandbox Code Playgroud)

我正在尝试将我的数据重新分区为国家/地区的USA另外3 个分区,CHN并且希望将其他国家/地区的数据保留在单个分区中。

This is what I am expecting 
AUS- one partition
IND- one partition
THA- one partition
RUS- one partition
CHN- three partition
USA- three partition
Run Code Online (Sandbox Code Playgroud)

回溯(最近一次调用):文件“”,第 1 行,在文件“/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py”中,第 1182 行,在getattr “'%s' 中对象没有属性“%s””%(自我。)AttributeError的,名):‘据帧’对象有没有属性‘repartitionByRange’

yth*_*mar 8

结构化 API 中没有自定义分区器,因此为了使用自定义分区器,您需要下拉到 RDD API。简单3步如下:

  1. 将结构化 API 转换为 RDD API
dataRDD = dataDF.rdd
Run Code Online (Sandbox Code Playgroud)
  1. 在 RDD API 中应用自定义分区器
import random

# Extract key from Row object
dataRDD = dataRDD.map(lambda r: (r[0], r))

def partitioner(key):
    if key == "CHN":
        return random.randint(1, 10)
    elif key == "USA":
        return random.randint(11, 20)
    else:
        # distinctCountryDict is a dict mapping distinct countries to distinct integers
        # these distinct integers should not overlap with range(1, 20)
        return distinctCountryDict[key]

numPartitions = 100
dataRDD = dataRDD.partitionBy(numPartitions, partitioner)

# Remove key extracted previously
dataRDD = dataRDD.map(lambda r: r[1])
Run Code Online (Sandbox Code Playgroud)
  1. 将 RDD API 转换回结构化 API
dataDF = dataRDD.toDF()
Run Code Online (Sandbox Code Playgroud)

通过这种方式,您可以两全其美:结构化 API 中的 Spark 类型和优化的物理计划,以及低级 RDD API 中的自定义分区器。只有在绝对必要时我们才会降低到低级 API。


the*_*tom 6

用散列尝试这样的事情:

newDf = oldDf.repartition(N, $"col1", $"coln")
Run Code Online (Sandbox Code Playgroud)

或测距方法:

newDF = oldDF.repartitionByRange(N, $"col1", $"coln")
Run Code Online (Sandbox Code Playgroud)

目前还没有用于 DF 的自定义分区。

在你的情况下,我会去散列,但没有保证。

但是,如果您的数据有偏差,您可能需要做一些额外的工作,例如使用 2 列进行分区是最简单的方法。

例如,现有的或新的列 - 在这种情况下,该列对给定的国家/地区应用分组,例如 1 .. N,以及两个列上的分区。

对于有很多分组的国家,你会得到 N 个合成子部门;对于基数较低的其他人,只有1个这样的组号。不是太难。两个分区都可以占用 1 个以上的列。

在我看来,统一数量的分区填充需要很多努力并且不是真正可以实现的,但是像这里这样的下一个最佳方法就足够了。在一定程度上相当于自定义分区。

否则,在 DF 上使用 .withColumn 您可以使用这些规则模拟自定义分区并填充新的 DF 列,然后应用 repartitionByRange。也没有那么难。

  • repartitionByrange 函数如何工作?我们可以在 Pyspark 中使用它来重新分区数据帧吗? (2认同)