小编eli*_*sah的帖子

替换Spark DataFrame中的空值

我在这里看到了一个解决方案但是当我尝试它时对我不起作用.

首先我导入cars.csv文件:

val df = sqlContext.read
              .format("com.databricks.spark.csv")
              .option("header", "true")
              .load("/usr/local/spark/cars.csv")
Run Code Online (Sandbox Code Playgroud)

如下所示:

+----+-----+-----+--------------------+-----+
|year| make|model|             comment|blank|
+----+-----+-----+--------------------+-----+
|2012|Tesla|    S|          No comment|     |
|1997| Ford| E350|Go get one now th...|     |
|2015|Chevy| Volt|                null| null|
Run Code Online (Sandbox Code Playgroud)

然后我这样做:

df.na.fill("e",Seq("blank"))
Run Code Online (Sandbox Code Playgroud)

但是空值没有改变.

谁能帮我 ?

scala dataframe apache-spark

16
推荐指数
1
解决办法
3万
查看次数

Spark 2.0 Scala - RDD.toDF()

我正在使用Spark 2.0 Scala.我可以使用toDF()方法将RDD转换为DataFrame.

val rdd = sc.textFile("/pathtologfile/logfile.txt")
val df = rdd.toDF()
Run Code Online (Sandbox Code Playgroud)

但是对于我的生活,我无法在API文档中找到它的位置.它不属于RDD.但它在DataSet下(链接1).但是我有一个RDD而不是DataSet.

我也看不出它的含义(链接2).

所以请帮助我理解为什么可以为我的RDD调用toDF().这个方法从哪里继承?

scala apache-spark

15
推荐指数
2
解决办法
4万
查看次数

使用spark-sql临时表缓存

注册表registerTempTable(createOrReplaceTempView带有spark 2. +)的表是否已缓存?

使用Zeppelin,我DataFrame在我的scala代码中注册了一个重度计算后,然后在%pyspark我想要访问它,并进一步过滤它.

它会使用表的内存缓存版本吗?或者每次都会重建?

apache-spark apache-spark-sql

14
推荐指数
2
解决办法
2万
查看次数

如何评估基于内容的推荐系统

我正在构建一个基于内容的电影推荐系统.这很简单,只需让用户输入电影标题,系统就会找到一部具有最相似功能的电影.

在计算相似度并按降序对分数进行排序后,我找到相应的5个最高相似度得分的电影并返回给用户.

到目前为止,当我想评估系统的准确性时,一切运作良好.我在Google上找到的一些公式只是根据评级值评估准确性(比较预测评级和实际评级,如RMSE).我没有将相似性得分改为评级(从1到5的比例),所以我不能应用任何公式.

您能否建议将相似度得分转换为预测评级,以便我可以应用RMSE?或者有什么想法解决这个问题?

recommendation-engine

13
推荐指数
1
解决办法
4044
查看次数

Spark RDD默认分区数

版本:Spark 1.6.2,Scala 2.10

我正在执行下面的命令spark-shell.我试图查看Spark默认创建的分区数.

val rdd1 = sc.parallelize(1 to 10)
println(rdd1.getNumPartitions) // ==> Result is 4

//Creating rdd for the local file test1.txt. It is not HDFS.
//File content is just one word "Hello"
val rdd2 = sc.textFile("C:/test1.txt")
println(rdd2.getNumPartitions) // ==> Result is 2
Run Code Online (Sandbox Code Playgroud)

根据Apache Spark 文档,spark.default.parallelism我的笔记本电脑(2核心处理器)中的核心数量.

我的问题是:rdd2似乎正在给出2个分区的正确结果,如文档中所述.但为什么rdd1将结果作为4个分区?

scala apache-spark

12
推荐指数
1
解决办法
2万
查看次数

限制数据帧分区的最大大小

当我将数据帧写入csv时,会为每个分区创建一个.csv文件.假设我想将每个文件的最大大小限制为1 MB.我可以多次写入并且每次都增加参数以重新分区.有没有办法可以提前计算用于重新分区的参数,以确保每个文件的最大大小小于某个指定的大小.

我想可能存在病理情况,其中所有数据最终都在一个分区上.因此,做出较弱的假设,我们只想确保平均文件大小小于某个指定的数量,比如1 MB.

scala apache-spark apache-spark-sql

12
推荐指数
1
解决办法
1898
查看次数

在pyspark中创建一个大字典

我试图使用pyspark解决以下问题.我在hdfs上有一个文件,格式是查找表的转储.

key1, value1
key2, value2
...
Run Code Online (Sandbox Code Playgroud)

我想将它加载到pyspark中的python字典中,并将其用于其他目的.所以我试着这样做:

table = {}
def populateDict(line):
    (k,v) = line.split(",", 1)
    table[k] = v

kvfile = sc.textFile("pathtofile")
kvfile.foreach(populateDict)
Run Code Online (Sandbox Code Playgroud)

我发现表变量没有被修改.那么,有没有办法在spark中创建一个大的内存哈希表?

python apache-spark

11
推荐指数
1
解决办法
9387
查看次数

如何使用spark查询mongo?

我正在使用spark和mongo.我可以使用以下代码连接到mongo:

val sc = new SparkContext("local", "Hello from scala")

val config = new Configuration()
config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/dbName.collectionName")
val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])
Run Code Online (Sandbox Code Playgroud)

上面的代码给了我收集的所有文件.

现在我想在查询中应用一些条件.

为此我用过

config.set("mongo.input.query","{customerId: 'some mongo id'}")
Run Code Online (Sandbox Code Playgroud)

这一次只涉及一个条件.如果'usage'> 30,我想添加一个条件

1)如何使用spark和mongo为mongo查询添加多个条件(包括大于和小于)?

另外我想用scala迭代查询结果的每个文件?

2)如何使用scala迭代结果?

scala mongodb apache-spark

11
推荐指数
1
解决办法
7578
查看次数

如何解释RDD.treeAggregate

我在Apache Spark代码源中遇到了这一行

val (gradientSum, lossSum, miniBatchSize) = data
    .sample(false, miniBatchFraction, 42 + i)
    .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
      seqOp = (c, v) => {
        // c: (grad, loss, count), v: (label, features)
        val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
        (c._1, c._2 + l, c._3 + 1)
      },
      combOp = (c1, c2) => {
        // c: (grad, loss, count)
        (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
      }
    )
Run Code Online (Sandbox Code Playgroud)

我读这个有多个麻烦:

  • 首先,我在网上找不到任何可以解释确切treeAggregate工作方式的内容,这些内容的含义是什么.
  • 其次,这里.treeAggregate的方法名称似乎有两个()().这意味着什么?这是一些我不理解的特殊scala语法.
  • 最后,我看到seqOp和comboOp都返回一个3元素元组,它与预期的左侧变量匹配,但实际返回了哪一个?

这个陈述必须非常先进.我无法开始破译这一点.

scala distributed-computing apache-spark rdd

11
推荐指数
1
解决办法
5768
查看次数

如何在spark中的过滤条件中使用NOT IN子句

我想过滤RDD源列:

val source = sql("SELECT * from sample.source").rdd.map(_.mkString(","))
val destination = sql("select * from sample.destination").rdd.map(_.mkString(","))

val source_primary_key = source.map(rec => (rec.split(",")(0)))
val destination_primary_key = destination.map(rec => (rec.split(",")(0)))

val src = source_primary_key.subtractByKey(destination_primary_key)
Run Code Online (Sandbox Code Playgroud)

我想在过滤条件中使用IN子句从源代码中仅过滤出src中存在的值,如下所示(EDITED):

val source = spark.read.csv(inputPath + "/source").rdd.map(_.mkString(","))
val destination = spark.read.csv(inputPath + "/destination").rdd.map(_.mkString(","))

val source_primary_key = source.map(rec => (rec.split(",")(0)))
val destination_primary_key = destination.map(rec => (rec.split(",")(0)))

val extra_in_source = source_primary_key.filter(rec._1 != destination_primary_key._1)
Run Code Online (Sandbox Code Playgroud)

等效的SQL代码是

SELECT * FROM SOURCE WHERE ID IN (select ID from src)
Run Code Online (Sandbox Code Playgroud)

谢谢

scala apache-spark apache-spark-sql

11
推荐指数
2
解决办法
3万
查看次数