Mik*_*son 5 python apache-spark apache-spark-sql pyspark
使用 PySpark 的 ML 模块,经常会发生以下步骤(在数据清理之后等):
摘取一段精简的代码片段:
predictions = model.transform(test_df)
Run Code Online (Sandbox Code Playgroud)
该predictions数据帧将仅具有预测(以及概率,可能还有预测的转换)。但它不会包含原始数据集。
对我来说,如何将原始数据集(甚至转换后的数据集test_df)和预测结合起来并不明显;没有可供连接的共享列,并且对于大型数据集添加索引列似乎相当棘手。
对于大型数据集,比如我正在使用的数据集,我尝试了这里的建议:
test_df = test_df.repartition(predictions.rdd.getNumPartitions())
joined_schema = StructType(test_df.schema.fields + predictions.schema.fields)
interim_rdd = test_df.rdd.zip(predictions.rdd).map(lambda x: x[0] + x[1])
full_data = spark.createDataFrame(interim_rdd, joined_schema)
full_data.write.parquet(my_predictions_path, mode="overwrite")
Run Code Online (Sandbox Code Playgroud)
但我不喜欢这个有两个原因:
Caused by: org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition
我不想使用monotonically_increasing_id有时给出的建议,因为我的数据集太大而不允许这样做。
这似乎很重要:我如何在无法将预测与原始目标进行比较的情况下报告任何模型质量。其他人是怎么做到这一点的??
调用model = <your ml-algorithm>.fit(df_train)训练数据集时可以有任意数量的附加列。仅包含特征和标签的列将用于训练模型(通常称为features和label,可配置),但可以存在其他列。
当predictions = model.transform(df_test)在下一步中调用经过训练的模型时,将返回一个包含附加列prediction、probability和 的数据帧rawPrediction。
特别是原始特征列和标签列仍然是数据框的一部分。此外,作为其一部分的任何df_test列在输出中仍然可用,并且可用于标识行。
prediction = model.transform(df_test)
prediction.printSchema()
Run Code Online (Sandbox Code Playgroud)
印刷
root
|-- feature1: double (nullable = true)
|-- feature2: double (nullable = true)
|-- feature3: double (nullable = true)
|-- label: double (nullable = true)
|-- additional_data: string (nullable = true)
|-- features: vector (nullable = true)
|-- rawPrediction: vector (nullable = true)
|-- probability: vector (nullable = true)
|-- prediction: double (nullable = false)
Run Code Online (Sandbox Code Playgroud)
ifdf_test不仅包含所需的列features,还包含其他列,包括label. 通过评估label,prediction现在可以创建BinaryClassificationMetrics。
从技术上讲,调用model.transform是Dataset.withColumn 调用。
基于Spark 文档中的 ML Pipeline 示例的示例:Spark ML 工作流程通常从包含训练数据、特征和标签(=目标值)的数据帧开始。在此示例中,还存在与 ml 过程无关的附加列。
training_original = spark.createDataFrame([
(0.0, 1.1, 0.1, 1.0, 'any random value that is not used to train the model'),
(2.0, 1.0, -1.0, 0.0, 'another value'),
(2.0, 1.3, 1.0, 0.0, 'value 3'),
(0.0, 1.2, -0.5, 1.0, 'this value is also not used for training nor testing')],
["feature1", "feature2", "feature3", "label", "additional_data"])
Run Code Online (Sandbox Code Playgroud)
然后使用变压器将这些特征组合成一列。此任务最简单的转换器是VectorAssembler
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
inputCols=["feature1", "feature2", "feature3"],
outputCol="features")
training_transformed = assembler.transform(training_original)
#+--------+--------+--------+-----+--------------------+--------------+
#|feature1|feature2|feature3|label| additional_data| features|
#+--------+--------+--------+-----+--------------------+--------------+
#| 0.0| 1.1| 0.1| 1.0|any random value ...| [0.0,1.1,0.1]|
#| ...
Run Code Online (Sandbox Code Playgroud)
features现在可以使用列和在此数据帧上训练模型label。附加列存在,但将被该fit方法忽略。
lr = LogisticRegression(maxIter=10, regParam=0.01)
model = lr.fit(training_transformed)
Run Code Online (Sandbox Code Playgroud)
现在根据测试数据对该模型进行测试。准备工作与训练数据相同:
test_df = spark.createDataFrame([
(-1.0, 1.5, 1.3, 1.0, 'test value 1'),
(3.0, 2.0, -0.1, 0.0, 'another test value'),
(0.0, 2.2, -1.5, 1.0, 'this is not important')],
["feature1", "feature2", "feature3", "label", "additional_data"])
test_df_transformed = assembler.transform(test_df)
#+--------+--------+--------+-----+--------------------+--------------+
#|feature1|feature2|feature3|label| additional_data| features|
#+--------+--------+--------+-----+--------------------+--------------+
#| -1.0| 1.5| 1.3| 1.0| test value 1|[-1.0,1.5,1.3]|
#| ...
Run Code Online (Sandbox Code Playgroud)
运行 ML magic 会产生
prediction = model.transform(test_df_transformed)
#+--------+--------+--------+-----+--------------------+--------------+--------------------+--------------------+----------+
#|feature1|feature2|feature3|label| additional_data| features| rawPrediction| probability|prediction|
#+--------+--------+--------+-----+--------------------+--------------+--------------------+--------------------+----------+
#| -1.0| 1.5| 1.3| 1.0| test value 1|[-1.0,1.5,1.3]|[-6.5872014439355...|[0.00137599470692...| 1.0|
#| ...
Run Code Online (Sandbox Code Playgroud)
该数据帧现在包含原始输入数据(feature1和)、预期目标值(feature3)、转换后的特征()和模型预测的结果()。在这里,所有输入值、目标值和预测都可以在一个数据集中使用。这里是评估模型并计算模型所需指标的地方。将模型应用于新数据将给出相同的结果(但当然没有列)。additional_datalabelfeaturespredictionlabel