Random Forest regression with categorical inputs in PySpark

kas*_*asa · 5 · tags: string, machine-learning, pyspark, one-hot-encoding

I have been trying to build a simple random forest regression model in PySpark. I have a decent background in machine learning in R, but ML in PySpark looks quite different to me, especially when it comes to handling categorical variables, string indexing, and one-hot encoding (when there are only numeric variables, I was able to run RF regression just by following examples). While there are many examples available for handling categorical variables, such as this and this, I have had no success with them, since most of them went over my head (probably because of my unfamiliarity with Python ML). I would really appreciate anyone who can help fix this.

Here is my attempt (the input file is here):

from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import Row
from pyspark.sql.functions import col, round
# read the input CSV, letting Spark infer the column types
train = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('filename.csv')
train.cache()
train.dtypes

The output is:

DataFrame[ID: int, Country: string, Carrier: double, TrafficType: string, ClickDate: timestamp, Device: string, Browser: string, OS: string, RefererUrl: string, UserIp: string, ConversionStatus: string, ConversionDate: string, ConversionPayOut: string, publisherId: string, subPublisherId: string, advertiserCampaignId: double, Fraud: double]

Next, I select the variables I am interested in:

IMP = ["Country","Carrier","TrafficType","Device","Browser","OS","Fraud","ConversionPayOut"]
train = train.fillna("XXX")  # replace missing values with a placeholder string
train = train.select([column for column in train.columns if column in IMP])
from pyspark.sql.types import DoubleType
# cast the dependent variable from string to double
train = train.withColumn("ConversionPayOut", train["ConversionPayOut"].cast(DoubleType()))
train.cache()

The output is:

DataFrame[Country: string, Carrier: double, TrafficType: string, Device: string, Browser: string, OS: string, ConversionPayOut: double, Fraud: double]

My dependent variable is ConversionPayOut, which was a string and has now been cast to double.

This is where my confusion begins: based on this post, I understand that I have to convert my categorical string-type variables into one-hot encoded vectors. Here is my attempt at that:

First, the StringIndexing:


from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
# index every categorical (string) column, skipping the numeric ones
indexers = [StringIndexer(inputCol=column, outputCol=column + "_index").fit(train)
            for column in list(set(train.columns) - set(['Carrier', 'ConversionPayOut', 'Fraud']))]
pipeline = Pipeline(stages=indexers)
train_catind = pipeline.fit(train).transform(train)
train_catind.show()

The output of the StringIndexing:


+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+
|Country|Carrier|TrafficType| Device|       Browser|     OS|  ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index|
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+
|     TH|   20.0|          A|   Lava|        chrome|Android|              41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         7.0|
|     BR|  217.0|          A|     LG|        chrome|Android|        26.2680574|  0.0|              0.0|          2.0|          0.0|     0.0|         5.0|
|     TH|   20.0|          A|Generic|        chrome|Android|              41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         0.0|
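(By default, StringIndexer assigns indices by label frequency, so the most frequent label in each column gets index 0.0.)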


Next, I think, I have to do the OneHotEncoding of the string indexes:

from pyspark.ml.feature import OneHotEncoder, StringIndexer
# one-hot encode every *_index column produced by the StringIndexers
indexers_ON = [OneHotEncoder(inputCol=column, outputCol=column + "_Vec")
               for column in filter(lambda x: x.endswith('_index'), train_catind.columns)]
pipeline = Pipeline(stages=indexers_ON)
train_OHE = pipeline.fit(train_catind).transform(train_catind)
train_OHE.show()

The one-hot encoded output looks like this:


+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+
|Country|Carrier|TrafficType| Device|       Browser|     OS|  ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index|TrafficType_index_Vec|Country_index_Vec|Browser_index_Vec| OS_index_Vec|Device_index_Vec|
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+
|     TH|   20.0|          A|   Lava|        chrome|Android|              41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         7.0|        (1,[0],[1.0])|    (9,[1],[1.0])|    (5,[0],[1.0])|(1,[0],[1.0])|  (15,[7],[1.0])|
|     BR|  217.0|          A|     LG|        chrome|Android|        26.2680574|  0.0|              0.0|          2.0|          0.0|     0.0|         5.0|        (1,[0],[1.0])|    (9,[2],[1.0])|    (5,[0],[1.0])|(1,[0],[1.0])|  (15,[5],[1.0])|
|     TH|   20.0|          A|Generic|        chrome|Android|              41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         0.0|        (1,[0],[1.0])|    (9,[1],[1.0])|    (5,[0],[1.0])|(1,[0],[1.0])|  (15,[0],[1.0])|
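(Each *_Vec column holds a SparseVector: for example, (9,[1],[1.0]) is a vector of length 9 with a 1.0 at index 1 and zeros everywhere else.)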

I have no clue how to proceed from here. In fact, I am also clueless about which Spark machine learning packages require this kind of one-hot encoding and which do not.

It would be really great for all newcomers to PySpark if the StackOverflow community could clarify how to move forward from here.

Pre*_*rem · 1

To run a random forest on the preprocessed data, you can continue with the code below.

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor

# use VectorAssembler to combine all the feature columns into a single vector column
assemblerInputs = ["Carrier", "Fraud", "Country_index_Vec", "TrafficType_index_Vec", "Device_index_Vec", "Browser_index_Vec", "OS_index_Vec"]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
pipeline = Pipeline(stages=[assembler])
df = pipeline.fit(train_OHE).transform(train_OHE)
df = df.withColumn("label", df["ConversionPayOut"])

# randomly split data into training and test datasets
(train_data, test_data) = df.randomSplit([0.7, 0.3], seed=111)

# train a RandomForest model; the label (ConversionPayOut) is continuous,
# so use a regressor rather than a classifier
rf = RandomForestRegressor(labelCol="label", featuresCol="features")
rf_model = rf.fit(train_data)

# make predictions on the test data
predictions = rf_model.transform(test_data)
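To sanity-check the fitted model, a RegressionEvaluator can score the predictions. A minimal sketch, assuming the predictions DataFrame from above (the "rmse" metric is one of several choices):

from pyspark.ml.evaluation import RegressionEvaluator

# compute the root-mean-squared error of the predictions on the test set
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE on test data = %g" % rmse)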


Hope this helps!
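As to the question of which estimators require one-hot encoding: tree-based models like random forests can consume the StringIndexer outputs directly, because StringIndexer attaches nominal-attribute metadata that VectorAssembler propagates, letting the trees treat those columns as categorical. One-hot encoding matters mainly for linear models, which would otherwise read the indices as ordered numbers. Below is a minimal sketch of the OHE-free variant, assuming the train DataFrame from the question; maxBins=64 is a hypothetical value that must be at least the number of distinct values in the largest categorical column.

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor

# index the categorical columns but skip the one-hot encoding step
cat_cols = ["Country", "TrafficType", "Device", "Browser", "OS"]
indexers = [StringIndexer(inputCol=c, outputCol=c + "_index") for c in cat_cols]
assembler = VectorAssembler(
    inputCols=["Carrier", "Fraud"] + [c + "_index" for c in cat_cols],
    outputCol="features")
# maxBins must cover the cardinality of the widest categorical column
rf = RandomForestRegressor(labelCol="ConversionPayOut", featuresCol="features", maxBins=64)
model = Pipeline(stages=indexers + [assembler, rf]).fit(train)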