小编zer*_*323的帖子

dataframe:如何groupBy/count然后过滤Scala中的count

Spark 1.4.1

我遇到一种情况,按数据框进行分组,然后对'count'列进行计数和过滤会引发下面的异常

import sqlContext.implicits._
import org.apache.spark.sql._

case class Paf(x:Int)
val myData = Seq(Paf(2), Paf(1), Paf(2))
val df = sc.parallelize(myData, 2).toDF()
Run Code Online (Sandbox Code Playgroud)

然后分组和过滤:

df.groupBy("x").count()
  .filter("count >= 2")
  .show()
Run Code Online (Sandbox Code Playgroud)

引发异常:

java.lang.RuntimeException: [1.7] failure: ``('' expected but `>=' found count >= 2
Run Code Online (Sandbox Code Playgroud)

解:

重命名列会使问题消失(因为我怀疑与插值'count'函数没有冲突'

df.groupBy("x").count()
  .withColumnRenamed("count", "n")
  .filter("n >= 2")
  .show()
Run Code Online (Sandbox Code Playgroud)

那么,这是一种期望的行为,一个错误还是一种规范的方式?

谢谢,亚历克斯

scala apache-spark apache-spark-sql

35
推荐指数
3
解决办法
9万
查看次数

如何在Spark ML中创建正确的分类数据框

我试图通过使用Spark ML api运行随机森林分类,但我遇到了将正确的数据帧输入创建到管道中的问题.

以下是示例数据:

age,hours_per_week,education,sex,salaryRange
38,40,"hs-grad","male","A"
28,40,"bachelors","female","A"
52,45,"hs-grad","male","B"
31,50,"masters","female","B"
42,40,"bachelors","male","B"
Run Code Online (Sandbox Code Playgroud)

agehours_per_week是整数,而其他功能包括label salaryRange是分类(String)

加载这个csv文件(让我们称之为sample.csv)可以通过Spark csv库完成,如下所示:

val data = sqlContext.csvFile("/home/dusan/sample.csv")
Run Code Online (Sandbox Code Playgroud)

默认情况下,所有列都作为字符串导入,因此我们需要将"age"和"hours_per_week"更改为Int:

val toInt    = udf[Int, String]( _.toInt)
val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week")))
Run Code Online (Sandbox Code Playgroud)

只是为了检查架构现在的样子:

scala> dataFixed.printSchema
root
 |-- age: integer (nullable = true)
 |-- hours_per_week: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- salaryRange: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

然后设置交叉验证器和管道:

val rf = new RandomForestClassifier()
val pipeline …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql apache-spark-mllib

34
推荐指数
2
解决办法
3万
查看次数

Spark - 从DataFrame中提取单个值

我有一个Spark DataFrame查询,保证返回单个Int值的单列.从生成的DataFrame中将此值作为Int提取的最佳方法是什么?

scala apache-spark apache-spark-sql

32
推荐指数
3
解决办法
4万
查看次数

Spark从一行中提取值

我有以下数据帧

val transactions_with_counts = sqlContext.sql(
  """SELECT user_id AS user_id, category_id AS category_id,
  COUNT(category_id) FROM transactions GROUP BY user_id, category_id""")
Run Code Online (Sandbox Code Playgroud)

我正在尝试将行转换为Rating对象,但由于x(0)返回一个数组,因此失败

val ratings = transactions_with_counts
  .map(x => Rating(x(0).toInt, x(1).toInt, x(2).toInt))
Run Code Online (Sandbox Code Playgroud)

错误:值toInt不是Any的成员

scala apache-spark apache-spark-sql

31
推荐指数
2
解决办法
6万
查看次数

AttributeError:'DataFrame'对象没有属性'map'

我想使用下面的代码转换spark数据框:

from pyspark.mllib.clustering import KMeans
spark_df = sqlContext.createDataFrame(pandas_df)
rdd = spark_df.map(lambda data: Vectors.dense([float(c) for c in data]))
model = KMeans.train(rdd, 2, maxIterations=10, runs=30, initializationMode="random")
Run Code Online (Sandbox Code Playgroud)

详细的错误消息是:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-11-a19a1763d3ac> in <module>()
      1 from pyspark.mllib.clustering import KMeans
      2 spark_df = sqlContext.createDataFrame(pandas_df)
----> 3 rdd = spark_df.map(lambda data: Vectors.dense([float(c) for c in data]))
      4 model = KMeans.train(rdd, 2, maxIterations=10, runs=30, initializationMode="random")

/home/edamame/spark/spark-2.0.0-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in __getattr__(self, name)
    842         if name not in self.columns:
    843             raise AttributeError(
--> 844                 "'%s' object …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark spark-dataframe apache-spark-mllib

31
推荐指数
1
解决办法
5万
查看次数

SPARK SQL替换mysql GROUP_CONCAT聚合函数

我有一个包含两个字符串类型列(用户名,朋友)的表,对于每个用户名,我想在一行中收集所有朋友,连接为字符串('username1','friends1,friends2,friends3').我知道MySql通过GROUP_CONCAT做到这一点,有没有办法用SPARK SQL做到这一点?

谢谢

aggregate-functions apache-spark apache-spark-sql

30
推荐指数
4
解决办法
3万
查看次数

Spark RDD - 使用额外参数进行映射

是否可以将额外的参数传递给pySpark中的映射函数?具体来说,我有以下代码配方:

raw_data_rdd = sc.textFile("data.json", use_unicode=True)
json_data_rdd = raw_data_rdd.map(lambda line: json.loads(line))
mapped_rdd = json_data_rdd.flatMap(processDataLine)
Run Code Online (Sandbox Code Playgroud)

processDataLine除了JSON对象之外,该函数还需要额外的参数,如下所示:

def processDataLine(dataline, arg1, arg2)
Run Code Online (Sandbox Code Playgroud)

如何传递额外的参数arg1,并arg2flaMap功能?

python apache-spark rdd pyspark

28
推荐指数
1
解决办法
2万
查看次数

处理Spark MLlib中的不平衡数据集

我工作在一个特定的二元分类问题具有高度不平衡的数据集,我想知道是否有人试图实现特定的技术来处理数据集不平衡(如SMOTE)的分类问题,用放电的MLlib.

我正在使用MLLib的随机森林实现,并且已经尝试了最简单的方法来随机地对较大的类进行采样,但它没有像我预期的那样工作.

如果您对类似问题的体验有任何反馈,我将不胜感激.

谢谢,

classification machine-learning apache-spark apache-spark-mllib

28
推荐指数
1
解决办法
2万
查看次数

如何为同等大小的分区的Spark RDD定义自定义分区程序,其中每个分区具有相同数量的元素?

我是Spark的新手.我有一个大的元素数据集[RDD],我想把它分成两个完全相同大小的分区,维护元素的顺序.我试着用RangePartitioner

var data = partitionedFile.partitionBy(new RangePartitioner(2, partitionedFile))
Run Code Online (Sandbox Code Playgroud)

这不能给出令人满意的结果,因为它大致分割但不完全相同的大小维持元素的顺序.例如,如果有64个元素,我们使用 Rangepartitioner,然后它分为31个元素和33个元素.

我需要一个分区器,以便我在一半中获得前32个元素,而另一半包含第二组32个元素.你能否通过建议如何使用自定义分区器来帮助我,这样我可以获得相同大小的两半,保持元素的顺序?

hadoop scala apache-spark

27
推荐指数
2
解决办法
4万
查看次数

如何从CrossValidatorModel中提取最佳参数

我想ParamGridBuilder在Spark 1.4.x中找到CrossValidator中最佳模型的参数,

在Spark文档中的Pipeline示例中,它们通过在管道中使用来添加不同的参数(numFeatures,regParam)ParamGridBuilder.然后通过以下代码行创建最佳模型:

val cvModel = crossval.fit(training.toDF)
Run Code Online (Sandbox Code Playgroud)

现在,我想知道从中产生最佳模型的参数(numFeatures,regParam)是什么ParamGridBuilder.

我已经使用了以下命令但没有成功:

cvModel.bestModel.extractParamMap().toString()
cvModel.params.toList.mkString("(", ",", ")")
cvModel.estimatorParamMaps.toString()
cvModel.explainParams()
cvModel.getEstimatorParamMaps.mkString("(", ",", ")")
cvModel.toString()
Run Code Online (Sandbox Code Playgroud)

有帮助吗?

提前致谢,

pipeline scala cross-validation apache-spark apache-spark-mllib

27
推荐指数
2
解决办法
1万
查看次数