Spark ALS predictAll返回空

swe*_*ves 6 machine-learning apache-spark rdd pyspark apache-spark-mllib

我有以下Python测试代码(ALS.train其他地方定义的参数):

 r1 = (2, 1)
 r2 = (3, 1)
 test = sc.parallelize([r1, r2]) 
 model = ALS.train(ratings, rank, numIter, lmbda)
 predictions = model.predictAll(test)

 print test.take(1)     
 print predictions.count()
 print predictions
Run Code Online (Sandbox Code Playgroud)

哪个有效,因为它对预测变量和输出的计数为1:

[(2, 1)]
1
ParallelCollectionRDD[2691] at parallelize at PythonRDD.scala:423
Run Code Online (Sandbox Code Playgroud)

但是,当我尝试使用RDD我使用以下代码创建自己时,它似乎不再起作用了:

model = ALS.train(ratings, rank, numIter, lmbda)
validation_data = validation.map(lambda xs: tuple(int(x) for x in xs))
predictions = model.predictAll(validation_data)

print validation_data.take(1)
print predictions.count()
print validation_data
Run Code Online (Sandbox Code Playgroud)

哪个输出:

[(61, 3864)]
0
PythonRDD[4018] at RDD at PythonRDD.scala:43
Run Code Online (Sandbox Code Playgroud)

如您所见,predictAll传递映射后返回空RDD.进入的值都是相同的格式.我能看到的唯一明显的区别是第一个例子使用parallelize并产生一个ParallelCollectionRDD而第二个例子只使用一个产生一个的映射PythonRDD.难道predictAll只有当通过某种类型的工作RDD?如果是这样,是否可以在RDD不同类型之间进行转换?我不知道如何让这个工作.

zer*_*323 14

有两个基本条件MatrixFactorizationMode.predictAll可以返回RDD,其项目数少于输入:

  • 用户在训练集中丢失.
  • 训练集中缺少产品.

您可以轻松地重现此行为,并检查它是否与RDD的创建方式无关.首先让我们使用示例数据来构建模型:

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

def parse(s):
    x, y, z  = s.split(",")
    return Rating(int(x), int(y), float(z))

ratings = (sc.textFile("data/mllib/als/test.data")
  .map(parse)
  .union(sc.parallelize([Rating(1, 5, 4.0)])))

model = ALS.train(ratings, 10, 10)
Run Code Online (Sandbox Code Playgroud)

接下来,我们可以看到培训数据中包含哪些产品和用户:

set(ratings.map(lambda r: r.product).collect())
## {1, 2, 3, 4, 5}

set(ratings.map(lambda r: r.user).collect())
## {1, 2, 3, 4}
Run Code Online (Sandbox Code Playgroud)

现在让我们创建测试数据并检查预测:

valid_test = sc.parallelize([(2, 5), (1, 4), (3, 5)])
valid_test
## ParallelCollectionRDD[434] at parallelize at PythonRDD.scala:423

model.predictAll(valid_test).count()
## 3
Run Code Online (Sandbox Code Playgroud)

到现在为止还挺好.接下来,使用与代码中相同的逻辑映射它:

valid_test_ = valid_test.map(lambda xs: tuple(int(x) for x in xs))
valid_test_
## PythonRDD[497] at RDD at PythonRDD.scala:43

model.predictAll(valid_test_).count()
## 3
Run Code Online (Sandbox Code Playgroud)

还好.接下来让我们创建无效数据并重复实验:

invalid_test = sc.parallelize([
  (2, 6), # No product in the training data
  (6, 1)  # No user in the training data
])
invalid_test 
## ParallelCollectionRDD[500] at parallelize at PythonRDD.scala:423

model.predictAll(invalid_test).count()
## 0 

invalid_test_ = invalid_test.map(lambda xs: tuple(int(x) for x in xs))
model.predictAll(invalid_test_).count()
## 0
Run Code Online (Sandbox Code Playgroud)

正如预期的那样,没有对无效输入的预测.

最后,你可以通过使用ML模型来确认这是真的,它在Python代码的训练/预测中是完全独立的:

from pyspark.ml.recommendation import ALS as MLALS

model_ml = MLALS(rank=10, maxIter=10).fit(
    ratings.toDF(["user", "item", "rating"])
)
model_ml.transform((valid_test + invalid_test).toDF(["user", "item"])).show()

## +----+----+----------+
## |user|item|prediction|
## +----+----+----------+
## |   6|   1|       NaN|
## |   1|   4| 1.0184212|
## |   2|   5| 4.0041084|
## |   3|   5|0.40498763|
## |   2|   6|       NaN|
## +----+----+----------+
Run Code Online (Sandbox Code Playgroud)

如您所见,训练数据中没有相应的用户/项目意味着没有预测.

  • 很好的解释。使我意识到我的训练集缺少用户。 (2认同)