我有以下数据帧
val transactions_with_counts = sqlContext.sql(
"""SELECT user_id AS user_id, category_id AS category_id,
COUNT(category_id) FROM transactions GROUP BY user_id, category_id""")
Run Code Online (Sandbox Code Playgroud)
我正在尝试将行转换为Rating对象,但由于x(0)返回一个数组,因此失败
val ratings = transactions_with_counts
.map(x => Rating(x(0).toInt, x(1).toInt, x(2).toInt))
Run Code Online (Sandbox Code Playgroud)
错误:值toInt不是Any的成员
我正在使用Spark 1.3,并希望使用python接口(SparkSQL)加入多个列
以下作品:
我首先将它们注册为临时表.
numeric.registerTempTable("numeric")
Ref.registerTempTable("Ref")
test = numeric.join(Ref, numeric.ID == Ref.ID, joinType='inner')
Run Code Online (Sandbox Code Playgroud)
我现在想基于多个列加入它们.
我得到SyntaxError:语法无效:
test = numeric.join(Ref,
numeric.ID == Ref.ID AND numeric.TYPE == Ref.TYPE AND
numeric.STATUS == Ref.STATUS , joinType='inner')
Run Code Online (Sandbox Code Playgroud) 我正在尝试获取输入数据:
A B C
--------------
4 blah 2
2 3
56 foo 3
Run Code Online (Sandbox Code Playgroud)
并根据B是否为空来在末尾添加一列:
A B C D
--------------------
4 blah 2 1
2 3 0
56 foo 3 1
Run Code Online (Sandbox Code Playgroud)
我可以通过将输入数据帧注册为临时表,然后键入SQL查询来轻松完成此操作.
但我真的想知道如何使用Scala方法执行此操作,而不必在Scala中键入SQL查询.
我已经尝试过了.withColumn,但我无法做到我想做的事情.
我有一个示例应用程序正在从csv文件读取数据帧.可以使用该方法将数据帧以镶木地板格式存储到Hive表中
df.saveAsTable(tablename,mode).
上面的代码工作正常,但我每天都有如此多的数据,我想根据creationdate(表中的列)动态分区hive表.
有没有办法动态分区数据帧并将其存储到配置单元仓库.想要避免使用硬编码插入语句hivesqlcontext.sql(insert into table partittioin by(date)....).
问题可以视为以下内容的扩展:如何将DataFrame直接保存到Hive?
任何帮助深表感谢.
我有一个包含两个字符串类型列(用户名,朋友)的表,对于每个用户名,我想在一行中收集所有朋友,连接为字符串('username1','friends1,friends2,friends3').我知道MySql通过GROUP_CONCAT做到这一点,有没有办法用SPARK SQL做到这一点?
谢谢
使用Spark 1.6.1版本我需要在列上获取不同的值,然后在其上执行一些特定的转换.该列包含超过5000万条记录,并且可以变大.
我知道做一个distinct.collect()会把呼叫带回驱动程序.目前我正在执行如下任务,是否有更好的方法?
import sqlContext.implicits._
preProcessedData.persist(StorageLevel.MEMORY_AND_DISK_2)
preProcessedData.select(ApplicationId).distinct.collect().foreach(x => {
val applicationId = x.getAs[String](ApplicationId)
val selectedApplicationData = preProcessedData.filter($"$ApplicationId" === applicationId)
// DO SOME TASK PER applicationId
})
preProcessedData.unpersist()
Run Code Online (Sandbox Code Playgroud) scala dataframe apache-spark apache-spark-sql spark-dataframe
我有一个Dataframe,我试图压扁.作为整个过程的一部分,我想爆炸它,所以如果我有一列数组,那么数组的每个值都将用于创建一个单独的行.例如,
id | name | likes
_______________________________
1 | Luke | [baseball, soccer]
Run Code Online (Sandbox Code Playgroud)
应该成为
id | name | likes
_______________________________
1 | Luke | baseball
1 | Luke | soccer
Run Code Online (Sandbox Code Playgroud)
这是我的代码
private DataFrame explodeDataFrame(DataFrame df) {
DataFrame resultDf = df;
for (StructField field : df.schema().fields()) {
if (field.dataType() instanceof ArrayType) {
resultDf = resultDf.withColumn(field.name(), org.apache.spark.sql.functions.explode(resultDf.col(field.name())));
resultDf.show();
}
}
return resultDf;
}
Run Code Online (Sandbox Code Playgroud)
问题是在我的数据中,一些数组列有空值.在这种情况下,整个行都将被删除.所以这个数据帧:
id | name | likes
_______________________________
1 | Luke | [baseball, soccer]
2 | Lucy | null
Run Code Online (Sandbox Code Playgroud)
变 …
我试图在pySpark中的一行代码中进行多个操作,并且不确定这是否适用于我的情况.
我的意图是不必将输出保存为新的数据帧.
我目前的代码很简单:
encodeUDF = udf(encode_time, StringType())
new_log_df.cache().withColumn('timePeriod', encodeUDF(col('START_TIME')))
.groupBy('timePeriod')
.agg(
mean('DOWNSTREAM_SIZE').alias("Mean"),
stddev('DOWNSTREAM_SIZE').alias("Stddev")
)
.show(20, False)
Run Code Online (Sandbox Code Playgroud)
我的目的是count()在使用之后添加groupBy,以获得与timePeriod列的每个值匹配的记录计数,打印\显示为输出.
在尝试使用时,groupBy(..).count().agg(..)我得到例外.
是否有任何方法可以实现这两个count()和agg() .show()打印,而无需将代码拆分为两行命令,例如:
new_log_df.withColumn(..).groupBy(..).count()
new_log_df.withColumn(..).groupBy(..).agg(..).show()
Run Code Online (Sandbox Code Playgroud)
或者更好的是,将合并的输出输出到agg.show()输出 - 一个额外的列,它表示与行的值匹配的计数记录数.例如:
timePeriod | Mean | Stddev | Num Of Records
X | 10 | 20 | 315
Run Code Online (Sandbox Code Playgroud) 当我像这样使用DataFrame groupby时:
df.groupBy(df("age")).agg(Map("id"->"count"))
Run Code Online (Sandbox Code Playgroud)
我只会得到一个包含"age"和"count(id)"列的DataFrame,但是在df中,还有许多其他列,例如"name".
总而言之,我希望得到MySQL中的结果,
"按年龄从df组中选择姓名,年龄,计数(id)"
在Spark中使用groupby时我该怎么办?