我使用以下模式在spark中创建了一个数据框:
root
|-- user_id: long (nullable = false)
|-- event_id: long (nullable = false)
|-- invited: integer (nullable = false)
|-- day_diff: long (nullable = true)
|-- interested: integer (nullable = false)
|-- event_owner: long (nullable = false)
|-- friend_id: long (nullable = false)
Run Code Online (Sandbox Code Playgroud)
数据如下所示:
+----------+----------+-------+--------+----------+-----------+---------+
| user_id| event_id|invited|day_diff|interested|event_owner|friend_id|
+----------+----------+-------+--------+----------+-----------+---------+
| 4236494| 110357109| 0| -1| 0| 937597069| null|
| 78065188| 498404626| 0| 0| 0| 2904922087| null|
| 282487230|2520855981| 0| 28| 0| 3749735525| null|
| 335269852|1641491432| 0| 2| 0| 1490350911| null| …Run Code Online (Sandbox Code Playgroud) 我有一个带有巨大可解析元数据的DF作为Dataframe中的单个字符串列,我们称之为DFA,使用ColmnA.
我想打破这一列,将ColmnA分成多个列,通过一个函数,ClassXYZ = Func1(ColmnA).此函数返回一个具有多个变量的类ClassXYZ,现在每个变量都必须映射到新列,例如ColmnA1,ColmnA2等.
如何通过调用此Func1一次,使用这些附加列从一个Dataframe到另一个Data转换,而不必重复它来创建所有列.
如果我每次都要调用这个巨大的函数添加一个新列,它很容易解决,但这是我希望避免的.
请使用工作或伪代码建议.
谢谢
桑杰
scala user-defined-functions dataframe apache-spark apache-spark-sql
我有一个带有架构的数据帧:
[visitorId: string, trackingIds: array<string>, emailIds: array<string>]
Run Code Online (Sandbox Code Playgroud)
正在寻找一种方法来分组(或者可能汇总?)由visitorid组成的数据帧,其中的trackingIds和emailIds列将一起追加.所以例如,如果我的初始df看起来像:
visitorId |trackingIds|emailIds
+-----------+------------+--------
|a158| [666b] | [12]
|7g21| [c0b5] | [45]
|7g21| [c0b4] | [87]
|a158| [666b, 777c]| []
Run Code Online (Sandbox Code Playgroud)
我希望我的输出df看起来像这样
visitorId |trackingIds|emailIds
+-----------+------------+--------
|a158| [666b,666b,777c]| [12,'']
|7g21| [c0b5,c0b4] | [45, 87]
Run Code Online (Sandbox Code Playgroud)
试图使用groupBy和agg运营商但没有太多运气.
我有一个火花数据框df.有没有办法使用这些列的列表选择几列?
scala> df.columns
res0: Array[String] = Array("a", "b", "c", "d")
Run Code Online (Sandbox Code Playgroud)
我知道我可以做点什么df.select("b", "c").但是假设我有一个包含几个列名的列表val cols = List("b", "c"),有没有办法将它传递给df.select?df.select(cols)抛出错误.像df.select(*cols)python中的东西
如果我想创建一个StructType(即a DataFrame.schema)a case class,是否有办法在不创建的情况下创建DataFrame?我可以轻松地做到:
case class TestCase(id: Long)
val schema = Seq[TestCase]().toDF.schema
Run Code Online (Sandbox Code Playgroud)
但实际创建一个DataFrame我想要的只是模式似乎有点过分.
(如果你很好奇,问题背后的原因是我正在定义一个UserDefinedAggregateFunction,并且这样做会覆盖一些返回的方法,StructTypes并使用case类.)
我见过各种各样的人都认为这Dataframe.explode是一种有用的方法,但它会导致比原始数据帧更多的行,这根本不是我想要的.我只是想做Dataframe相当于非常简单:
rdd.map(lambda row: row + [row.my_str_col.split('-')])
Run Code Online (Sandbox Code Playgroud)
它看起来像:
col1 | my_str_col
-----+-----------
18 | 856-yygrm
201 | 777-psgdg
Run Code Online (Sandbox Code Playgroud)
并将其转换为:
col1 | my_str_col | _col3 | _col4
-----+------------+-------+------
18 | 856-yygrm | 856 | yygrm
201 | 777-psgdg | 777 | psgdg
Run Code Online (Sandbox Code Playgroud)
我知道pyspark.sql.functions.split(),但它导致嵌套数组列而不是我想要的两个顶级列.
理想情况下,我希望这些新列也可以命名.
apache-spark apache-spark-sql pyspark spark-dataframe pyspark-sql
我正在尝试加载SVM文件并将其转换为一个,DataFrame因此我可以使用PipelineSpark 的ML模块(ML).我刚刚在Ubuntu 14.04上安装了一个新的Spark 1.5.0(没有spark-env.sh配置).
我my_script.py是:
from pyspark.mllib.util import MLUtils
from pyspark import SparkContext
sc = SparkContext("local", "Teste Original")
data = MLUtils.loadLibSVMFile(sc, "/home/svm_capture").toDF()
Run Code Online (Sandbox Code Playgroud)
我正在使用: ./spark-submit my_script.py
我收到错误:
Traceback (most recent call last):
File "/home/fred-spark/spark-1.5.0-bin-hadoop2.6/pipeline_teste_original.py", line 34, in <module>
data = MLUtils.loadLibSVMFile(sc, "/home/fred-spark/svm_capture").toDF()
AttributeError: 'PipelinedRDD' object has no attribute 'toDF'
Run Code Online (Sandbox Code Playgroud)
我无法理解的是,如果我跑:
data = MLUtils.loadLibSVMFile(sc, "/home/svm_capture").toDF()
Run Code Online (Sandbox Code Playgroud)
直接在PySpark shell中,它的工作原理.
Apache Spark SQLContext和HiveContext有什么区别?
一些消息来源称,由于HiveContext是SQLContext的超集,因此开发人员应始终使用HiveContext,它具有比SQLContext更多的功能.但是每个上下文的当前API大多是相同的.
一旦我在Spark中获得了一些Row类,无论是Dataframe还是Catalyst,我想将它转换为我的代码中的case类.这可以通过匹配来完成
someRow match {case Row(a:Long,b:String,c:Double) => myCaseClass(a,b,c)}
Run Code Online (Sandbox Code Playgroud)
但是当行有大量的列,比如十几个双打,一些布尔,甚至偶尔的空位时,它变得丑陋.
我想能够将-all-cast转换为myCaseClass.是否可能,或者我已经获得了最经济的语法?
如何在连接两个数据帧时提供更多列条件.例如,我想运行以下内容:
val Lead_all = Leads.join(Utm_Master,
Leaddetails.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign") ==
Utm_Master.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
"left")
Run Code Online (Sandbox Code Playgroud)
我想只在这些列匹配时才加入.但是上面的语法无效,因为cols只需要一个字符串.那我怎么得到我想要的东西.
apache-spark ×10
apache-spark-sql ×10
scala ×4
pyspark ×2
rdd ×2
dataframe ×1
hive ×1
pyspark-sql ×1
python ×1