标签: apache-spark-sql

使用scala连接Apache spark中不同RDD的数据集

有没有办法RDD在spark中连接两个不同s的数据集?

要求是 - 我使用具有相同列名的scala创建两个中间RDD,需要组合这两个RDD的结果并缓存访问UI的结果.如何在此处组合数据集?

RDD属于类型 spark.sql.SchemaRDD

scala distributed-computing apache-spark rdd apache-spark-sql

35
推荐指数
1
解决办法
4万
查看次数

使用列的长度过滤DataFrame

我想DataFrame使用与列长度相关的条件来过滤a ,这个问题可能很容易,但我没有在SO中找到任何相关的问题.

更具体的,我有一个DataFrame只有一个Column,其中ArrayType(StringType()),我要筛选的DataFrame使用长度filterer,我拍下面的一个片段.

df = sqlContext.read.parquet("letters.parquet")
df.show()

# The output will be 
# +------------+
# |      tokens|
# +------------+
# |[L, S, Y, S]|
# |[L, V, I, S]|
# |[I, A, N, A]|
# |[I, L, S, A]|
# |[E, N, N, Y]|
# |[E, I, M, A]|
# |[O, A, N, A]|
# |   [S, U, S]|
# +------------+

# But I want only the entries with length …
Run Code Online (Sandbox Code Playgroud)

python dataframe apache-spark apache-spark-sql pyspark

35
推荐指数
2
解决办法
5万
查看次数

DataFrame-ified zipWithIndex

我试图解决向数据集添加序列号的古老问题.我正在使用DataFrames,似乎没有相应的DataFrame RDD.zipWithIndex.另一方面,以下工作或多或少按我希望的方式工作:

val origDF = sqlContext.load(...)    

val seqDF= sqlContext.createDataFrame(
    origDF.rdd.zipWithIndex.map(ln => Row.fromSeq(Seq(ln._2) ++ ln._1.toSeq)),
    StructType(Array(StructField("seq", LongType, false)) ++ origDF.schema.fields)
)
Run Code Online (Sandbox Code Playgroud)

在我的实际应用程序中,origDF不会直接从文件中加载 - 它将通过将2-3个其他DataFrame连接在一起而创建,并将包含超过1亿行.

有一个更好的方法吗?我该怎么做才能优化它?

apache-spark apache-spark-sql

34
推荐指数
5
解决办法
2万
查看次数

如何在Spark ML中创建正确的分类数据框

我试图通过使用Spark ML api运行随机森林分类,但我遇到了将正确的数据帧输入创建到管道中的问题.

以下是示例数据:

age,hours_per_week,education,sex,salaryRange
38,40,"hs-grad","male","A"
28,40,"bachelors","female","A"
52,45,"hs-grad","male","B"
31,50,"masters","female","B"
42,40,"bachelors","male","B"
Run Code Online (Sandbox Code Playgroud)

agehours_per_week是整数,而其他功能包括label salaryRange是分类(String)

加载这个csv文件(让我们称之为sample.csv)可以通过Spark csv库完成,如下所示:

val data = sqlContext.csvFile("/home/dusan/sample.csv")
Run Code Online (Sandbox Code Playgroud)

默认情况下,所有列都作为字符串导入,因此我们需要将"age"和"hours_per_week"更改为Int:

val toInt    = udf[Int, String]( _.toInt)
val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week")))
Run Code Online (Sandbox Code Playgroud)

只是为了检查架构现在的样子:

scala> dataFixed.printSchema
root
 |-- age: integer (nullable = true)
 |-- hours_per_week: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- salaryRange: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

然后设置交叉验证器和管道:

val rf = new RandomForestClassifier()
val pipeline …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql apache-spark-mllib

34
推荐指数
2
解决办法
3万
查看次数

Spark使用上一行的值向数据框添加新列

我想知道如何在Spark(Pyspark)中实现以下功能

初始数据帧:

+--+---+
|id|num|
+--+---+
|4 |9.0|
+--+---+
|3 |7.0|
+--+---+
|2 |3.0|
+--+---+
|1 |5.0|
+--+---+
Run Code Online (Sandbox Code Playgroud)

结果数据帧:

+--+---+-------+
|id|num|new_Col|
+--+---+-------+
|4 |9.0|  7.0  |
+--+---+-------+
|3 |7.0|  3.0  |
+--+---+-------+
|2 |3.0|  5.0  |
+--+---+-------+
Run Code Online (Sandbox Code Playgroud)

我设法通过以下方式将新列"附加"到数据框中: df.withColumn("new_Col", df.num * 10)

但是我不知道如何为新列实现这种"行的移位",以便新列具有前一行的字段值(如示例所示).我还在API文档中找不到有关如何通过索引访问DF中某一行的任何内容.

任何帮助,将不胜感激.

python dataframe apache-spark apache-spark-sql pyspark

33
推荐指数
1
解决办法
2万
查看次数

cube,rollup和groupBy运算符之间有什么区别?

问题几乎在标题中.我找不到有关差异的详细文档.

我注意到了一个区别,因为在交换cube和groupBy函数调用时,我会得到不同的结果.我注意到对于使用'cube'的结果,我在经常分组的表达式上得到了很多空值.

sql rollup cube apache-spark apache-spark-sql

33
推荐指数
2
解决办法
1万
查看次数

Spark - SELECT WHERE还是过滤?

选择where子句和过滤Spark有什么区别?
是否存在一个比另一个更合适的用例?

我什么时候用

DataFrame newdf = df.select(df.col("*")).where(df.col("somecol").leq(10))
Run Code Online (Sandbox Code Playgroud)

什么时候

DataFrame newdf = df.select(df.col("*")).filter("somecol <= 10")
Run Code Online (Sandbox Code Playgroud)

更合适?

apache-spark apache-spark-sql

33
推荐指数
2
解决办法
9万
查看次数

尝试将数据帧行映射到更新行时出现编码器错误

当我试图在我的代码中做同样的事情,如下所述

dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0),make,row(2))
})
Run Code Online (Sandbox Code Playgroud)

我从这里采取了上述参考: Scala:如何使用scala替换Dataframs中的值 但是我收到编码器错误

无法找到存储在数据集中的类型的编码器.导入spark.im plicits支持原始类型(Int,S tring等)和产品类型(case类)._将在以后的版本中添加对序列化其他类型的支持.

注意:我正在使用spark 2.0!

scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders

33
推荐指数
2
解决办法
3万
查看次数

DataFrame连接优化 - 广播哈希加入

我试图有效地连接两个DataFrame,其中一个是大的,第二个是小一点.

有没有办法避免这一切洗牌?我无法设置autoBroadCastJoinThreshold,因为它只支持整数 - 我尝试广播的表略大于整数个字节.

有没有办法迫使广播忽略这个变量?

dataframe apache-spark apache-spark-sql apache-spark-1.4

32
推荐指数
4
解决办法
5万
查看次数

如何将Vector拆分为列 - 使用PySpark

上下文:我有DataFrame2列:单词和向量.其中"vector"的列类型是VectorUDT.

一个例子:

word    |  vector
assert  | [435,323,324,212...]
Run Code Online (Sandbox Code Playgroud)

我希望得到这个:

word   |  v1 | v2  | v3 | v4 | v5 | v6 ......
assert | 435 | 5435| 698| 356|....
Run Code Online (Sandbox Code Playgroud)

题:

如何使用PySpark为每个维度拆分包含多列向量的列?

提前致谢

python apache-spark apache-spark-sql pyspark apache-spark-ml

32
推荐指数
1
解决办法
2万
查看次数