标签: apache-spark-sql

如何从spark数据帧中过滤掉null值

我使用以下模式在spark中创建了一个数据框:

root
 |-- user_id: long (nullable = false)
 |-- event_id: long (nullable = false)
 |-- invited: integer (nullable = false)
 |-- day_diff: long (nullable = true)
 |-- interested: integer (nullable = false)
 |-- event_owner: long (nullable = false)
 |-- friend_id: long (nullable = false)
Run Code Online (Sandbox Code Playgroud)

数据如下所示:

+----------+----------+-------+--------+----------+-----------+---------+
|   user_id|  event_id|invited|day_diff|interested|event_owner|friend_id|
+----------+----------+-------+--------+----------+-----------+---------+
|   4236494| 110357109|      0|      -1|         0|  937597069|     null|
|  78065188| 498404626|      0|       0|         0| 2904922087|     null|
| 282487230|2520855981|      0|      28|         0| 3749735525|     null|
| 335269852|1641491432|      0|       2|         0| 1490350911|     null| …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql spark-dataframe

49
推荐指数
6
解决办法
12万
查看次数

从Spark DataFrame中的单个列派生多个列

我有一个带有巨大可解析元数据的DF作为Dataframe中的单个字符串列,我们称之为DFA,使用ColmnA.

我想打破这一列,将ColmnA分成多个列,通过一个函数,ClassXYZ = Func1(ColmnA).此函数返回一个具有多个变量的类ClassXYZ,现在每个变量都必须映射到新列,例如ColmnA1,ColmnA2等.

如何通过调用此Func1一次,使用这些附加列从一个Dataframe到另一个Data转换,而不必重复它来创建所有列.

如果我每次都要调用这个巨大的函数添加一个新列,它很容易解决,但这是我希望避免的.

请使用工作或伪代码建议.

谢谢

桑杰

scala user-defined-functions dataframe apache-spark apache-spark-sql

48
推荐指数
3
解决办法
5万
查看次数

如何在groupBy之后将值聚合到集合中?

我有一个带有架构的数据帧:

[visitorId: string, trackingIds: array<string>, emailIds: array<string>]
Run Code Online (Sandbox Code Playgroud)

正在寻找一种方法来分组(或者可能汇总?)由visitorid组成的数据帧,其中的trackingIds和emailIds列将一起追加.所以例如,如果我的初始df看起来像:

visitorId   |trackingIds|emailIds
+-----------+------------+--------
|a158|      [666b]      |    [12]
|7g21|      [c0b5]      |    [45]
|7g21|      [c0b4]      |    [87]
|a158|      [666b, 777c]|    []
Run Code Online (Sandbox Code Playgroud)

我希望我的输出df看起来像这样

visitorId   |trackingIds|emailIds
+-----------+------------+--------
|a158|      [666b,666b,777c]|      [12,'']
|7g21|      [c0b5,c0b4]     |      [45, 87]
Run Code Online (Sandbox Code Playgroud)

试图使用groupByagg运营商但没有太多运气.

scala apache-spark apache-spark-sql

48
推荐指数
2
解决办法
5万
查看次数

上传列表以从火花数据框中选择多个列

我有一个火花数据框df.有没有办法使用这些列的列表选择几列?

scala> df.columns
res0: Array[String] = Array("a", "b", "c", "d")
Run Code Online (Sandbox Code Playgroud)

我知道我可以做点什么df.select("b", "c").但是假设我有一个包含几个列名的列表val cols = List("b", "c"),有没有办法将它传递给df.select?df.select(cols)抛出错误.像df.select(*cols)python中的东西

apache-spark apache-spark-sql spark-dataframe

47
推荐指数
3
解决办法
5万
查看次数

从案例类生成Spark StructType/Schema

如果我想创建一个StructType(即a DataFrame.schema)a case class,是否有办法在不创建的情况下创建DataFrame?我可以轻松地做到:

case class TestCase(id: Long)
val schema = Seq[TestCase]().toDF.schema
Run Code Online (Sandbox Code Playgroud)

但实际创建一个DataFrame我想要的只是模式似乎有点过分.

(如果你很好奇,问题背后的原因是我正在定义一个UserDefinedAggregateFunction,并且这样做会覆盖一些返回的方法,StructTypes并使用case类.)

apache-spark apache-spark-sql

47
推荐指数
4
解决办法
2万
查看次数

将Spark Dataframe字符串列拆分为多个列

我见过各种各样的人都认为这Dataframe.explode是一种有用的方法,但它会导致比原始数据帧更多的行,这根本不是我想要的.我只是想做Dataframe相当于非常简单:

rdd.map(lambda row: row + [row.my_str_col.split('-')])
Run Code Online (Sandbox Code Playgroud)

它看起来像:

col1 | my_str_col
-----+-----------
  18 |  856-yygrm
 201 |  777-psgdg
Run Code Online (Sandbox Code Playgroud)

并将其转换为:

col1 | my_str_col | _col3 | _col4
-----+------------+-------+------
  18 |  856-yygrm |   856 | yygrm
 201 |  777-psgdg |   777 | psgdg
Run Code Online (Sandbox Code Playgroud)

我知道pyspark.sql.functions.split(),但它导致嵌套数组列而不是我想要的两个顶级列.

理想情况下,我希望这些新列也可以命名.

apache-spark apache-spark-sql pyspark spark-dataframe pyspark-sql

47
推荐指数
3
解决办法
7万
查看次数

'PipelinedRDD'对象在PySpark中没有属性'toDF'

我正在尝试加载SVM文件并将其转换为一个,DataFrame因此我可以使用PipelineSpark 的ML模块(ML).我刚刚在Ubuntu 14.04上安装了一个新的Spark 1.5.0(没有spark-env.sh配置).

my_script.py是:

from pyspark.mllib.util import MLUtils
from pyspark import SparkContext

sc = SparkContext("local", "Teste Original")
data = MLUtils.loadLibSVMFile(sc, "/home/svm_capture").toDF()
Run Code Online (Sandbox Code Playgroud)

我正在使用: ./spark-submit my_script.py

我收到错误:

Traceback (most recent call last):
File "/home/fred-spark/spark-1.5.0-bin-hadoop2.6/pipeline_teste_original.py", line 34, in <module>
data = MLUtils.loadLibSVMFile(sc, "/home/fred-spark/svm_capture").toDF()
AttributeError: 'PipelinedRDD' object has no attribute 'toDF'
Run Code Online (Sandbox Code Playgroud)

我无法理解的是,如果我跑:

data = MLUtils.loadLibSVMFile(sc, "/home/svm_capture").toDF()
Run Code Online (Sandbox Code Playgroud)

直接在PySpark shell中,它的工作原理.

python apache-spark rdd apache-spark-sql pyspark

45
推荐指数
1
解决办法
4万
查看次数

Apache Spark SQLContext和HiveContext有什么区别?

Apache Spark SQLContext和HiveContext有什么区别?

一些消息来源称,由于HiveContext是SQLContext的超集,因此开发人员应始终使用HiveContext,它具有比SQLContext更多的功能.但是每个上下文的当前API大多是相同的.

  • SQLContext/HiveContext更有用的场景是什么?
  • 只有在使用Hive时,HiveContext才更有用吗?
  • 或者SQLContext是否是使用Apache Spark实现大数据应用程序所需的全部内容?

hive apache-spark apache-spark-sql

45
推荐指数
2
解决办法
3万
查看次数

如何最有效地将Scala DataFrame的Row转换为case类?

一旦我在Spark中获得了一些Row类,无论是Dataframe还是Catalyst,我想将它转换为我的代码中的case类.这可以通过匹配来完成

someRow match {case Row(a:Long,b:String,c:Double) => myCaseClass(a,b,c)}
Run Code Online (Sandbox Code Playgroud)

但是当行有大量的列,比如十几个双打,一些布尔,甚至偶尔的空位时,它变得丑陋.

我想能够将-all-cast转换为myCaseClass.是否可能,或者我已经获得了最经济的语法?

scala apache-spark apache-spark-sql

44
推荐指数
4
解决办法
5万
查看次数

Spark为数据帧连接指定多个列条件

如何在连接两个数据帧时提供更多列条件.例如,我想运行以下内容:

val Lead_all = Leads.join(Utm_Master,  
    Leaddetails.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign") ==
    Utm_Master.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
"left")
Run Code Online (Sandbox Code Playgroud)

我想只在这些列匹配时才加入.但是上面的语法无效,因为cols只需要一个字符串.那我怎么得到我想要的东西.

apache-spark rdd apache-spark-sql

44
推荐指数
6
解决办法
8万
查看次数