标签: apache-spark-sql

Spark从一行中提取值

我有以下数据帧

val transactions_with_counts = sqlContext.sql(
  """SELECT user_id AS user_id, category_id AS category_id,
  COUNT(category_id) FROM transactions GROUP BY user_id, category_id""")
Run Code Online (Sandbox Code Playgroud)

我正在尝试将行转换为Rating对象,但由于x(0)返回一个数组,因此失败

val ratings = transactions_with_counts
  .map(x => Rating(x(0).toInt, x(1).toInt, x(2).toInt))
Run Code Online (Sandbox Code Playgroud)

错误:值toInt不是Any的成员

scala apache-spark apache-spark-sql

31
推荐指数
2
解决办法
6万
查看次数

如何在Pyspark中加入多个列?

我正在使用Spark 1.3,并希望使用python接口(SparkSQL)加入多个列

以下作品:

我首先将它们注册为临时表.

numeric.registerTempTable("numeric")
Ref.registerTempTable("Ref")

test  = numeric.join(Ref, numeric.ID == Ref.ID, joinType='inner')
Run Code Online (Sandbox Code Playgroud)

我现在想基于多个列加入它们.

我得到SyntaxError:语法无效:

test  = numeric.join(Ref,
   numeric.ID == Ref.ID AND numeric.TYPE == Ref.TYPE AND
   numeric.STATUS == Ref.STATUS ,  joinType='inner')
Run Code Online (Sandbox Code Playgroud)

python join apache-spark apache-spark-sql pyspark

31
推荐指数
3
解决办法
5万
查看次数

Spark:有条件地将列添加到数据框

我正在尝试获取输入数据:

A    B       C
--------------
4    blah    2
2            3
56   foo     3
Run Code Online (Sandbox Code Playgroud)

并根据B是否为空来在末尾添加一列:

A    B       C     D
--------------------
4    blah    2     1
2            3     0
56   foo     3     1
Run Code Online (Sandbox Code Playgroud)

我可以通过将输入数据帧注册为临时表,然后键入SQL查询来轻松完成此操作.

但我真的想知道如何使用Scala方法执行此操作,而不必在Scala中键入SQL查询.

我已经尝试过了.withColumn,但我无法做到我想做的事情.

scala apache-spark apache-spark-sql spark-dataframe

31
推荐指数
1
解决办法
8万
查看次数

Spark是否支持对S3中的镶木地板文件进行真正的列扫描?

Parquet数据存储格式的一大好处是它是柱状的.如果我有一个包含数百列的'宽'数据集,但我的查询只涉及其中的一些,那么它可能只读取存储这几列的数据,并跳过其余的.

据推测,此功能的工作原理是在镶木地板文件的头部读取一些元数据,指示每列的文件系统上的位置.然后,读者可以在磁盘上寻找只读必要的列.

有谁知道spark的默认镶木地板阅读器是否正确地在S3上实现了这种选择性搜索?我认为它得到了S3的支持,但理论支持和正确利用该支持的实现之间存在很大差异.

amazon-s3 apache-spark parquet apache-spark-sql

31
推荐指数
2
解决办法
2635
查看次数

将Spark数据帧保存为Hive中的动态分区表

我有一个示例应用程序正在从csv文件读取数据帧.可以使用该方法将数据帧以镶木地板格式存储到Hive表中 df.saveAsTable(tablename,mode).

上面的代码工作正常,但我每天都有如此多的数据,我想根据creationdate(表中的列)动态分区hive表.

有没有办法动态分区数据帧并将其存储到配置单元仓库.想要避免使用硬编码插入语句hivesqlcontext.sql(insert into table partittioin by(date)....).

问题可以视为以下内容的扩展:如何将DataFrame直接保存到Hive?

任何帮助深表感谢.

hadoop hive apache-spark apache-spark-sql spark-dataframe

30
推荐指数
4
解决办法
7万
查看次数

SPARK SQL替换mysql GROUP_CONCAT聚合函数

我有一个包含两个字符串类型列(用户名,朋友)的表,对于每个用户名,我想在一行中收集所有朋友,连接为字符串('username1','friends1,friends2,friends3').我知道MySql通过GROUP_CONCAT做到这一点,有没有办法用SPARK SQL做到这一点?

谢谢

aggregate-functions apache-spark apache-spark-sql

30
推荐指数
4
解决办法
3万
查看次数

使用Spark DataFrame在列上获取不同的值

使用Spark 1.6.1版本我需要在列上获取不同的值,然后在其上执行一些特定的转换.该列包含超过5000万条记录,并且可以变大.
我知道做一个distinct.collect()会把呼叫带回驱动程序.目前我正在执行如下任务,是否有更好的方法?

 import sqlContext.implicits._
 preProcessedData.persist(StorageLevel.MEMORY_AND_DISK_2)

 preProcessedData.select(ApplicationId).distinct.collect().foreach(x => {
   val applicationId = x.getAs[String](ApplicationId)
   val selectedApplicationData = preProcessedData.filter($"$ApplicationId" === applicationId)
   // DO SOME TASK PER applicationId
 })

 preProcessedData.unpersist()  
Run Code Online (Sandbox Code Playgroud)

scala dataframe apache-spark apache-spark-sql spark-dataframe

30
推荐指数
3
解决办法
9万
查看次数

Spark sql如何在不丢失空值的情况下爆炸

我有一个Dataframe,我试图压扁.作为整个过程的一部分,我想爆炸它,所以如果我有一列数组,那么数组的每个值都将用于创建一个单独的行.例如,

id | name | likes
_______________________________
1  | Luke | [baseball, soccer]
Run Code Online (Sandbox Code Playgroud)

应该成为

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer
Run Code Online (Sandbox Code Playgroud)

这是我的代码

private DataFrame explodeDataFrame(DataFrame df) {
    DataFrame resultDf = df;
    for (StructField field : df.schema().fields()) {
        if (field.dataType() instanceof ArrayType) {
            resultDf = resultDf.withColumn(field.name(), org.apache.spark.sql.functions.explode(resultDf.col(field.name())));
            resultDf.show();
        }
    }
    return resultDf;
}
Run Code Online (Sandbox Code Playgroud)

问题是在我的数据中,一些数组列有空值.在这种情况下,整个行都将被删除.所以这个数据帧:

id | name | likes
_______________________________
1  | Luke | [baseball, soccer]
2  | Lucy | null
Run Code Online (Sandbox Code Playgroud)

变 …

java null apache-spark apache-spark-sql

30
推荐指数
2
解决办法
1万
查看次数

聚合函数计算Spark中groupBy的使用情况

我试图在pySpark中的一行代码中进行多个操作,并且不确定这是否适用于我的情况.

我的意图是不必将输出保存为新的数据帧.

我目前的代码很简单:

encodeUDF = udf(encode_time, StringType())
new_log_df.cache().withColumn('timePeriod', encodeUDF(col('START_TIME')))
  .groupBy('timePeriod')
  .agg(
    mean('DOWNSTREAM_SIZE').alias("Mean"),
    stddev('DOWNSTREAM_SIZE').alias("Stddev")
  )
  .show(20, False)
Run Code Online (Sandbox Code Playgroud)

我的目的是count()在使用之后添加groupBy,以获得与timePeriod列的每个值匹配的记录计数,打印\显示为输出.

在尝试使用时,groupBy(..).count().agg(..)我得到例外.

是否有任何方法可以实现这两个count()agg() .show()打印,而无需将代码拆分为两行命令,例如:

new_log_df.withColumn(..).groupBy(..).count()
new_log_df.withColumn(..).groupBy(..).agg(..).show()
Run Code Online (Sandbox Code Playgroud)

或者更好的是,将合并的输出输出到agg.show()输出 - 一个额外的列,它表示与行的值匹配的计数记录数.例如:

timePeriod | Mean | Stddev | Num Of Records
    X      | 10   |   20   |    315
Run Code Online (Sandbox Code Playgroud)

java scala apache-spark apache-spark-sql pyspark

30
推荐指数
1
解决办法
5万
查看次数

使用Spark DataFrame groupby时如何获取其他列?

当我像这样使用DataFrame groupby时:

df.groupBy(df("age")).agg(Map("id"->"count"))
Run Code Online (Sandbox Code Playgroud)

我只会得到一个包含"age"和"count(id)"列的DataFrame,但是在df中,还有许多其他列,例如"name".

总而言之,我希望得到MySQL中的结果,

"按年龄从df组中选择姓名,年龄,计数(id)"

在Spark中使用groupby时我该怎么办?

sql dataframe apache-spark apache-spark-sql

29
推荐指数
3
解决办法
3万
查看次数