pyspark 中的数据类型验证

Ana*_*nan 2 pyspark pyspark-sql

我们正在 pyspark 中构建数据摄取框架,并想知道处理数据类型异常的最佳方法是什么。基本上,我们希望有一个拒绝表来捕获所有不符合模式的数据。

stringDf = sparkSession.createDataFrame(
    [
        ("11/25/1991","1"),
        ("11/24/1991", None),
        ("11/30/1991","a")
    ], 
    ['dateAsString','intAsString']
)
Run Code Online (Sandbox Code Playgroud)

这是我stringDf的两列。

+------------+-----------+
|dateAsString|intAsString|
+------------+-----------+
|  11/25/1991|          1|
|  11/24/1991|       null|
|  11/30/1991|          a|
+------------+-----------+
Run Code Online (Sandbox Code Playgroud)

我想为数据框创建一个新列,dataTypeValidationErrors用于捕获此数据集中可能存在的所有错误。使用 pyspark 实现这一目标的最佳方法是什么?

+------------+-----------+------------------------+
|dateAsString|intAsString|dataTypeValidationErrors|
+------------+-----------+------------------------+
|  11/25/1991|          1|None                    |
|  11/24/1991|       null|None                    |
|  11/30/1991|          a|Not a valid Number      |
+------------+-----------+------------------------+
Run Code Online (Sandbox Code Playgroud)

pau*_*ult 7

您可以尝试cast将列设置为所需的数据类型。如果有不匹配或错误,null将被返回。在这些情况下,您需要验证原始值不是null,如果不是,则存在错误。

  • 使用pyspark.sql.functions.when()如果铸造列测试null和原来的价值不null
  • 如果是True,则使用字符串文字"Not a valid Number"作为列值。否则返回字符串"None"

例如:

import pyspark.sql.functions as f

stringDf.withColumn(
        "dataTypeValidationErrors",
        f.when(
            f.col("intAsString").cast("int").isNull() & f.col("intAsString").isNotNull(),
            f.lit("Not a valid Number")
        ).otherwise(f.lit("None"))
    )\
    .show()
#+------------+-----------+------------------------+
#|dateAsString|intAsString|dataTypeValidationErrors|
#+------------+-----------+------------------------+
#|  11/25/1991|          1|                    None|
#|  11/24/1991|       null|                    None|
#|  11/30/1991|          a|      Not a valid Number|
#+------------+-----------+------------------------+
Run Code Online (Sandbox Code Playgroud)

您还可以将其扩展为多列:

假设您还有一行具有无效dateAsString值:

stringDf = spark.createDataFrame(
    [
        ("11/25/1991","1"),
        ("11/24/1991", None),
        ("11/30/1991","a"),
        ("13.14.15", "b")
    ], 
    ['dateAsString','intAsString']
)
Run Code Online (Sandbox Code Playgroud)

使用字典来定义每一列的转换:

conversions = {
    'dateAsString':lambda c: f.from_unixtime(f.unix_timestamp(c,"MM/dd/yyyy")).cast("date"),
    'intAsString':lambda c: f.col(c).cast('int')
}

stringDf.withColumn(
        "dataTypeValidationErrors",
        f.concat_ws(", ",
            *[
                f.when(
                    v(k).isNull() & f.col(k).isNotNull(),
                    f.lit(k + " not valid")
                ).otherwise(f.lit(None))
                for k, v in conversions.items()
            ]
        )
    )\
    .show(truncate=False)
#+------------+-----------+---------------------------------------------+
#|dateAsString|intAsString|dataTypeValidationErrors                     |
#+------------+-----------+---------------------------------------------+
#|11/25/1991  |1          |                                             |
#|11/24/1991  |null       |                                             |
#|11/30/1991  |a          |intAsString not valid                        |
#|13.14.15    |b          |dateAsString not valid, intAsString not valid|
#+------------+-----------+---------------------------------------------+
Run Code Online (Sandbox Code Playgroud)

或者,如果您只想知道某行是否有错误,而无需知道细节:

stringDf.withColumn(
        "dataTypeValidationErrors",
        f.when(
            reduce(
                lambda a, b: a|b,
                (v(k).isNull() & f.col(k).isNotNull() for k, v in conversions.items())
            ),
            f.lit("Validation Error")
        ).otherwise(f.lit("None"))     
    )\
    .show(truncate=False)
#+------------+-----------+------------------------+
#|dateAsString|intAsString|dataTypeValidationErrors|
#+------------+-----------+------------------------+
#|11/25/1991  |1          |None                    |
#|11/24/1991  |null       |None                    |
#|11/30/1991  |a          |Validation Error        |
#|13.14.15    |b          |Validation Error        |
#+------------+-----------+------------------------+
Run Code Online (Sandbox Code Playgroud)