I am trying to use Spark MLlib ALS with implicit feedback for collaborative filtering. The input data has only two fields, userId and productId. I have no product ratings, just information about which products users have bought, and that is all. So to train ALS I use:
def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int): MatrixFactorizationModel
(http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.mllib.recommendation.ALS$)
This API requires a Rating object:
Rating(user: Int, product: Int, rating: Double)
On the other hand, the documentation for trainImplicit says: Train a matrix factorization model given an RDD of "implicit preferences" given by users to some products, in the form of (userID, productID, preference) pairs.
When I set the rating/preference to 1, as in:
val ratings = sc.textFile(new File(dir, file).toString).map { line =>
  val fields = line.split(",")
  // format: (randomNumber, Rating(userId, productId, rating))
  (rnd.nextInt(100), Rating(fields(0).toInt, fields(1).toInt, 1.0))
}
val training = ratings.filter(x => x._1 < 60)
  .values
  .repartition(numPartitions)
  .cache()
val validation = ratings.filter(x => x._1 >= …

Is it possible to flatten lists inside an RDD? For example, to convert:
val xxx: org.apache.spark.rdd.RDD[List[Foo]]
to:
val yyy: org.apache.spark.rdd.RDD[Foo]
How can this be done?
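A minimal sketch of one way to do this (the Foo type and the sample data here are stand-ins invented for illustration): flatMap with identity flattens each inner List into the enclosing RDD.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object FlattenSketch {
  case class Foo(id: Int) // stand-in for the Foo element type

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("FlattenSketch").setMaster("local[2]"))
    val xxx: RDD[List[Foo]] = sc.parallelize(Seq(List(Foo(1), Foo(2)), List(Foo(3))))
    // flatMap(identity) turns RDD[List[Foo]] into RDD[Foo] by concatenating the inner lists
    val yyy: RDD[Foo] = xxx.flatMap(identity)
    yyy.collect().foreach(println)
    sc.stop()
  }
}

The same thing can be written as xxx.flatMap(x => x); both simply concatenate the inner lists.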
I have a set of records that I need to:
1) group by 'day', 'city' and 'kind'
2) sort each group by prize
In my code:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object Sort {
  case class Record(name: String, day: String, kind: String, city: String, prize: Int)
  val recs = Array(
    Record("n1", "d1", "k1", "c1", 10),
    Record("n1", "d1", "k1", "c1", 9),
    Record("n1", "d1", "k1", "c1", 8),
    Record("n2", "d2", "k2", "c2", 1),
    Record("n2", "d2", "k2", "c2", 2),
    Record("n2", "d2", "k2", "c2", 3)
  )
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Test")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val rs = sc.parallelize(recs) …

Need a simple example of calculating RMSE using a Pandas DataFrame. There is a function that loops over the data and returns true and predicted values:
def fun(data):
    ...
    return trueVal, predVal

for data in set:
    fun(data)
Then some code puts these results into the DataFrame below, where x is the actual value and p is the predicted value:
In [20]: d
Out[20]: {'p': [1, 10, 4, 5, 5], 'x': [1, 2, 3, 4, 5]}
In [21]: df = pd.DataFrame(d)
In [22]: df
Out[22]:
    p  x
0   1  1
1  10  2
2   4  3
3   5  4
4   5  5
Questions:
1) How do I put the results of the function fun into the df DataFrame?
2) How do I calculate the RMSE using the df DataFrame?
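For reference, the RMSE over the n rows of such a frame, with p the predicted and x the true column, is

\mathrm{RMSE} = \sqrt{\frac{1}{n}\sum_{i=1}^{n}\left(p_i - x_i\right)^2}

which on the sample data above works out to \sqrt{(0 + 64 + 1 + 1 + 0)/5} = \sqrt{13.2} \approx 3.63.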
When running the Spark 1.3.0 Pi example on YARN (Hadoop 2.6.0.2.2.0.0-2041) with the following script:
# Run on a YARN cluster
export HADOOP_CONF_DIR=/etc/hadoop/conf
/var/home2/test/spark/bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master yarn-cluster \
    --executor-memory 3G \
    --num-executors 50 \
    /var/home2/test/spark/lib/spark-examples-1.3.0-hadoop2.4.0.jar \
    1000
It fails with the message "Application failed 2 times due to AM Container" (see below). As far as I understand, all the information necessary to run a Spark application in YARN mode is provided in this launch script. What else needs to be configured to run on YARN? What is missing? What other reasons could make the YARN launch fail?
[test@etl-hdp-mgmt pi]$ ./run-pi.sh
Spark assembly has been built with Hive, including Datanucleus jars on classpath
15/04/01 12:59:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/04/01 12:59:58 INFO client.RMProxy: Connecting to ResourceManager at etl-hdp-yarn.foo.bar.com/192.168.0.16:8050
15/04/01 12:59:58 INFO yarn.Client: Requesting a new application from …

There is a table with two columns, books and readers of these books, where books and readers are book IDs and reader IDs respectively:
books readers
1: 1 30
2: 2 10
3: 3 20
4: 1 20
5: 1 10
6: 2 30
The record book = 1, reader = 30 means that the book with id = 1 was read by the user with id = 30. For each pair of books I need to count the number of readers who read both books, using this algorithm:
for each book
  for each reader of the book
    for each other_book in books of the reader
      increment common_reader_count ((book, other_book), cnt)
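Not the program referred to below, but a minimal sketch of the same counting idea in Spark, with the (book, reader) pairs from the table above hard-coded: group books by reader, emit each co-read pair once per reader, and sum per pair.

import org.apache.spark.{SparkConf, SparkContext}

object CommonReadersSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CommonReadersSketch").setMaster("local[2]"))
    // (book, reader) pairs from the table above
    val bookReader = sc.parallelize(Seq((1, 30), (2, 10), (3, 20), (1, 20), (1, 10), (2, 30)))
    val booksByReader = bookReader.map(_.swap).groupByKey() // reader -> books this reader has read
    val pairCounts = booksByReader
      .flatMap { case (_, books) =>
        val bs = books.toSeq
        for (b1 <- bs; b2 <- bs if b1 < b2) yield ((b1, b2), 1) // one pair per common reader
      }
      .reduceByKey(_ + _) // number of common readers per book pair
    pairCounts.collect().foreach(println)
    sc.stop()
  }
}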
The advantage of this algorithm is that it requires far fewer operations than counting all two-book combinations directly.
To implement the algorithm above, I organize the data into two RDDs: 1) one keyed by book, containing the readers of each book, and 2) one keyed by reader, containing the books read by each reader, as in the following program:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import …

A scala.MatchError (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) exception occurs when I try to access DataFrame row elements. The following code computes book pairs, where the count for each pair equals the number of readers who read both books of that pair.
Interestingly, the exception only occurs when trainPairs is created by trainDf.join(...). If the same data structure is created inline:
case class BookPair(book1: Int, book2: Int, cnt: Int, name1: String, name2: String)
val recs = Array(
  BookPair(1, 2, 3, "book1", "book2"),
  BookPair(2, 3, 1, "book2", "book3"),
  BookPair(1, 3, 2, "book1", "book3"),
  BookPair(1, 4, 5, "book1", "book4"),
  BookPair(2, 4, 7, "book2", "book4")
)
the exception does not occur at all!
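Not the resolution of this question, just an illustration: a scala.MatchError on GenericRowWithSchema typically comes from pattern matching a whole Row, and reading fields by position sidesteps the pattern match. A self-contained sketch with an assumed three-column schema:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object RowAccessSketch {
  case class Pair(book1: Int, book2: Int, cnt: Int) // assumed schema, for illustration only

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("RowAccessSketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val df = sc.parallelize(Seq(Pair(1, 2, 3), Pair(2, 3, 1))).toDF()
    // Access fields by position instead of matching the Row shape
    val tuples = df.rdd.map(row => (row.getInt(0), row.getInt(1), row.getInt(2)))
    tuples.collect().foreach(println)
    sc.stop()
  }
}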
The full code that produces this exception:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, DataFrame}
import org.apache.spark.sql.functions._
object Scratch {
  case class Book(book: Int, reader: Int, name: String) …

Is there any Spark function that allows splitting a collection into several RDDs according to some criteria? Such a function would avoid excessive iteration. For example:
def main(args: Array[String]) {
  val logFile = "file.txt"
  val conf = new SparkConf().setAppName("Simple Application")
  val sc = new SparkContext(conf)
  val logData = sc.textFile(logFile, 2).cache()
  val lineAs = logData.filter(line => line.contains("a")).saveAsTextFile("linesA.txt")
  val lineBs = logData.filter(line => line.contains("b")).saveAsTextFile("linesB.txt")
}
In this example I have to iterate over logData twice just to write the results to two separate files:
val lineAs = logData.filter(line => line.contains("a")).saveAsTextFile("linesA.txt")
val lineBs = logData.filter(line => line.contains("b")).saveAsTextFile("linesB.txt")
有这样的事情会很好:
val resultMap = logData.map(line =>
  if (line.contains("a")) ("a", line)
  else if (line.contains("b")) ("b", line)
  else (" - ", line))
resultMap.writeByKey("a", "linesA.txt")
resultMap.writeByKey("b", "linesB.txt")
Is there anything like this?
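Spark core has no writeByKey; purely as an illustration (file names and sample lines are made up), one workaround is to key each line once, cache the keyed RDD, and filter per key before writing, so the expensive source read happens only once:

import org.apache.spark.{SparkConf, SparkContext}

object SplitByKeySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SplitByKeySketch").setMaster("local[2]"))
    val logData = sc.parallelize(Seq("alpha", "beta", "gamma", "ab"))

    val keyed = logData.map { line =>
      val key = if (line.contains("a")) "a" else if (line.contains("b")) "b" else " - "
      (key, line)
    }.cache() // cached so the second filter below reuses the keyed data instead of recomputing it

    keyed.filter(_._1 == "a").values.saveAsTextFile("linesA.txt")
    keyed.filter(_._1 == "b").values.saveAsTextFile("linesB.txt")
    sc.stop()
  }
}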
I have vectors (LabeledPoint-s) labeled with certain group numbers. For each group I need to create a separate Logistic Regression classifier:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
object Scratch {
  val train = Seq(
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((1, 1.5), (2, 4.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 2.0), (1, 1.0), (2, 3.5))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 3.0), (2, 7.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (1, 3.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.5), (2, 4.0)))))
  )
  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf …

I am trying to use the Spark MLlib Logistic Regression (LR) and/or Random Forest (RF) classifiers to create a model that discriminates between two classes whose cardinalities differ greatly.
One of them has 150 million negative cases, and the other only 50 thousand positive cases.
After training the LR and RF classifiers with default parameters, I get very similar results for both classifiers, for example, for the following test set:
Test instances: 26842
Test positives = 433.0
Test negatives = 26409.0
The classifiers detect:
truePositives = 0.0
trueNegatives = 26409.0
falsePositives = 433.0
falseNegatives = 0.0
Precision = 0.9838685641904478
Recall = 0.9838685641904478
It looks like the classifiers cannot detect any positive instance at all. Moreover, no matter how the data is split into training and test sets, the classifiers produce a number of false positives exactly equal to the number of positives the test set really has.
The LR classifier's default threshold is set to 0.5. Setting the threshold to 0.8 does not make any difference:
val model = new LogisticRegressionWithLBFGS().run(training)
model.setThreshold(0.8)
Questions:
1) Please advise how to manipulate the classifier threshold to make the classifier more sensitive to the class with a tiny fraction of positive instances versus the class with a huge number of negative instances.
2) Are there any other MLlib classifiers that could solve this problem?
3) What does the intercept parameter do in the Logistic Regression algorithm?
val model = new LogisticRegressionWithSGD().setIntercept(true).run(training)
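On question 1, a minimal sketch (with toy data, not the original set-up) of working with the threshold in MLlib: clearThreshold() makes predict() return the raw class-1 probability, which can then be compared against any cut-off, e.g. a low one to make the model more sensitive to the rare positive class.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

object ThresholdSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ThresholdSketch").setMaster("local[2]"))
    // toy training data, invented for illustration
    val training = sc.parallelize(Seq(
      LabeledPoint(0.0, Vectors.dense(0.0, 1.0)),
      LabeledPoint(0.0, Vectors.dense(0.1, 0.9)),
      LabeledPoint(1.0, Vectors.dense(1.0, 0.0)),
      LabeledPoint(1.0, Vectors.dense(0.9, 0.2))
    ))

    val model = new LogisticRegressionWithLBFGS().run(training)
    model.clearThreshold() // predict() now returns probabilities instead of 0/1 labels

    val cutoff = 0.3 // a lower cut-off makes the model more sensitive to the positive class
    val scored = training.map(p => (model.predict(p.features), p.label))
    val predictions = scored.map { case (prob, label) => (if (prob >= cutoff) 1.0 else 0.0, label) }
    predictions.collect().foreach(println)
    sc.stop()
  }
}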