有没有办法RDD在spark中连接两个不同s的数据集?
要求是 - 我使用具有相同列名的scala创建两个中间RDD,需要组合这两个RDD的结果并缓存访问UI的结果.如何在此处组合数据集?
RDD属于类型 spark.sql.SchemaRDD
scala distributed-computing apache-spark rdd apache-spark-sql
我想DataFrame使用与列长度相关的条件来过滤a ,这个问题可能很容易,但我没有在SO中找到任何相关的问题.
更具体的,我有一个DataFrame只有一个Column,其中ArrayType(StringType()),我要筛选的DataFrame使用长度filterer,我拍下面的一个片段.
df = sqlContext.read.parquet("letters.parquet")
df.show()
# The output will be
# +------------+
# | tokens|
# +------------+
# |[L, S, Y, S]|
# |[L, V, I, S]|
# |[I, A, N, A]|
# |[I, L, S, A]|
# |[E, N, N, Y]|
# |[E, I, M, A]|
# |[O, A, N, A]|
# | [S, U, S]|
# +------------+
# But I want only the entries with length …Run Code Online (Sandbox Code Playgroud) 我试图解决向数据集添加序列号的古老问题.我正在使用DataFrames,似乎没有相应的DataFrame RDD.zipWithIndex.另一方面,以下工作或多或少按我希望的方式工作:
val origDF = sqlContext.load(...)
val seqDF= sqlContext.createDataFrame(
origDF.rdd.zipWithIndex.map(ln => Row.fromSeq(Seq(ln._2) ++ ln._1.toSeq)),
StructType(Array(StructField("seq", LongType, false)) ++ origDF.schema.fields)
)
Run Code Online (Sandbox Code Playgroud)
在我的实际应用程序中,origDF不会直接从文件中加载 - 它将通过将2-3个其他DataFrame连接在一起而创建,并将包含超过1亿行.
有一个更好的方法吗?我该怎么做才能优化它?
我试图通过使用Spark ML api运行随机森林分类,但我遇到了将正确的数据帧输入创建到管道中的问题.
以下是示例数据:
age,hours_per_week,education,sex,salaryRange
38,40,"hs-grad","male","A"
28,40,"bachelors","female","A"
52,45,"hs-grad","male","B"
31,50,"masters","female","B"
42,40,"bachelors","male","B"
Run Code Online (Sandbox Code Playgroud)
age和hours_per_week是整数,而其他功能包括label salaryRange是分类(String)
加载这个csv文件(让我们称之为sample.csv)可以通过Spark csv库完成,如下所示:
val data = sqlContext.csvFile("/home/dusan/sample.csv")
Run Code Online (Sandbox Code Playgroud)
默认情况下,所有列都作为字符串导入,因此我们需要将"age"和"hours_per_week"更改为Int:
val toInt = udf[Int, String]( _.toInt)
val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week")))
Run Code Online (Sandbox Code Playgroud)
只是为了检查架构现在的样子:
scala> dataFixed.printSchema
root
|-- age: integer (nullable = true)
|-- hours_per_week: integer (nullable = true)
|-- education: string (nullable = true)
|-- sex: string (nullable = true)
|-- salaryRange: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
然后设置交叉验证器和管道:
val rf = new RandomForestClassifier()
val pipeline …Run Code Online (Sandbox Code Playgroud) 我想知道如何在Spark(Pyspark)中实现以下功能
初始数据帧:
+--+---+
|id|num|
+--+---+
|4 |9.0|
+--+---+
|3 |7.0|
+--+---+
|2 |3.0|
+--+---+
|1 |5.0|
+--+---+
Run Code Online (Sandbox Code Playgroud)
结果数据帧:
+--+---+-------+
|id|num|new_Col|
+--+---+-------+
|4 |9.0| 7.0 |
+--+---+-------+
|3 |7.0| 3.0 |
+--+---+-------+
|2 |3.0| 5.0 |
+--+---+-------+
Run Code Online (Sandbox Code Playgroud)
我设法通过以下方式将新列"附加"到数据框中:
df.withColumn("new_Col", df.num * 10)
但是我不知道如何为新列实现这种"行的移位",以便新列具有前一行的字段值(如示例所示).我还在API文档中找不到有关如何通过索引访问DF中某一行的任何内容.
任何帮助,将不胜感激.
问题几乎在标题中.我找不到有关差异的详细文档.
我注意到了一个区别,因为在交换cube和groupBy函数调用时,我会得到不同的结果.我注意到对于使用'cube'的结果,我在经常分组的表达式上得到了很多空值.
选择where子句和过滤Spark有什么区别?
是否存在一个比另一个更合适的用例?
我什么时候用
DataFrame newdf = df.select(df.col("*")).where(df.col("somecol").leq(10))
Run Code Online (Sandbox Code Playgroud)
什么时候
DataFrame newdf = df.select(df.col("*")).filter("somecol <= 10")
Run Code Online (Sandbox Code Playgroud)
更合适?
当我试图在我的代码中做同样的事情,如下所述
dataframe.map(row => {
val row1 = row.getAs[String](1)
val make = if (row1.toLowerCase == "tesla") "S" else row1
Row(row(0),make,row(2))
})
Run Code Online (Sandbox Code Playgroud)
我从这里采取了上述参考: Scala:如何使用scala替换Dataframs中的值 但是我收到编码器错误
无法找到存储在数据集中的类型的编码器.导入spark.im plicits支持原始类型(Int,S tring等)和产品类型(case类)._将在以后的版本中添加对序列化其他类型的支持.
注意:我正在使用spark 2.0!
scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders
我试图有效地连接两个DataFrame,其中一个是大的,第二个是小一点.
有没有办法避免这一切洗牌?我无法设置autoBroadCastJoinThreshold,因为它只支持整数 - 我尝试广播的表略大于整数个字节.
有没有办法迫使广播忽略这个变量?
上下文:我有DataFrame2列:单词和向量.其中"vector"的列类型是VectorUDT.
一个例子:
word | vector
assert | [435,323,324,212...]
Run Code Online (Sandbox Code Playgroud)
我希望得到这个:
word | v1 | v2 | v3 | v4 | v5 | v6 ......
assert | 435 | 5435| 698| 356|....
Run Code Online (Sandbox Code Playgroud)
题:
如何使用PySpark为每个维度拆分包含多列向量的列?
提前致谢
python apache-spark apache-spark-sql pyspark apache-spark-ml
apache-spark ×10
apache-spark-sql ×10
dataframe ×3
pyspark ×3
python ×3
scala ×3
cube ×1
rdd ×1
rollup ×1
sql ×1