My goal is to one-hot encode a list of categorical columns using a Spark DataFrame, similar to the get_dummies() function in Pandas.
The dataset bureau.csv was originally taken from the Kaggle competition Home Credit Default Risk. Here is a sample from my entry table, say entryData, filtered to only the rows where KEY = 100001.
import pyspark.sql.functions as F

# primary key
KEY = 'SK_ID_CURR'
data = spark.read.csv("bureau.csv", header=True, inferSchema=True)
# sample data from bureau.csv of 1716428 rows
# (columnList is defined further below via cols_type())
entryData = data.select(columnList).where(F.col(KEY) == 100001)
entryData.show()
+----------+-------------+---------------+---------------+
|SK_ID_CURR|CREDIT_ACTIVE|CREDIT_CURRENCY| CREDIT_TYPE|
+----------+-------------+---------------+---------------+
| 100001| Closed| currency 1|Consumer credit|
| 100001| Closed| currency 1|Consumer credit|
| 100001| Closed| currency 1|Consumer credit|
| 100001| Closed| currency 1|Consumer credit|
| 100001| Active| currency 1|Consumer credit|
| 100001| Active| currency 1|Consumer credit|
| 100001| Active| currency 1|Consumer credit|
+----------+-------------+---------------+---------------+
I would like to one-hot encode the list columnList by building a function catg_encode(entryData, columnList).
Note that cols_type() is a function that returns a list of columns: the categorical ones if obj=True, or the numeric ones if obj=False.
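For reference, a minimal sketch of what cols_type() presumably does, assuming categorical simply means string-typed in the inferred schema:

def cols_type(df, obj=True):
    # assumption: categorical columns are the string-typed ones in df.dtypes
    if obj:
        return [c for c, t in df.dtypes if t == 'string']
    return [c for c, t in df.dtypes if t != 'string']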
I have successfully one-hot encoded the first column, 'CREDIT_ACTIVE', but I could not do it for the whole column list at once, that is, build the function catg_encode.
columnList = cols_type(entryData, obj=True)[1:]
print(columnList)
['CREDIT_ACTIVE', 'CREDIT_CURRENCY', 'CREDIT_TYPE']
The output for the first column, 'CREDIT_ACTIVE', looks like this:
+----------+--------+----+------+------+
|SK_ID_CURR|Bad debt|Sold|Active|Closed|
+----------+--------+----+------+------+
| 100001| 0| 0| 0| 1|
| 100001| 0| 0| 0| 1|
| 100001| 0| 0| 0| 1|
| 100001| 0| 0| 0| 1|
| 100001| 0| 0| 1| 0|
| 100001| 0| 0| 1| 0|
| 100001| 0| 0| 1| 0|
+----------+--------+----+------+------+
Here the feature 'CREDIT_ACTIVE' has 4 distinct categories: ['Bad debt', 'Sold', 'Active', 'Closed'].
Note that I have even tried IndexToString and OneHotEncoderEstimator, but they did not help with this particular task.
I expect to get the following output,
+----------+--------+----+------+------+----------+----------+----------+----------+----------+---
|SK_ID_CURR|Bad debt|Sold|Active|Closed|currency 1|currency 2|currency 3|currency 4|..........|...
+----------+--------+----+------+------+----------+----------+----------+----------+----------+---
| 100001| 0| 0| 0| 1| 1| 0| 0| 0| ..|
| 100001| 0| 0| 0| 1| 1| 0| 0| 0| ..|
| 100001| 0| 0| 0| 1| 1| 0| 0| 0| ..|
| 100001| 0| 0| 0| 1| 1| 0| 0| 0| ..|
| 100001| 0| 0| 1| 0| 1| 0| 0| 0| ..|
| 100001| 0| 0| 1| 0| 1| 0| 0| 0| ..|
| 100001| 0| 0| 1| 0| 1| 0| 0| 0| ..|
+----------+--------+----+------+------+----------+----------+----------+----------+----------+---
where the trailing dots ... stand for the remaining categories of the feature 'CREDIT_TYPE', which are
['Loan for the purchase of equipment', 'Cash loan (non-earmarked)', 'Microloan', 'Consumer credit', 'Mobile operator loan', 'Another type of loan', 'Mortgage', 'Interbank credit', 'Loan for working capital replenishment', 'Car loan', 'Real estate loan', 'Unknown type of loan', 'Loan for business development', 'Credit card', 'Loan for purchase of shares (margin lending)'].
Note: I have seen the post E-num / get Dummies in pyspark, but it does not automate the process for many columns, which is the big-data case. That post provides a solution in which separate code is written for each categorical feature, which is not my problem.
There are two ways to squeeze juice out of this particular lemon. Let's take a look at them.
import pyspark.sql.functions as f
df1 = spark.sparkContext.parallelize([
[100001, 'Closed', 'currency 1', 'Consumer credit'],
[100001, 'Closed', 'currency 1', 'Consumer credit'],
[100001, 'Closed', 'currency 1', 'Consumer credit'],
[100001, 'Closed', 'currency 1', 'Consumer credit'],
[100001, 'Active', 'currency 1', 'Consumer credit'],
[100001, 'Active', 'currency 1', 'Consumer credit'],
[100001, 'Active', 'currency 1', 'Consumer credit'],
[100002, 'Active', 'currency 2', 'Consumer credit'],
]).toDF(['SK_ID_CURR', 'CREDIT_ACTIVE', 'CREDIT_CURRENCY', 'CREDIT_TYPE'])
# this can be done dynamically, but I don't have all categories
categories = ['Active', 'Closed', 'Bad debt', 'Sold']
# we need to pivot without aggregation, so I need to add an `id` column and group by it as well
credit_groups = (
df1.withColumn('id', f.monotonically_increasing_id())
.groupBy('SK_ID_CURR', 'id')
.pivot('CREDIT_ACTIVE', values=categories)
.agg(f.lit(1))
.drop('id')
)
# currency groups are just a 1 for each currency and ID, as per the example data
# if this is not the case, something more clever needs to be here
currency_groups = df1.groupBy('SK_ID_CURR').pivot('CREDIT_CURRENCY').agg(f.lit(1))
# join the two pivoted tables on the ID and fill nulls to zeroes
credit_groups.join(currency_groups, on=['SK_ID_CURR'], how='inner').na.fill(0).show()
+----------+------+------+--------+----+----------+----------+
|SK_ID_CURR|Active|Closed|Bad debt|Sold|currency 1|currency 2|
+----------+------+------+--------+----+----------+----------+
| 100002| 1| 0| 0| 0| 0| 1|
| 100001| 0| 1| 0| 0| 1| 0|
| 100001| 1| 0| 0| 0| 1| 0|
| 100001| 1| 0| 0| 0| 1| 0|
| 100001| 0| 1| 0| 0| 1| 0|
| 100001| 0| 1| 0| 0| 1| 0|
| 100001| 1| 0| 0| 0| 1| 0|
| 100001| 0| 1| 0| 0| 1| 0|
+----------+------+------+--------+----+----------+----------+
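As the comment in the code above says, the hard-coded categories list can be built dynamically by collecting the distinct values of each column. A minimal sketch of a catg_encode function along route 1, assuming every column in columnList has few enough distinct values that one collect per column is acceptable:

import pyspark.sql.functions as f

def catg_encode(df, columnList):
    # add a row id so pivoting does not collapse duplicate rows;
    # cache it so the generated ids stay stable across the self-joins
    result = df.withColumn('id', f.monotonically_increasing_id()).cache()
    for column in columnList:
        # collect the distinct categories of this column (one Spark job each)
        categories = [r[0] for r in df.select(column).distinct().collect()]
        dummies = (
            result.groupBy('SK_ID_CURR', 'id')
                  .pivot(column, values=categories)
                  .agg(f.lit(1))
                  .drop('SK_ID_CURR')
        )
        # assumes category values do not collide across the encoded columns
        result = result.join(dummies, on='id', how='inner')
    return result.drop('id').na.fill(0)

catg_encode(df1, ['CREDIT_ACTIVE', 'CREDIT_CURRENCY', 'CREDIT_TYPE']).show()

Joining on the generated id keeps duplicate rows apart, which a plain groupBy on SK_ID_CURR alone would collapse.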
The second route is to use StringIndexer followed by OneHotEncoderEstimator:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer
indexers = [StringIndexer(inputCol=column, outputCol=column+"_NUMERIC").fit(df1) for column in ['CREDIT_ACTIVE', 'CREDIT_CURRENCY']]
pipeline = Pipeline(stages=indexers)
df_indexed = pipeline.fit(df1).transform(df1)
df_indexed.show()
+----------+-------------+---------------+---------------+---------------------+-----------------------+
|SK_ID_CURR|CREDIT_ACTIVE|CREDIT_CURRENCY| CREDIT_TYPE|CREDIT_ACTIVE_NUMERIC|CREDIT_CURRENCY_NUMERIC|
+----------+-------------+---------------+---------------+---------------------+-----------------------+
| 100001| Closed| currency 1|Consumer credit| 0.0| 0.0|
| 100001| Closed| currency 1|Consumer credit| 0.0| 0.0|
| 100001| Closed| currency 1|Consumer credit| 0.0| 0.0|
| 100001| Closed| currency 1|Consumer credit| 0.0| 0.0|
| 100001| Active| currency 1|Consumer credit| 1.0| 0.0|
| 100001| Active| currency 1|Consumer credit| 1.0| 0.0|
| 100001| Active| currency 1|Consumer credit| 1.0| 0.0|
| 100002| Active| currency 2|Consumer credit| 1.0| 1.0|
+----------+-------------+---------------+---------------+---------------------+-----------------------+
From here on you can apply one-hot encoding to the newly created numeric columns. I personally recommend route 1, as it is more readable. Route 2, however, lets you also chain the OneHotEncoderEstimator into the declared Pipeline, making the code executable in a single line after the declaration, as sketched below. Hope this helps.
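For completeness, a sketch of that chained variant, assuming Spark 2.3/2.4 (OneHotEncoderEstimator was renamed to OneHotEncoder in Spark 3.x). Note that it produces sparse vector columns rather than the separate 0/1 columns of route 1:

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer

catg_cols = ['CREDIT_ACTIVE', 'CREDIT_CURRENCY', 'CREDIT_TYPE']
indexers = [StringIndexer(inputCol=c, outputCol=c + '_NUMERIC') for c in catg_cols]
# the estimator consumes all indexed columns at once and emits vector columns
encoder = OneHotEncoderEstimator(
    inputCols=[c + '_NUMERIC' for c in catg_cols],
    outputCols=[c + '_ONEHOT' for c in catg_cols])
pipeline = Pipeline(stages=indexers + [encoder])
df_encoded = pipeline.fit(df1).transform(df1)
df_encoded.show()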