How do I pass params to the ML Pipeline.fit method?

Tus*_*ade 3 python apache-spark pyspark apache-spark-ml apache-spark-mllib

I am trying to build a clustering mechanism using:

  • Google Dataproc + Spark
  • Google BigQuery
  • A job built with Spark ML KMeans + Pipeline

As follows:


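  1. Create a user-level feature table in BigQuery.
    Example: what the feature table looks like

    userid | x1   | x2 | x3 | x4 | x5 | x6 | x7 | x8   | x9   | x10
    00013  | 0.01 | 0  | 0  | 0  | 0  | 0  | 0  | 0.06 | 0.09 | 0.001

  2. Spin up a cluster with default settings. I am using the gcloud command-line interface to create the cluster and run jobs, as shown here.
  3. Using the starter code provided, I read the BQ table, convert the RDD to a DataFrame, and pass it to the KMeans model/pipeline: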
#!/usr/bin/python
"""BigQuery I/O PySpark example."""
import json
import pprint
import subprocess
import pyspark
import numpy as np
from pyspark.ml.clustering import KMeans
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext
from pyspark.mllib.linalg import Vectors, _convert_to_vector
from pyspark.sql.types import Row
from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _py2java, _java2py
sc = pyspark.SparkContext()

# Use the Google Cloud Storage bucket for temporary BigQuery export data used by the InputFormat.
# This assumes the Google Cloud Storage connector for Hadoop is configured.

bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
conf = {
    # Input Parameters
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': 'my-project',
    'mapred.bq.input.dataset.id': 'tempData',
    'mapred.bq.input.table.id': 'userFeatureInBQ'}

# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
 'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
 'org.apache.hadoop.io.LongWritable',
 'com.google.gson.JsonObject',conf=conf)

# Transform the userid-feature table into the feature_data RDD
feature_data = (
    table_data
    .map(lambda (_, record): json.loads(record))
    .map(lambda x: (x['x0'], x['x1'], x['x2'], x['x3'], x['x4'],
                    x['x5'], x['x6'], x['x7'], x['x8'],
                    x['x9'], x['x10'])))

# Function to convert each line in RDD into an array, return the vector
def parseVector(values):
    array = np.array([float(v) for v in values])
    return _convert_to_vector(array)

# Convert the RDD into a row wise RDD
data = feature_data.map(parseVector)
row_rdd = data.map(lambda x: Row(x))

sqlContext = SQLContext(sc)

# cache the RDD to improve performance
row_rdd.cache()

# Create a Dataframe
df = sqlContext.createDataFrame(row_rdd, ["features"])

# cache the Dataframe
df.cache()

This is the schema and head() that I print to the console:

|-- features: vector (nullable = true)
[Row(features=DenseVector([0.01,0,0,0,0,0,0,0.06,0.09,0.001]))]
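  4. Run the KMeans clustering algorithm by
    • Running the model multiple times
    • Using different parameters (i.e., changing the number of clusters and init_mode)
    • Computing an error or cost metric
    • Choosing the best combination of model parameters
    • Creating a pipeline with KMeans as the estimator
    • Passing multiple parameters using the paramMap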

I get the following output, with a warning:

7:03:24 WARN org.apache.spark.mllib.clustering.KMeans: The input data was not directly cached, which may hurt performance if its parent RDDs are also uncached.
[PipelineModel_443dbf939b7bd3bf7bfc, PipelineModel_4b64bb761f4efe51da50, PipelineModel_4f858411ac19beacc1a4, PipelineModel_4f58b894f1d14d79b936,
 PipelineModel_4b8194f7a5e6be6eaf33, PipelineModel_4fc5b6370bff1b4d7dba, PipelineModel_43e0a196f16cfd3dae57, PipelineModel_47318a54000b6826b20e,
 PipelineModel_411bbe1c32db6bf0a92b, PipelineModel_421ea1364d8c4c9968c8, PipelineModel_4acf9cdbfda184b00328, PipelineModel_42d1a0c61c5e45cdb3cd,
 PipelineModel_4f0db3c394bcc2bb9352, PipelineModel_441697f2748328de251c, PipelineModel_4a64ae517d270a1e0d5a, PipelineModel_4372bc8db92b184c05b0]


#Define the paramMap & model
paramMap = ({'k':3,'initMode':'kmeans||'},{'k':3,'initMode':'random'},
  {'k':4,'initMode':'kmeans||'},{'k':4,'initMode':'random'},
  {'k':5,'initMode':'kmeans||'},{'k':5,'initMode':'random'},
  {'k':6,'initMode':'kmeans||'},{'k':6,'initMode':'random'},
  {'k':7,'initMode':'kmeans||'},{'k':7,'initMode':'random'},
  {'k':8,'initMode':'kmeans||'},{'k':8,'initMode':'random'},
  {'k':9,'initMode':'kmeans||'},{'k':9,'initMode':'random'},
  {'k':10,'initMode':'kmeans||'},{'k':10,'initMode':'random'})

km = KMeans()

# Create a Pipeline with estimator stage
pipeline = Pipeline(stages=[km])

# Call & fit the pipeline with the paramMap
models = pipeline.fit(df, paramMap)
print models

Output: [array([7.64676638e-07, 3.58531391e-01, 1.68879698e-03, 0.00000000e+00, 1.53477043e-02, 1.25822915e-02, 0.00000000e+00, 6.93060772e-07, 1.41766847e-03, 1.60941306e-02]), array([2.36494105e-06, 1.87719732e-02, 3.73829379e-03, 0.00000000e+00, 4.20724542e-02, 2.28675684e-02, 0.00000000e+00, 5.45002249e-06, 1.17331153e-02, 1.24364600e-02])]


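Here is the list of questions I need help with:

  • I get a list with only 2 cluster centers, as arrays, for all of the models.
    • When I try to access the pipeline, it seems the KMeans model is defaulting to k = 2. Why would this happen?
    • The last loop is supposed to access the PipelineModel and its 0th stage and run the clusterCenters() method. Is this the right approach?
    • Why do I get the warning about the data not being cached?
  • I could not find how to compute WSSSE, or any equivalent of .computeCost() (from mllib), when using a pipeline. How do I compare the models trained with different parameters?
  • I tried the following code to run the .computeCost method as defined in the source code here.
    • This defeats the purpose of using a pipeline to run KMeans models and model selection in parallel, but I tried the following code: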
#Print the cluster centers:
for model in models:
    print vars(model)
    print model.stages[0].clusterCenters()
    print model.extractParamMap()

This prints out the following at the end of the loop:

[ 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687]

  • Is the cost/error calculated the same for each model? Again, I am not able to access the PipelineModel with the right parameters.

Any help or guidance is much appreciated. Thanks!

zer*_*323 5

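Your parameters are not defined correctly. A param map should map from the specific Param objects to values, not from arbitrary names. You get k equal to 2 because the parameters you pass are not used, so every model is trained with exactly the same default parameters.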

Let's start with some example data:

import numpy as np
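# Vectors (plural) provides the dense() factory used below.
# On Spark 2.0+, pyspark.ml estimators expect pyspark.ml.linalg.Vectors instead.
from pyspark.mllib.linalg import Vectors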

df = (sc.textFile("data/mllib/kmeans_data.txt")
  .map(lambda s: Vectors.dense(np.fromstring(s, dtype=np.float64, sep=" ")))
  .zipWithIndex()
  .toDF(["features", "id"]))

and a Pipeline:

from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline

km = KMeans()

pipeline = Pipeline(stages=[km])

As mentioned above, a param map should use the specific Param objects as its keys. For example:

params = [
    {km.k: 2, km.initMode: "k-means||"},
    {km.k: 3, km.initMode: "k-means||"},
    {km.k: 4, km.initMode: "k-means||"}
]

models = pipeline.fit(df, params=params)

assert [len(m.stages[0].clusterCenters()) for m in models] == [2, 3, 4]
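If it is more convenient to keep the grid as plain name/value pairs, as in the question, the names can be translated into Param objects before calling fit. The following is only a minimal sketch: `to_param_maps` is a hypothetical helper name, and it assumes the estimator exposes `getParam(name)` the way classes derived from `pyspark.ml.param.Params` do.

```python
from itertools import product


def to_param_maps(estimator, grid):
    """Expand {"k": [2, 3], "initMode": ["k-means||"]} into a list of
    param maps keyed by the estimator's own Param objects.

    Hypothetical helper: it only assumes that estimator.getParam(name)
    returns the Param for that name and raises for unknown names.
    """
    names = sorted(grid)
    # Resolve every name up front so a misspelled parameter fails loudly
    # instead of being silently ignored during fit.
    params = [estimator.getParam(name) for name in names]
    # One dict per combination of candidate values.
    return [dict(zip(params, values))
            for values in product(*(grid[name] for name in names))]
```

With the `km` and `pipeline` defined above, `pipeline.fit(df, params=to_param_maps(km, {"k": [2, 3, 4], "initMode": ["k-means||"]}))` should then train one model per combination.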

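Notes:

  • The correct initMode for K-means|| is k-means||, not kmeans||.
  • Using a parameter map in a pipeline does not mean that the models are trained in parallel. Spark parallelizes the training process over the data, not over the parameters. It is nothing more than a convenience method.
  • You get the warning about uncached data because the actual input to K-Means is not the DataFrame but an RDD derived from it.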
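On comparing the fitted models: once each model really has its own k, a cost such as WSSSE can be computed from its cluster centers. The sketch below is not a Spark API. `wssse` is a hypothetical helper, and it assumes the data is small enough to collect to the driver and that every point and center is a plain sequence of floats.

```python
def wssse(points, centers):
    """Within-set sum of squared errors: for each point, the squared
    Euclidean distance to its nearest center, summed over all points.

    Hypothetical helper for small, locally collected data.
    """
    total = 0.0
    for point in points:
        # Squared distance from this point to the closest center.
        total += min(
            sum((p - c) ** 2 for p, c in zip(point, center))
            for center in centers
        )
    return total
```

For example, `rows = [r.features.toArray() for r in df.collect()]` followed by `[wssse(rows, m.stages[0].clusterCenters()) for m in models]` would give one cost per fitted model. For data too large to collect, the same sum would have to be computed on the cluster instead.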