How can I print the decision path for a specific sample in a Spark DataFrame?
Spark Version: '2.3.1'
The code below prints the decision path of the entire model. How can I print the decision path for a specific sample instead, for example the row whose tagvalue equals 2?
import findspark
findspark.init()

import pandas as pd
import pyspark.sql.functions as F
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import monotonically_increasing_id, col, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.appName('demo')\
    .master('local[*]')\
    .getOrCreate()
data = pd.DataFrame({
    'ball': [0, 1, 2, 3],
    'keep': [4, 5, 6, 7],
    'hall': [8, 9, 10, 11],
    'fall': [12, 13, 14, 15],
    'mall': [16, 17, 18, 10],
    'label': [21, 31, 41, 51]
})
df = spark.createDataFrame(data)
df = df.withColumn("mono_ID", monotonically_increasing_id())
w = Window().orderBy("mono_ID")
df = df.select(row_number().over(w).alias("tagvalue"), col("*"))
assembler = VectorAssembler(
    inputCols=['ball', 'keep', 'hall', 'fall'], outputCol='features')
dtc = DecisionTreeClassifier(featuresCol='features', labelCol='label')
pipeline = Pipeline(stages=[assembler, dtc]).fit(df)
transformed_pipeline = pipeline.transform(df)
#ml_pipeline = pipeline.stages[1]
result = transformed_pipeline.filter(transformed_pipeline.tagvalue == 2)
result.select('tagvalue', 'prediction').show()
+--------+----------+
|tagvalue|prediction|
+--------+----------+
|       2|      31.0|
+--------+----------+
The above prints the prediction for tagvalue 2. What I want now is the decision path through the algorithm that leads to that answer for this particular row, not the path for the entire model.
I am aware of the following, but it prints the decision path of the whole model rather than of a specific sample.
ml_pipeline = pipeline.stages[1]
ml_pipeline.toDebugString
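For context, a minimal sketch (reusing the ml_pipeline object from the snippet above; numNodes and depth are standard properties of the fitted tree model) showing that these describe the whole fitted tree rather than one row's path:

# both of these summarize the entire fitted tree, not an individual sample
print(ml_pipeline.numNodes, ml_pipeline.depth)
print(ml_pipeline.toDebugString)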
What is the Spark equivalent of what exists in scikit-learn?
If you run the code below in scikit-learn, it prints the decision path for that specific sample; here is a snippet taken directly from the scikit-learn website.
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
from sklearn.tree import DecisionTreeClassifier
iris = load_iris()
X = iris.data
y = iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)
estimator = DecisionTreeClassifier(max_leaf_nodes=3, random_state=0)
estimator.fit(X_train, y_train)
n_nodes = estimator.tree_.node_count
children_left = estimator.tree_.children_left
children_right = estimator.tree_.children_right
feature = estimator.tree_.feature
threshold = estimator.tree_.threshold
# First let's retrieve the decision path of each sample. The decision_path
# method allows to retrieve the node indicator functions. A non zero element of
# indicator matrix at the position (i, j) indicates that the sample i goes
# through the node j.
node_indicator = estimator.decision_path(X_test)
# Similarly, we can also have the leaves ids reached by each sample.
leave_id = estimator.apply(X_test)
# Now, it's possible to get the tests that were used to predict a sample or
# a group of samples. First, let's make it for the sample.
sample_id = 0
node_index = node_indicator.indices[node_indicator.indptr[sample_id]:
                                    node_indicator.indptr[sample_id + 1]]

print('Rules used to predict sample %s: ' % sample_id)
for node_id in node_index:
    if leave_id[sample_id] != node_id:
        continue

    if (X_test[sample_id, feature[node_id]] <= threshold[node_id]):
        threshold_sign = "<="
    else:
        threshold_sign = ">"

    print("decision id node %s : (X_test[%s, %s] (= %s) %s %s)"
          % (node_id,
             sample_id,
             feature[node_id],
             X_test[sample_id, feature[node_id]],
             threshold_sign,
             threshold[node_id]))
The output will look like this:
Rules used to predict sample 0: decision id node 4 : (X_test[0, -2] (= 5.1) > -2.0)
I changed your dataframe slightly so that we can be sure to see different features in the explanations, and I changed the assembler to use a feature_list so that we can easily access the feature names by index later.
The follow-up changes are below:
#change1: ball goes from [0,1,2,3] ->[0,1,1,3] so we can see other features in explanations
#change2: added in multiple paths to the same prediction
#change3: added in a categorical variable
#change4: feature_list so we can re-use those indices easily later
# extra imports needed below for the categorical columns and for the numpy comparison in decision_path
import numpy as np
from pyspark.ml.feature import OneHotEncoder, StringIndexer

data = pd.DataFrame({
    'ball': [0, 1, 1, 3, 1, 0, 1, 3],
    'keep': [4, 5, 6, 7, 7, 4, 6, 7],
    'hall': [8, 9, 10, 11, 2, 6, 10, 11],
    'fall': [12, 13, 14, 15, 15, 12, 14, 15],
    'mall': [16, 17, 18, 10, 10, 16, 18, 10],
    'wall': ['a', 'a', 'a', 'a', 'a', 'a', 'c', 'e'],
    'label': [21, 31, 41, 51, 51, 51, 21, 31]
})
df = spark.createDataFrame(data)
df = df.withColumn("mono_ID", monotonically_increasing_id())
w = Window().orderBy("mono_ID")
df = df.select(row_number().over(w).alias("tagvalue"), col("*"))

indexer = StringIndexer(inputCol='wall', outputCol='wallIndex')
encoder = OneHotEncoder(inputCol='wallIndex', outputCol='wallVec')

#i added this line so feature replacement later is easy because of the indices
features = ['ball', 'keep', 'wallVec', 'hall', 'fall']
assembler = VectorAssembler(
    inputCols=features, outputCol='features')
dtc = DecisionTreeClassifier(featuresCol='features', labelCol='label')
pipeline = Pipeline(stages=[indexer, encoder, assembler, dtc]).fit(df)
transformed_pipeline = pipeline.transform(df)
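As a quick check, here is a minimal sketch (assuming the modified pipeline above has been fit) that shows the assembled feature vector and prediction for the row whose path we will trace:

# inspect the assembled feature vector and prediction for the row with tagvalue 2
transformed_pipeline.where("tagvalue = 2") \
    .select("tagvalue", "features", "prediction") \
    .show(truncate=False)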
Below is the approach I found to be able to work with the decision tree itself:
#get the pipeline back out, as you've done earlier, this changed to [3] because of the categorical encoders
ml_pipeline = pipeline.stages[3]
#saves the model so we can get at the internals that the scala code keeps private
ml_pipeline.save("mymodel_test")
#read back in the model parameters
modeldf = spark.read.parquet("mymodel_test/data/*")
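# optional sanity check (a sketch, not needed by the rest of the code): the parquet that
# Spark wrote out contains one row per tree node; the columns used below are id, prediction,
# leftChild, rightChild and a 'split' struct holding featureIndex,
# leftCategoriesOrThreshold and numCategories
modeldf.printSchema()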
import networkx as nx
#select only the columns that we NEED and collect into a list
noderows = modeldf.select("id","prediction","leftChild","rightChild","split").collect()
#create a graph for the decision tree; you Could use a simpler tree structure here if you wanted instead of a 'graph'
G = nx.Graph()
#first pass to add the nodes
for rw in noderows:
    if rw['leftChild'] < 0 and rw['rightChild'] < 0:
        G.add_node(rw['id'], cat="Prediction", predval=rw['prediction'])
    else:
        G.add_node(rw['id'], cat="splitter", featureIndex=rw['split']['featureIndex'],
                   thresh=rw['split']['leftCategoriesOrThreshold'], leftChild=rw['leftChild'],
                   rightChild=rw['rightChild'], numCat=rw['split']['numCategories'])
#second pass to add the relationships, now with additional information
for rw in modeldf.where("leftChild > 0 and rightChild > 0").collect():
    tempnode = G.nodes()[rw['id']]
    G.add_edge(rw['id'], rw['leftChild'],
               reason="{0} less than {1}".format(features[tempnode['featureIndex']], tempnode['thresh']))
    G.add_edge(rw['id'], rw['rightChild'],
               reason="{0} greater than {1}".format(features[tempnode['featureIndex']], tempnode['thresh']))
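To see what was just built, here is a minimal sketch (assuming the graph G and the edge labels created in the loops above) that lists the nodes and the human-readable edge reasons:

# print every tree node with the attributes attached above (splitter vs. Prediction)
for node_id, attrs in G.nodes(data=True):
    print(node_id, attrs)

# print every parent/child edge with its 'reason' label
for parent, child, attrs in G.edges(data=True):
    print(parent, "->", child, ":", attrs['reason'])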
Now let's build a function to handle all of this.
Note: this could be written more cleanly.
#function to parse the path based on the tagvalue and its corresponding features
def decision_path(tag2search):
    wanted_row = transformed_pipeline.where("tagvalue = " + str(tag2search)).collect()[0]
    wanted_features = wanted_row['features']

    start_node = G.nodes()[0]
    while start_node['cat'] != 'Prediction':
        #do stuff with categorical variables
        if start_node['numCat'] > 0:
            feature_value = wanted_features[start_node['featureIndex']:start_node['featureIndex'] + start_node['numCat']]
            #this assumes that you'll name all your cat variables with the following syntax 'ball' -> 'ballVec' or 'wall' -> 'wallVec'
            feature_column = features[start_node['featureIndex']]
            original_column = feature_column[:-3]
            valToCheck = [x[original_column] for x in
                          transformed_pipeline.select(feature_column, original_column).distinct().collect()
                          if np.all(x[feature_column].toArray() == feature_value)][0]

            if valToCheck == wanted_row[original_column]:
                print("'{0}' value of {1} in [{2}]; ".format(original_column, wanted_row[original_column], valToCheck))
                start_node = G.nodes()[start_node['leftChild']]
            else:
                print("'{0}' value of {1} in [{2}]; ".format(original_column, wanted_row[original_column], valToCheck))
                start_node = G.nodes()[start_node['rightChild']]

        #path to do stuff with non-categorical variables
        else:
            feature_value = wanted_features[start_node['featureIndex']]
            if feature_value > start_node['thresh'][0]:
                print("'{0}' value of {1} was greater than {2}; ".format(features[start_node['featureIndex']], feature_value, start_node['thresh'][0]))
                start_node = G.nodes()[start_node['rightChild']]
            else:
                print("'{0}' value of {1} was less than or equal to {2}; ".format(features[start_node['featureIndex']], feature_value, start_node['thresh'][0]))
                start_node = G.nodes()[start_node['leftChild']]

    print("leads to prediction of {0}".format(start_node['predval']))
The results take the following form:
[decision_path(X) for X in range(1,8)]
'fall' value of 8.0 was greater than 6.0;
'ball' value of 0.0 was less than or equal to 1.0;
'ball' value of 0.0 was less than or equal to 0.0;
leads to prediction of 21.0
'fall' value of 9.0 was greater than 6.0;
'ball' value of 1.0 was less than or equal to 1.0;
'ball' value of 1.0 was greater than 0.0;
'keep' value of 5.0 was less than or equal to 5.0;
leads to prediction of 31.0
'fall' value of 10.0 was greater than 6.0;
'ball' value of 1.0 was less than or equal to 1.0;
'ball' value of 1.0 was greater than 0.0;
'keep' value of 6.0 was greater than 5.0;
'wall' value of a in [a];
leads to prediction of 21.0
'fall' value of 11.0 was greater than 6.0;
'ball' value of 3.0 was greater than 1.0;
'wall' value of a in [a];
leads to prediction of 31.0
'fall' value of 2.0 was less than or equal to 6.0;
leads to prediction of 51.0
'fall' value of 6.0 was less than or equal to 6.0;
leads to prediction of 51.0
'fall' value of 10.0 was greater than 6.0;
'ball' value of 1.0 was less than or equal to 1.0;
'ball' value of 1.0 was greater than 0.0;
'keep' value of 6.0 was greater than 5.0;
'wall' value of c in [c];
leads to prediction of 21.0
Notes: instead of relying on .toDebugString, this approach works with the tree itself, because accessing the tree seemed more important (and extensible).
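As one illustration of that extensibility, here is a minimal sketch (assuming the graph G, the features list, and node 0 as the root, as built above) that enumerates every root-to-leaf rule of the model rather than tracing a single sample:

# walk from the root (node 0) to every prediction leaf and join the edge 'reason'
# labels along the way, giving the full rule set of the fitted tree
for node_id, attrs in G.nodes(data=True):
    if attrs['cat'] == 'Prediction':
        path = nx.shortest_path(G, source=0, target=node_id)
        reasons = [G[a][b]['reason'] for a, b in zip(path[:-1], path[1:])]
        print(" AND ".join(reasons), "=> prediction {0}".format(attrs['predval']))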