pyspark中的异常值检测

RSK*_*RSK 6 python-3.x apache-spark pyspark

我有一个 pyspark 数据框,如下所示。

+---+-------+--------+
|age|balance|duration|
+---+-------+--------+
|  2|   2143|     261|
| 44|     29|     151|
| 33|      2|      76|
| 50|   1506|      92|
| 33|      1|     198|
| 35|    231|     139|
| 28|    447|     217|
|  2|      2|     380|
| 58|    121|      50|
| 43|    693|      55|
| 41|    270|     222|
| 50|    390|     137|
| 53|      6|     517|
| 58|     71|      71|
| 57|    162|     174|
| 40|    229|     353|
| 45|     13|      98|
| 57|     52|      38|
|  3|      0|     219|
|  4|      0|      54|
+---+-------+--------+
Run Code Online (Sandbox Code Playgroud)

我的预期输出应该是这样的,

+---+-------+--------+-------+-----------+------------+
|age|balance|duration|age_out|balance_out|duration_out|
+---+-------+--------+-------+-----------+------------+
|  2|   2143|     261|      1|          1|           0|
| 44|     29|     151|      0|          0|           0|
| 33|      2|      76|      0|          0|           0|
| 50|   1506|      92|      0|          1|           0|
| 33|      1|     198|      0|          0|           0|
| 35|    231|     139|      0|          0|           0|
| 28|    447|     217|      0|          0|           0|
|  2|      2|     380|      1|          0|           0|
| 58|    121|      50|      0|          0|           0|
| 43|    693|      55|      0|          0|           0|
| 41|    270|     222|      0|          0|           0|
| 50|    390|     137|      0|          0|           0|
| 53|      6|     517|      0|          0|           1|
| 58|     71|      71|      0|          0|           0|
| 57|    162|     174|      0|          0|           0|
| 40|    229|     353|      0|          0|           0|
| 45|     13|      98|      0|          0|           0|
| 57|     52|      38|      0|          0|           0|
|  3|      0|     219|      1|          0|           0|
|  4|      0|      54|      0|          0|           0|
+---+-------+--------+-------+-----------+------------+
Run Code Online (Sandbox Code Playgroud)

在这里,我的目标是通过使用四分位间距方法来识别数据集中的异常记录,如下面的 Python 代码中所述。如果我们发现任何异常记录,那么我们需要将它们标记为 1,否则标记为 0。

我可以通过使用下面的代码使用 python 做同样的事情。

import numpy as np
def outliers_iqr(ys):
    quartile_1, quartile_3 = np.percentile(ys, [25, 75])
    iqr = quartile_3 - quartile_1
    lower_bound = quartile_1 - (iqr * 1.5)
    upper_bound = quartile_3 + (iqr * 1.5)
    ser = np.zeros(len(ys))
    pos =np.where((ys > upper_bound) | (ys < lower_bound))[0]
    ser[pos]=1
    return(ser)
Run Code Online (Sandbox Code Playgroud)

但我想在 pyspark 中做同样的事情。有人可以帮助我吗?

我的pyspark代码:

def outliers_iqr(ys):
    quartile_1, quartile_3 = np.percentile(ys, [25, 75])
    iqr = quartile_3 - quartile_1
    lower_bound = quartile_1 - (iqr * 1.5)
    upper_bound = quartile_3 + (iqr * 1.5)
    ser = np.zeros(len(ys))
    pos =np.where((ys > upper_bound) | (ys < lower_bound))[0]
    ser[pos]=1
    return(float(ser))

outliers_iqr_udf = udf(outliers_iqr, FloatType())
DF.withColumn('age_out', outliers_iqr_udf(DF.select('age').collect())).show()
Run Code Online (Sandbox Code Playgroud)

pau*_*ult 10

您可以pyspark.sql.DataFrame.approxQuantile在循环内部使用以获得每个列所需的第 25 个和第 75 个百分位值。

bounds = {
    c: dict(
        zip(["q1", "q3"], df.approxQuantile(c, [0.25, 0.75], 0))
    )
    for c in df.columns
}
Run Code Online (Sandbox Code Playgroud)

传递的最后一个参数是相对错误,您可以在链接的帖子和docs 上阅读。简而言之,数字越小,您的结果就越准确,但在准确性和计算成本之间需要权衡。(这里我使用 0 来获取确切值,但您可能希望根据数据大小选择不同的值。)

一旦你有了第一个和第三个四分位数的值,你就可以iqr很容易地计算上界/下界:

for c in bounds:
    iqr = bounds[c]['q3'] - bounds[c]['q1']
    bounds[c]['lower'] = bounds[c]['q1'] - (iqr * 1.5)
    bounds[c]['upper'] = bounds[c]['q3'] + (iqr * 1.5)
print(bounds)
#{'age': {'lower': 3.0, 'q1': 33.0, 'q3': 53.0, 'upper': 83.0},
# 'balance': {'lower': -570.0, 'q1': 6.0, 'q3': 390.0, 'upper': 966.0},
# 'duration': {'lower': -143.0, 'q1': 76.0, 'q3': 222.0, 'upper': 441.0}}
Run Code Online (Sandbox Code Playgroud)

现在pyspark.sql.functions.when在列表理解中使用以基于以下内容构建离群值列bounds

import pyspark.sql.functions as f
df.select(
    "*",
    *[
        f.when(
            f.col(c).between(bounds[c]['lower'], bounds[c]['upper']),
            0
        ).otherwise(1).alias(c+"_out") 
        for c in df.columns
    ]
).show()
#+---+-------+--------+-------+-----------+------------+
#|age|balance|duration|age_out|balance_out|duration_out|
#+---+-------+--------+-------+-----------+------------+
#|  2|   2143|     261|      1|          1|           0|
#| 44|     29|     151|      0|          0|           0|
#| 33|      2|      76|      0|          0|           0|
#| 50|   1506|      92|      0|          1|           0|
#| 33|      1|     198|      0|          0|           0|
#| 35|    231|     139|      0|          0|           0|
#| 28|    447|     217|      0|          0|           0|
#|  2|      2|     380|      1|          0|           0|
#| 58|    121|      50|      0|          0|           0|
#| 43|    693|      55|      0|          0|           0|
#| 41|    270|     222|      0|          0|           0|
#| 50|    390|     137|      0|          0|           0|
#| 53|      6|     517|      0|          0|           1|
#| 58|     71|      71|      0|          0|           0|
#| 57|    162|     174|      0|          0|           0|
#| 40|    229|     353|      0|          0|           0|
#| 45|     13|      98|      0|          0|           0|
#| 57|     52|      38|      0|          0|           0|
#|  3|      0|     219|      0|          0|           0|
#|  4|      0|      54|      0|          0|           0|
#+---+-------+--------+-------+-----------+------------+
Run Code Online (Sandbox Code Playgroud)

在这里我曾经between检查一个值是否不是异常值,并且这个函数是包含的(即x between a and b逻辑上等价于x >= a and x <= b)。