PySpark DataFrame在使用爆炸之前将字符串的列更改为数组

Sar*_*ata 5 apache-spark-sql pyspark

我在spark DataFrame中有一列名为event_datajson格式,使用读取后from_json,得到以下模式:

root
 |-- user_id: string (nullable = true)
 |-- event_data: struct (nullable = true)
 |    |-- af_content_id: string (nullable = true)
 |    |-- af_currency: string (nullable = true)
 |    |-- af_order_id: long (nullable = true)
Run Code Online (Sandbox Code Playgroud)

我只需要af_content_id此列。此属性可以具有不同的格式:

  • 一个字符串
  • 整数
  • Int和Str的列表。例如['ghhjj23','123546',12356]
  • 无(有时event_data不包含af_content_id

    我想使用explode函数以便为af_content_id格式为List的每个元素返回新行。但是当我应用它时,我得到一个错误:

    from pyspark.sql.functions import explode
    
    def get_content_id(column):
        return column.af_content_id
    
    df_transf_1 = df_transf_1.withColumn(
        "products_basket", 
        get_content_id(df_transf_1.event_data)
    )
    
    df_transf_1 = df_transf_1.withColumn(
        "product_id",
        explode(df_transf_1.products_basket)
    )
    
    Run Code Online (Sandbox Code Playgroud)

    products_basket由于数据类型不匹配而无法解析'explode()':函数explode的输入应为数组或映射类型,而不是StringType;

    我知道原因,这是因为字段af_content_id可能包含不同的类型,但我不知道如何解决它。pyspark.sql.functions.array()直接在列上使用是行不通的,因为它变成了array数组并且爆炸不会产生预期的结果。

    重现我坚持的步骤的示例代码:

    import pandas as pd
    
    arr = [
        ['b5ad805c-f295-4852-82fc-961a88',12732936],
        ['0FD6955D-484C-4FC8-8C3F-DA7D28',['Gklb38','123655']],
        ['0E3D17EA-BEEF-4931-8104','12909841'],
        ['CC2877D0-A15C-4C0A-AD65-762A35C1',[12645715, 12909837, 12909837]]
    ]
    
    df = pd.DataFrame(arr, columns = ['user_id','products_basket'])
    
    df = df[['user_id','products_basket']].astype(str)
    df_transf_1 = spark.createDataFrame(df)
    
    Run Code Online (Sandbox Code Playgroud)

    我正在寻找一种将products_basket转换为唯一可能的格式的方法:一种数组,以便在我应用时explode,每行包含一个id。

  • pau*_*ult 5

    如果您从以下 DataFrame 开始:

    df_transf_1.show(truncate=False)
    #+--------------------------------+------------------------------+
    #|user_id                         |products_basket               |
    #+--------------------------------+------------------------------+
    #|b5ad805c-f295-4852-82fc-961a88  |12732936                      |
    #|0FD6955D-484C-4FC8-8C3F-DA7D28  |['Gklb38', '123655']          |
    #|0E3D17EA-BEEF-4931-8104         |12909841                      |
    #|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|
    #+--------------------------------+------------------------------+
    
    Run Code Online (Sandbox Code Playgroud)

    其中该products_basket列是StringType

    df.printSchema()
    #root
    # |-- user_id: string (nullable = true)
    # |-- products_basket: string (nullable = true)
    
    Run Code Online (Sandbox Code Playgroud)

    您无法调用,explode因为products_basket它不是数组或映射。

    一种解决方法是删除所有前导/尾随方括号,然后拆分字符串", "(逗号后跟空格)。这会将字符串转换为字符串数组。

    from pyspark.sql.functions import col, regexp_replace, split
    df_transf_new= df_transf_1.withColumn(
        "products_basket",
        split(regexp_replace(col("products_basket"), r"(^\[)|(\]$)|(')", ""), ", ")
    )
    
    df_transf_new.show(truncate=False)
    #+--------------------------------+------------------------------+
    #|user_id                         |products_basket               |
    #+--------------------------------+------------------------------+
    #|b5ad805c-f295-4852-82fc-961a88  |[12732936]                    |
    #|0FD6955D-484C-4FC8-8C3F-DA7D28  |[Gklb38, 123655]              |
    #|0E3D17EA-BEEF-4931-8104         |[12909841]                    |
    #|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|
    #+--------------------------------+------------------------------+
    
    Run Code Online (Sandbox Code Playgroud)

    正则表达式模式匹配以下任意一项:

    • (^\[):字符串开头的左方括号
    • (\]$):字符串末尾的右方括号
    • ('):任何单引号(因为你的字符串被引用)

    并将它们替换为空字符串。

    这假设您的数据在product_basket.

    之后split,新 DataFrame 的架构为:

    df_transf_new.printSchema()
    #root
    # |-- user_id: string (nullable = true)
    # |-- products_basket: array (nullable = true)
    # |    |-- element: string (containsNull = true)
    
    Run Code Online (Sandbox Code Playgroud)

    现在您可以致电explode

    from pyspark.sql.functions import explode
    df_transf_new.withColumn("product_id", explode("products_basket")).show(truncate=False)
    #+--------------------------------+------------------------------+----------+
    #|user_id                         |products_basket               |product_id|
    #+--------------------------------+------------------------------+----------+
    #|b5ad805c-f295-4852-82fc-961a88  |[12732936]                    |12732936  |
    #|0FD6955D-484C-4FC8-8C3F-DA7D28  |[Gklb38, 123655]              |Gklb38    |
    #|0FD6955D-484C-4FC8-8C3F-DA7D28  |[Gklb38, 123655]              |123655    |
    #|0E3D17EA-BEEF-4931-8104         |[12909841]                    |12909841  |
    #|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12645715  |
    #|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12909837  |
    #|CC2877D0-A15C-4C0A-AD65-762A35C1|[12645715, 12909837, 12909837]|12909837  |
    #+--------------------------------+------------------------------+----------+
    
    Run Code Online (Sandbox Code Playgroud)

    • 如果将 array() 应用于字符串,它将成为一个只有一个元素(字符串)的数组。`array()` 函数不知道应该使用逗号作为分隔符。如果调用 split,它会将字符串拆分为多个元素并返回一个数组。那有意义吗? (2认同)
    • 一个可能令人困惑的方面是 `array("products_basket", regexp_replace(r"(^\[)|(\]$)|(')", ""))` 和 `split("products_basket", regexp_replace(r)如果您调用 `show()`,"(^\[)|(\]$)|(')"、"")、" ,")` 都会以相同的方式*打印*到控制台,但是底层数据不同。后者是您想要的,而前者只是一个带有一个字符串的数组。 (2认同)
    • @SarahData也许那是因为当你只想在“”,“”(没有空格)上分割时,我在“”,“”上分割。如果没有看到您的实际数据,很难说,但似乎字符串没有被分割,因为模式不匹配。您还可以传入正则表达式模式进行分割,例如`",(\s+)?"`,这意味着逗号后跟可选的空格。 (2认同)