小编dhe*_*eee的帖子

火花数据帧中滤波的多个条件

我有一个包含四个字段的数据框.其中一个字段名称是Status,我试图在.filter中使用OR条件来表示数据帧.我试过下面的查询,但没有运气.

df2 = df1.filter(("Status=2") || ("Status =3"))

df2 = df1.filter("Status=2" || "Status =3")
Run Code Online (Sandbox Code Playgroud)

有没有人以前用过这个.我在这里看到了关于堆栈溢出的类似问题.他们使用下面的代码来使用OR条件.但该代码适用于pyspark.

from pyspark.sql.functions import col 

numeric_filtered = df.where(
(col('LOW')    != 'null') | 
(col('NORMAL') != 'null') |
(col('HIGH')   != 'null'))
numeric_filtered.show()
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql spark-dataframe

27
推荐指数
2
解决办法
8万
查看次数

如何在spark中的DataFrame中计算列的百分位数?

我正在尝试计算DataFrame中列的百分位数?我无法在Spark聚合函数中找到任何percentile_approx函数.

例如在Hive中我们有percentile_approx,我们可以通过以下方式使用它

hiveContext.sql("select percentile_approx("Open_Rate",0.10) from myTable); 
Run Code Online (Sandbox Code Playgroud)

但出于性能原因,我想使用Spark DataFrame来实现它.

样本数据集

|User ID|Open_Rate|
------------------- 
|A1     |10.3     |
|B1     |4.04     |
|C1     |21.7     |
|D1     |18.6     |
Run Code Online (Sandbox Code Playgroud)

我想知道有多少用户分为10百分位或20百分位等等.我想做这样的事情

df.select($"id",Percentile($"Open_Rate",0.1)).show
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql spark-dataframe

10
推荐指数
2
解决办法
9831
查看次数

将多个小文件合并到Spark中的几个较大文件中

我通过Spark使用配置单元.我在我的spark代码中有一个Insert into partitioned table query.输入数据为200 + gb.当Spark写入分区表时,它会吐出非常小的文件(kb中的文件).所以现在输出分区表文件夹有5000+个小kb文件.我想将这些合并到几个大的MB文件中,可能只有几个200mb的文件.我厌倦了使用配置单元合并设置,但它们似乎不起作用.

'val result7A = hiveContext.sql("set hive.exec.dynamic.partition=true")

 val result7B = hiveContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")

val result7C = hiveContext.sql("SET hive.merge.size.per.task=256000000")

val result7D = hiveContext.sql("SET hive.merge.mapfiles=true")

val result7E = hiveContext.sql("SET hive.merge.mapredfiles=true")

val result7F = hiveContext.sql("SET hive.merge.sparkfiles = true")

val result7G = hiveContext.sql("set hive.aux.jars.path=c:\\Applications\\json-serde-1.1.9.3-SNAPSHOT-jar-with-dependencies.jar")

val result8 = hiveContext.sql("INSERT INTO TABLE partition_table PARTITION (date) select a,b,c from partition_json_table")'
Run Code Online (Sandbox Code Playgroud)

上述配置单元设置在mapreduce配置单元执行中工作,并吐出指定大小的文件.有没有选择做Spark或Scala?

hadoop hive scala apache-spark apache-spark-sql

8
推荐指数
2
解决办法
1万
查看次数

在Spark中使用窗口函数

我试图在Spark数据帧中使用rowNumber.我的查询在Spark shell中按预期工作.但是当我在eclipse中编写它们并编译一个jar时,我面临一个错误

 16/03/23 05:52:43 ERROR ApplicationMaster: User class threw exception:org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;
org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;
Run Code Online (Sandbox Code Playgroud)

我的疑问

import org.apache.spark.sql.functions.{rowNumber, max, broadcast}
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy($"id").orderBy($"value".desc)

val dfTop = df.withColumn("rn", rowNumber.over(w)).where($"rn" <= 3).drop("rn")
Run Code Online (Sandbox Code Playgroud)

我在Spark shell中运行查询时没有使用HiveContext.不知道为什么当我运行jar文件时它返回错误.如果有帮助的话,我也在Spark 1.6.0上运行脚本.有没有人面临类似的问题?

window-functions apache-spark apache-spark-sql

3
推荐指数
1
解决办法
8088
查看次数