标签: spark-dataframe

Apache spark处理case语句

我正在处理将SQL代码转换为PySpark代码并遇到一些SQL语句.我不知道如何处理pyspark中的案例陈述?我打算创建一个RDD然后使用rdd.map然后做一些逻辑检查.这是正确的方法吗?请帮忙!

基本上我需要遍历RDD或DF中的每一行,并根据我需要编辑其中一个列值的逻辑.

     case  
               when (e."a" Like 'a%' Or e."b" Like 'b%') 
                And e."aa"='BW' And cast(e."abc" as decimal(10,4))=75.0 Then 'callitA'

               when (e."a" Like 'b%' Or e."b" Like 'a%') 
                And e."aa"='AW' And cast(e."abc" as decimal(10,4))=75.0 Then 'callitB'

else

'CallitC'
Run Code Online (Sandbox Code Playgroud)

apache-spark rdd pyspark spark-dataframe pyspark-sql

13
推荐指数
2
解决办法
3万
查看次数

pySpark Dataframe上的多个聚合标准

我有一个pySpark数据框,如下所示:

+-------------+----------+
|          sku|      date|
+-------------+----------+
|MLA-603526656|02/09/2016|
|MLA-603526656|01/09/2016|
|MLA-604172009|02/10/2016|
|MLA-605470584|02/09/2016|
|MLA-605502281|02/10/2016|
|MLA-605502281|02/09/2016|
+-------------+----------+
Run Code Online (Sandbox Code Playgroud)

我想通过sku分组,然后计算最小和最大日期.如果我这样做:

df_testing.groupBy('sku') \
    .agg({'date': 'min', 'date':'max'}) \
    .limit(10) \
    .show()
Run Code Online (Sandbox Code Playgroud)

行为与Pandas相同,我只获取skumax(date)列.在Pandas我通常会做以下事情来得到我想要的结果:

df_testing.groupBy('sku') \
    .agg({'day': ['min','max']}) \
    .limit(10) \
    .show()
Run Code Online (Sandbox Code Playgroud)

但是在pySpark上这不起作用,我得到一个java.util.ArrayList cannot be cast to java.lang.String错误.谁能指点我正确的语法?

谢谢.

apache-spark pyspark spark-dataframe

13
推荐指数
1
解决办法
1万
查看次数

在通过JDBC从pyspark数据帧插入外部数据库表时,打开DUPLICATE KEY UPDATE

好吧,我正在使用PySpark并且我有一个Spark数据帧,我使用它将数据插入到mysql表中.

url = "jdbc:mysql://hostname/myDB?user=xyz&password=pwd"

df.write.jdbc(url=url, table="myTable", mode="append")

我想通过列值和特定数字的总和更新列值(不在主键中).

我尝试过不同的模式(追加,覆盖)DataFrameWriter.jdbc()函数.

我的问题是我们如何ON DUPLICATE KEY UPDATE在mysql中更新列值,同时将pyspark数据帧数据插入表中.

apache-spark apache-spark-sql pyspark spark-dataframe pyspark-sql

12
推荐指数
1
解决办法
1412
查看次数

如何选择以通用标签开头的所有列

我在Spark 1.6中有一个数据框,并希望从中选择一些列.列名称如下:

colA, colB, colC, colD, colE, colF-0, colF-1, colF-2
Run Code Online (Sandbox Code Playgroud)

我知道我可以这样做来选择特定的列:

df.select("colA", "colB", "colE")
Run Code Online (Sandbox Code Playgroud)

但如何选择,一次说"colA","colB"和所有colF-*列?有没有像熊猫一样的方式?

scala apache-spark spark-dataframe

12
推荐指数
2
解决办法
1万
查看次数

如何拆分列?

我想看看我是否可以在spark数据帧中拆分列.像这样,

Select employee, split(department,"_") from Employee
Run Code Online (Sandbox Code Playgroud)

apache-spark-sql spark-dataframe

12
推荐指数
1
解决办法
2万
查看次数

为少数列创建具有空值的DataFrame

我正在尝试创建一个DataFrame使用RDD.

首先,我创建一个RDD使用下面的代码 -

val account = sc.parallelize(Seq(
                                 (1, null, 2,"F"), 
                                 (2, 2, 4, "F"),
                                 (3, 3, 6, "N"),
                                 (4,null,8,"F")))
Run Code Online (Sandbox Code Playgroud)

它工作正常 -

account:org.apache.spark.rdd.RDD [(Int,Any,Int,String)] = ParallelCollectionRDD [0]并行化:27

但是,当尝试创建DataFrameRDD使用下面的代码

account.toDF("ACCT_ID", "M_CD", "C_CD","IND")
Run Code Online (Sandbox Code Playgroud)

我收到了以下错误

java.lang.UnsupportedOperationException:不支持类型为Any的架构

我分析说,每当我把null值放进去的时候,Seq只有我得到了错误.

有没有办法添加空值?

scala apache-spark spark-dataframe apache-spark-dataset

12
推荐指数
2
解决办法
6986
查看次数

java.lang.RuntimeException:java.lang.String不是bigint或int模式的有效外部类型

我正在从文本文件中读取数据框架的模式.该文件看起来像

id,1,bigint
price,2,bigint
sqft,3,bigint
zip_id,4,int
name,5,string
Run Code Online (Sandbox Code Playgroud)

我将解析后的数据类型映射到Spark Sql数据类型.创建数据框的代码是 -

var schemaSt = new ListBuffer[(String,String)]()
// read schema from file
for (line <- Source.fromFile("meta.txt").getLines()) {
  val word = line.split(",")
  schemaSt += ((word(0),word(2)))
}

// map datatypes
val types = Map("int" -> IntegerType, "bigint" -> LongType)
      .withDefault(_ => StringType)
val schemaChanged = schemaSt.map(x => (x._1,types(x._2))

// read data source
val lines = spark.sparkContext.textFile("data source path")

val fields = schemaChanged.map(x => StructField(x._1, x._2, nullable = true)).toList

val schema = StructType(fields)

val rowRDD = …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql spark-dataframe

12
推荐指数
2
解决办法
2万
查看次数

火花堆内存配置和钨

我认为通过项目Tungesten的集成,spark会自动使用堆内存.

什么是spark.memory.offheap.size和spark.memory.offheap.enabled?我是否需要手动指定Tungsten的关闭堆内存量?

apache-spark apache-spark-sql spark-dataframe apache-spark-2.0 off-heap

12
推荐指数
1
解决办法
6787
查看次数

pyspark:isin vs join

在给定的值列表中过滤pyspark中的数据帧的一般最佳实践是什么?特别:

根据给定值列表的大小,那么关于运行时何时最好使用isinvs inner joinvs broadcast

这个问题是Pig中以下问题的火花类比:

Pig:按加载列表进行高效过滤

附加背景:

Pyspark功能齐全

apache-spark pyspark spark-dataframe

12
推荐指数
2
解决办法
3055
查看次数

为什么dataset.count导致shuffle!(火花2.2)

这是我的数据帧:

在此输入图像描述

底层RDD有2个分区

在此输入图像描述 在此输入图像描述

当我做df.count时,产生的DAG是 在此输入图像描述

当我执行df.rdd.count时,生成的DAG是:

在此输入图像描述

:Count是spark中的一个动作,官方定义是'返回DataFrame中的行数'.现在,当我对数据帧执行计数时,为什么会发生洗牌?此外,当我在底层RDD上做同样的事情时,不会发生随机播放.

对我来说无论如何都会发生洗牌是没有意义的.我试图通过这里的计数源代码来解决spark github 但它对我来说没有任何意义."groupby"是否被提供给行动的罪​​魁祸首?

PS.df.coalesce(1).count不会导致任何混乱

scala apache-spark rdd spark-dataframe

12
推荐指数
2
解决办法
2548
查看次数