Ana*_*nan 2 pyspark pyspark-sql
我们正在 pyspark 中构建数据摄取框架,并想知道处理数据类型异常的最佳方法是什么。基本上,我们希望有一个拒绝表来捕获所有不符合模式的数据。
stringDf = sparkSession.createDataFrame(
[
("11/25/1991","1"),
("11/24/1991", None),
("11/30/1991","a")
],
['dateAsString','intAsString']
)
Run Code Online (Sandbox Code Playgroud)
这是我stringDf的两列。
+------------+-----------+
|dateAsString|intAsString|
+------------+-----------+
| 11/25/1991| 1|
| 11/24/1991| null|
| 11/30/1991| a|
+------------+-----------+
Run Code Online (Sandbox Code Playgroud)
我想为数据框创建一个新列,dataTypeValidationErrors用于捕获此数据集中可能存在的所有错误。使用 pyspark 实现这一目标的最佳方法是什么?
+------------+-----------+------------------------+
|dateAsString|intAsString|dataTypeValidationErrors|
+------------+-----------+------------------------+
| 11/25/1991| 1|None |
| 11/24/1991| null|None |
| 11/30/1991| a|Not a valid Number |
+------------+-----------+------------------------+
Run Code Online (Sandbox Code Playgroud)
您可以尝试cast将列设置为所需的数据类型。如果有不匹配或错误,null将被返回。在这些情况下,您需要验证原始值不是null,如果不是,则存在错误。
pyspark.sql.functions.when()如果铸造列测试null和原来的价值不null。True,则使用字符串文字"Not a valid Number"作为列值。否则返回字符串"None"。例如:
import pyspark.sql.functions as f
stringDf.withColumn(
"dataTypeValidationErrors",
f.when(
f.col("intAsString").cast("int").isNull() & f.col("intAsString").isNotNull(),
f.lit("Not a valid Number")
).otherwise(f.lit("None"))
)\
.show()
#+------------+-----------+------------------------+
#|dateAsString|intAsString|dataTypeValidationErrors|
#+------------+-----------+------------------------+
#| 11/25/1991| 1| None|
#| 11/24/1991| null| None|
#| 11/30/1991| a| Not a valid Number|
#+------------+-----------+------------------------+
Run Code Online (Sandbox Code Playgroud)
您还可以将其扩展为多列:
假设您还有一行具有无效dateAsString值:
stringDf = spark.createDataFrame(
[
("11/25/1991","1"),
("11/24/1991", None),
("11/30/1991","a"),
("13.14.15", "b")
],
['dateAsString','intAsString']
)
Run Code Online (Sandbox Code Playgroud)
使用字典来定义每一列的转换:
conversions = {
'dateAsString':lambda c: f.from_unixtime(f.unix_timestamp(c,"MM/dd/yyyy")).cast("date"),
'intAsString':lambda c: f.col(c).cast('int')
}
stringDf.withColumn(
"dataTypeValidationErrors",
f.concat_ws(", ",
*[
f.when(
v(k).isNull() & f.col(k).isNotNull(),
f.lit(k + " not valid")
).otherwise(f.lit(None))
for k, v in conversions.items()
]
)
)\
.show(truncate=False)
#+------------+-----------+---------------------------------------------+
#|dateAsString|intAsString|dataTypeValidationErrors |
#+------------+-----------+---------------------------------------------+
#|11/25/1991 |1 | |
#|11/24/1991 |null | |
#|11/30/1991 |a |intAsString not valid |
#|13.14.15 |b |dateAsString not valid, intAsString not valid|
#+------------+-----------+---------------------------------------------+
Run Code Online (Sandbox Code Playgroud)
或者,如果您只想知道某行是否有错误,而无需知道细节:
stringDf.withColumn(
"dataTypeValidationErrors",
f.when(
reduce(
lambda a, b: a|b,
(v(k).isNull() & f.col(k).isNotNull() for k, v in conversions.items())
),
f.lit("Validation Error")
).otherwise(f.lit("None"))
)\
.show(truncate=False)
#+------------+-----------+------------------------+
#|dateAsString|intAsString|dataTypeValidationErrors|
#+------------+-----------+------------------------+
#|11/25/1991 |1 |None |
#|11/24/1991 |null |None |
#|11/30/1991 |a |Validation Error |
#|13.14.15 |b |Validation Error |
#+------------+-----------+------------------------+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3122 次 |
| 最近记录: |