标签: apache-spark-sql

SparkSQL与Spark上的Hive - 差异和利弊?

SparkSQL CLI内部使用HiveQL,如果是Hive on spark(HIVE-7292),则hive使用spark作为后端引擎.有人可以提供更多的亮点,这两种情况究竟有何不同以及两种方法的利弊?

hadoop hive apache-spark apache-spark-sql

29
推荐指数
2
解决办法
4万
查看次数

使用Pyspark计算Spark数据帧的每列中的非NaN条目数

我有一个非常大的数据集,在Hive中加载.它由大约190万行和1450列组成.我需要确定每个列的"覆盖率",即每个列具有非NaN值的行的分数.

这是我的代码:

from pyspark import SparkContext
from pyspark.sql import HiveContext
import string as string

sc = SparkContext(appName="compute_coverages") ## Create the context
sqlContext = HiveContext(sc)

df = sqlContext.sql("select * from data_table")
nrows_tot = df.count()

covgs=sc.parallelize(df.columns)
        .map(lambda x: str(x))
        .map(lambda x: (x, float(df.select(x).dropna().count()) / float(nrows_tot) * 100.))
Run Code Online (Sandbox Code Playgroud)

在pyspark shell中尝试这个,如果我然后执行covgs.take(10),它会返回一个相当大的错误堆栈.它说保存文件时出现问题/usr/lib64/python2.6/pickle.py.这是错误的最后一部分:

py4j.protocol.Py4JError: An error occurred while calling o37.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
        at py4j.Gateway.invoke(Gateway.java:252)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

如果有更好的方法来实现这一点,而不是我正在尝试的方式,我愿意接受建议.我不能使用pandas,因为它目前在我工作的集群上不可用,我无权安装它.

python dataframe apache-spark apache-spark-sql pyspark

29
推荐指数
1
解决办法
3万
查看次数

PySpark:when子句中的多个条件

我想修改数据帧列(Age)的单元格值,其中当前它是空白的,我只会在另一列(Survived)的值为0时为相应的行进行修改,其中Age为空白.如果它在Survived列中为1但在Age列中为空,那么我将它保持为null.

我试图使用&&运算符,但它没有用.这是我的代码:

tdata.withColumn("Age",  when((tdata.Age == "" && tdata.Survived == "0"), mean_age_0).otherwise(tdata.Age)).show()
Run Code Online (Sandbox Code Playgroud)

任何建议如何处理?谢谢.

错误信息:

SyntaxError: invalid syntax
  File "<ipython-input-33-3e691784411c>", line 1
    tdata.withColumn("Age",  when((tdata.Age == "" && tdata.Survived == "0"), mean_age_0).otherwise(tdata.Age)).show()
                                                    ^
Run Code Online (Sandbox Code Playgroud)

python dataframe apache-spark apache-spark-sql pyspark

29
推荐指数
2
解决办法
8万
查看次数

如何更改spark数据框中的列位置?

我想知道是否可以更改数据框中列的位置,实际上是否可以更改架构?

确切地说,如果我有一个像[field1,field2,field3]这样的数据帧,我想获得[field1,field3,field2].

任何帮助将非常感激 !

谢谢.

编辑:

我不能放任何代码.让我们假设我们正在处理具有一百列的数据帧,在一些连接和转换之后,这些列中的一些关于目标表的模式是错误的.所以我的观点是:如何移动一列或多列,即:如何更改架构?

谢谢.

scala dataframe apache-spark apache-spark-sql

29
推荐指数
5
解决办法
3万
查看次数

PySpark:withColumn()有两个条件和三个结果

我正在使用Spark和PySpark.我试图实现相当于以下伪代码的结果:

df = df.withColumn('new_column', 
    IF fruit1 == fruit2 THEN 1, ELSE 0. IF fruit1 IS NULL OR fruit2 IS NULL 3.)
Run Code Online (Sandbox Code Playgroud)

我试图在PySpark中这样做,但我不确定语法.有什么指针吗?我调查expr()但无法让它工作.

请注意,这df是一个pyspark.sql.dataframe.DataFrame.

hive hiveql apache-spark apache-spark-sql pyspark

29
推荐指数
3
解决办法
9万
查看次数

DataFrame/Dataset groupBy行为/优化

假设我们有DataFrame,df包含以下列:

名称,姓氏,大小,宽度,长度,重量

现在我们想要执行几个操作,例如我们想要创建一些包含Size和Width数据的DataFrame.

val df1 = df.groupBy("surname").agg( sum("size") )
val df2 = df.groupBy("surname").agg( sum("width") )
Run Code Online (Sandbox Code Playgroud)

正如您所注意到的,其他列(如Length)不会在任何地方使用.Spark是否足够聪明,可以在洗牌阶段之前丢弃多余的列,还是随身携带?威尔跑:

val dfBasic = df.select("surname", "size", "width")
Run Code Online (Sandbox Code Playgroud)

在分组之前以某种方式影响性能?

performance dataframe apache-spark apache-spark-sql apache-spark-dataset

28
推荐指数
1
解决办法
1万
查看次数

对spark数据帧的同一列进行多次聚合操作

我有三个字符串类型的数组包含以下信息:

  • groupBy数组:包含我想要对数据进行分组的列的名称.
  • aggregate array:包含我想要聚合的列的名称.
  • operations array:包含我想要执行的聚合操作

我正在尝试使用spark数据帧来实现这一目标.Spark数据框提供了agg(),您可以在其中传递Map [String,String](列名和相应的聚合操作)作为输入,但是我想对数据的同一列执行不同的聚合操作.有关如何实现这一目标的任何建议?

dataframe apache-spark apache-spark-sql

28
推荐指数
3
解决办法
2万
查看次数

pyspark数据帧过滤器或基于列表包含

我正在尝试使用列表过滤pyspark中的数据帧.我想要根据列表进行过滤,或者仅包含列表中具有值的记录.我的代码不起作用:

# define a dataframe
rdd = sc.parallelize([(0,1), (0,1), (0,2), (1,2), (1,10), (1,20), (3,18), (3,18), (3,18)])
df = sqlContext.createDataFrame(rdd, ["id", "score"])

# define a list of scores
l = [10,18,20]

# filter out records by scores by list l
records = df.filter(df.score in l)
# expected: (0,1), (0,1), (0,2), (1,2)

# include only records with these scores in list l
records = df.where(df.score in l)
# expected: (1,10), (1,20), (3,18), (3,18), (3,18)
Run Code Online (Sandbox Code Playgroud)

给出以下错误:ValueError:无法将列转换为bool:请使用'&'代表'和','|' 对于'或','〜'表示构建DataFrame布尔表达式时的'not'.

filter apache-spark apache-spark-sql pyspark

28
推荐指数
3
解决办法
4万
查看次数

使用Spark数据集在Scala中执行类型化连接

我喜欢Spark数据集,因为它们在编译时给我分析错误和语法错误,并且允许我使用getter而不是硬编码的名称/数字.大多数计算都可以使用Dataset的高级API完成.例如,通过访问数据集类型对象而不是使用RDD行的数据字段来执行agg,select,sum,avg,map,filter或groupBy操作要简单得多.

但是,由于缺少连接操作,我读到我可以像这样进行连接

ds1.joinWith(ds2, ds1.toDF().col("key") === ds2.toDF().col("key"), "inner")
Run Code Online (Sandbox Code Playgroud)

但这不是我想要的,因为我更喜欢通过case类接口来做,所以更像这样的东西

ds1.joinWith(ds2, ds1.key === ds2.key, "inner")
Run Code Online (Sandbox Code Playgroud)

现在最好的选择似乎是在case类旁边创建一个对象,并给这个函数提供正确的列名作为String.所以我会使用第一行代码但是放置一个函数而不是硬编码的列名.但那感觉不够优雅..

有人可以告诉我其他选项吗?目标是从实际的列名中抽象出来,最好通过case类的getter工作.

我正在使用Spark 1.6.1和Scala 2.10

scala join apache-spark apache-spark-sql apache-spark-dataset

28
推荐指数
1
解决办法
8694
查看次数

PySpark group中的中位数/分位数

我想在Spark数据帧上计算组分位数(使用PySpark).近似或精确的结果都可以.我更喜欢在groupBy/ 的上下文中使用的解决方案agg,以便我可以将它与其他PySpark聚合函数混合使用.如果由于某种原因这是不可能的,那么不同的方法也可以.

这个问题是相关的,但没有说明如何approxQuantile用作聚合函数.

我也可以访问percentile_approxHive UDF,但我不知道如何将它用作聚合函数.

为了特异性,假设我有以下数据帧:

from pyspark import SparkContext
import pyspark.sql.functions as f

sc = SparkContext()    

df = sc.parallelize([
    ['A', 1],
    ['A', 2],
    ['A', 3],
    ['B', 4],
    ['B', 5],
    ['B', 6],
]).toDF(('grp', 'val'))

df_grp = df.groupBy('grp').agg(f.magic_percentile('val', 0.5).alias('med_val'))
df_grp.show()
Run Code Online (Sandbox Code Playgroud)

预期结果是:

+----+-------+
| grp|med_val|
+----+-------+
|   A|      2|
|   B|      5|
+----+-------+
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark pyspark-sql

28
推荐指数
5
解决办法
2万
查看次数