我在这里看到了一个解决方案但是当我尝试它时对我不起作用.
首先我导入cars.csv文件:
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.load("/usr/local/spark/cars.csv")
Run Code Online (Sandbox Code Playgroud)
如下所示:
+----+-----+-----+--------------------+-----+
|year| make|model| comment|blank|
+----+-----+-----+--------------------+-----+
|2012|Tesla| S| No comment| |
|1997| Ford| E350|Go get one now th...| |
|2015|Chevy| Volt| null| null|
Run Code Online (Sandbox Code Playgroud)
然后我这样做:
df.na.fill("e",Seq("blank"))
Run Code Online (Sandbox Code Playgroud)
但是空值没有改变.
谁能帮我 ?
我正在使用Spark 2.0 Scala.我可以使用toDF()方法将RDD转换为DataFrame.
val rdd = sc.textFile("/pathtologfile/logfile.txt")
val df = rdd.toDF()
Run Code Online (Sandbox Code Playgroud)
但是对于我的生活,我无法在API文档中找到它的位置.它不属于RDD.但它在DataSet下(链接1).但是我有一个RDD而不是DataSet.
我也看不出它的含义(链接2).
所以请帮助我理解为什么可以为我的RDD调用toDF().这个方法从哪里继承?
注册表registerTempTable(createOrReplaceTempView带有spark 2. +)的表是否已缓存?
使用Zeppelin,我DataFrame在我的scala代码中注册了一个重度计算后,然后在%pyspark我想要访问它,并进一步过滤它.
它会使用表的内存缓存版本吗?或者每次都会重建?
我正在构建一个基于内容的电影推荐系统.这很简单,只需让用户输入电影标题,系统就会找到一部具有最相似功能的电影.
在计算相似度并按降序对分数进行排序后,我找到相应的5个最高相似度得分的电影并返回给用户.
到目前为止,当我想评估系统的准确性时,一切运作良好.我在Google上找到的一些公式只是根据评级值评估准确性(比较预测评级和实际评级,如RMSE).我没有将相似性得分改为评级(从1到5的比例),所以我不能应用任何公式.
您能否建议将相似度得分转换为预测评级,以便我可以应用RMSE?或者有什么想法解决这个问题?
版本:Spark 1.6.2,Scala 2.10
我正在执行下面的命令spark-shell.我试图查看Spark默认创建的分区数.
val rdd1 = sc.parallelize(1 to 10)
println(rdd1.getNumPartitions) // ==> Result is 4
//Creating rdd for the local file test1.txt. It is not HDFS.
//File content is just one word "Hello"
val rdd2 = sc.textFile("C:/test1.txt")
println(rdd2.getNumPartitions) // ==> Result is 2
Run Code Online (Sandbox Code Playgroud)
根据Apache Spark 文档,spark.default.parallelism我的笔记本电脑(2核心处理器)中的核心数量.
我的问题是:rdd2似乎正在给出2个分区的正确结果,如文档中所述.但为什么rdd1将结果作为4个分区?
当我将数据帧写入csv时,会为每个分区创建一个.csv文件.假设我想将每个文件的最大大小限制为1 MB.我可以多次写入并且每次都增加参数以重新分区.有没有办法可以提前计算用于重新分区的参数,以确保每个文件的最大大小小于某个指定的大小.
我想可能存在病理情况,其中所有数据最终都在一个分区上.因此,做出较弱的假设,我们只想确保平均文件大小小于某个指定的数量,比如1 MB.
我试图使用pyspark解决以下问题.我在hdfs上有一个文件,格式是查找表的转储.
key1, value1
key2, value2
...
Run Code Online (Sandbox Code Playgroud)
我想将它加载到pyspark中的python字典中,并将其用于其他目的.所以我试着这样做:
table = {}
def populateDict(line):
(k,v) = line.split(",", 1)
table[k] = v
kvfile = sc.textFile("pathtofile")
kvfile.foreach(populateDict)
Run Code Online (Sandbox Code Playgroud)
我发现表变量没有被修改.那么,有没有办法在spark中创建一个大的内存哈希表?
我正在使用spark和mongo.我可以使用以下代码连接到mongo:
val sc = new SparkContext("local", "Hello from scala")
val config = new Configuration()
config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/dbName.collectionName")
val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])
Run Code Online (Sandbox Code Playgroud)
上面的代码给了我收集的所有文件.
现在我想在查询中应用一些条件.
为此我用过
config.set("mongo.input.query","{customerId: 'some mongo id'}")
Run Code Online (Sandbox Code Playgroud)
这一次只涉及一个条件.如果'usage'> 30,我想添加一个条件
1)如何使用spark和mongo为mongo查询添加多个条件(包括大于和小于)?
另外我想用scala迭代查询结果的每个文件?
2)如何使用scala迭代结果?
我在Apache Spark代码源中遇到了这一行
val (gradientSum, lossSum, miniBatchSize) = data
.sample(false, miniBatchFraction, 42 + i)
.treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
seqOp = (c, v) => {
// c: (grad, loss, count), v: (label, features)
val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
(c._1, c._2 + l, c._3 + 1)
},
combOp = (c1, c2) => {
// c: (grad, loss, count)
(c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
}
)
Run Code Online (Sandbox Code Playgroud)
我读这个有多个麻烦:
treeAggregate工作方式的内容,这些内容的含义是什么..treeAggregate的方法名称似乎有两个()().这意味着什么?这是一些我不理解的特殊scala语法.这个陈述必须非常先进.我无法开始破译这一点.
我想过滤RDD源列:
val source = sql("SELECT * from sample.source").rdd.map(_.mkString(","))
val destination = sql("select * from sample.destination").rdd.map(_.mkString(","))
val source_primary_key = source.map(rec => (rec.split(",")(0)))
val destination_primary_key = destination.map(rec => (rec.split(",")(0)))
val src = source_primary_key.subtractByKey(destination_primary_key)
Run Code Online (Sandbox Code Playgroud)
我想在过滤条件中使用IN子句从源代码中仅过滤出src中存在的值,如下所示(EDITED):
val source = spark.read.csv(inputPath + "/source").rdd.map(_.mkString(","))
val destination = spark.read.csv(inputPath + "/destination").rdd.map(_.mkString(","))
val source_primary_key = source.map(rec => (rec.split(",")(0)))
val destination_primary_key = destination.map(rec => (rec.split(",")(0)))
val extra_in_source = source_primary_key.filter(rec._1 != destination_primary_key._1)
Run Code Online (Sandbox Code Playgroud)
等效的SQL代码是
SELECT * FROM SOURCE WHERE ID IN (select ID from src)
Run Code Online (Sandbox Code Playgroud)
谢谢