标签: apache-spark-sql

如何计算作为列值的表达式?

我有一个包含数百万行的大数据框,如下所示:

A    B    C    Eqn
12   3    4    A+B
32   8    9    B*C
56   12   2    A+B*C
Run Code Online (Sandbox Code Playgroud)

如何计算列中的表达式Eqn

scala apache-spark apache-spark-sql

1
推荐指数
1
解决办法
2269
查看次数

如何根据 PySpark 中其他列中的计算创建新列

我有一个以下数据框:

+-----------+----------+----------+
|   some_id | one_col  | other_col|
+-----------+----------+----------+
|       xx1 |        11|       177|         
|       xx2 |      1613|      2000|    
|       xx4 |         0|     12473|      
+-----------+----------+----------+
Run Code Online (Sandbox Code Playgroud)

我需要添加一个新列,该列基于对第一列和第二列进行的一些计算,即,例如,对于 col1_value=1 和 col2_value=10 需要生成 col1 包含在 col2 中的百分比,因此 col3_value = (1/10)*100=10%:

+-----------+----------+----------+--------------+
|   some_id | one_col  | other_col|  percentage  |
+-----------+----------+----------+--------------+
|       xx1 |        11|       177|     6.2      |  
|       xx3 |         1|       10 |      10      |     
|       xx2 |      1613|      2000|     80.6     |
|       xx4 |         0|     12473|      0       |
+-----------+----------+----------+--------------+
Run Code Online (Sandbox Code Playgroud)

我知道我需要为此使用 udf,但是如何根据结果直接添加新的列值?

一些伪代码: …

python apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
9633
查看次数

Apache Spark:无法将分组数据保存为 CSV

我想做一件简单的事情。我想将所有事件计数放入 2 分钟的时间戳中。

效果很好。

df = df.groupBy(window(df["time_value"], "2 minutes")).count()

df.show()

+--------------------+-----+
| window|count|
+--------------------+-----+
|[2018-04-10 15:00...| 770|
|[2018-04-10 00:42...| 100|
|[2018-04-10 04:14...| 54|
|[2018-04-06 15:54...| 36|
|[2018-04-10 04:46...| 304|
|[2018-04-10 20:36...| 347|
|[2018-04-10 03:26...| 41|
|[2018-04-10 21:10...| 85|
|[2018-04-10 11:44...| 426|
|[2018-04-10 12:32...| 754|
|[2018-04-10 00:28...| 61|
|[2018-04-10 05:36...| 478|
|[2018-04-06 07:04...| 18|
|[2018-04-10 22:14...| 195|
|[2018-04-10 23:40...| 175|
|[2018-04-10 00:20...| 229|
|[2018-04-10 03:10...| 209|
|[2018-04-10 01:28...| 67|
|[2018-04-09 18:52...| 9|
|[2018-04-10 19:06...| 3548|
+--------------------+-----+
only showing top 20 rows …
Run Code Online (Sandbox Code Playgroud)

python apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
3741
查看次数

使用 df.cache() 后是否必须使用 df.unpersist() ?

df.unpersist()使用后是否必须要df.cache()释放缓存才能使用?如果我将 DataFrame 存储在缓存中而不取消持久化,那么代码运行得非常快。然而,当我使用时,它需要更长的时间df.unpersist()

scala apache-spark apache-spark-sql

1
推荐指数
1
解决办法
1万
查看次数

Pyspark 日期 yyyy-mmm-dd 转换

有一个火花数据框。其中一个列的日期填充格式类似于 2018-Jan-12

我需要将此结构更改为 20180112

如何才能实现这一目标

apache-spark-sql pyspark

1
推荐指数
1
解决办法
2万
查看次数

在pySpark中读取本地csv文件(2.3)

我正在使用 pySpark 2.3,尝试读取如下所示的 csv 文件:

0,0.000476517230863068,0.0008178378961061477
1,0.0008506156837329876,0.0008467260987257776
Run Code Online (Sandbox Code Playgroud)

但它不起作用:

from pyspark import sql, SparkConf, SparkContext
print (sc.applicationId)
>> <property at 0x7f47583a5548>
data_rdd = spark.textFile(name=tsv_data_path).filter(x.split(",")[0] != 1)
Run Code Online (Sandbox Code Playgroud)

我收到一个错误:

AttributeError: 'SparkSession' object has no attribute 'textFile'
Run Code Online (Sandbox Code Playgroud)

知道我应该如何在 pySpark 2.3 中阅读它吗?

apache-spark apache-spark-sql pyspark apache-spark-mllib

1
推荐指数
1
解决办法
2万
查看次数

使用 Spark Catalyst 逻辑计划修改查询

是否可以使用扩展点在 DataFrame API/SQL 中添加/替换现有列表达式。

例如:假设我们注入解析规则,该规则可以检查计划中的项目节点,并在检查“名称”列时,将其替换为 upper(name)。

使用扩展点可以实现这样的事情吗?我发现的例子大多很简单,它们没有按照我需要的方式操作输入表达式。

请告诉我这是否可能。

apache-spark apache-spark-sql

1
推荐指数
1
解决办法
1571
查看次数

Spark Explode 空列返回空行

我是 Spark 编程新手。我正在尝试用空行爆炸 DataFrame 的列。我认为爆炸函数简单来说,为数组中的每个元素创建额外的行。但结果不同。

我无法理解分解的 DataFrame 背后的逻辑。有人可以解释一下下面的例子吗?我想了解这个结果的根本原理/原因。为什么空数组在数据框中被视为空?

//inputDataFrame
+---+------+----------+
|age|  name|occupation|
+---+------+----------+
| []|Harish| developer|
+---+------+----------+

df.withColumn("age",explode(col("age")))

//DataFrame with age column exploded
+---+----+----------+
|age|name|occupation|
+---+----+----------+
+---+----+----------+

// expected DataFrame
    +---+------+----------+     +----+------+----------+
    |age|  name|occupation|     |age |  name|occupation|
    +---+------+----------+ (or)+----+------+----------+
    |   |Harish| developer|     |null|Harish| developer|
    +---+------+----------+     +----+------+----------+
Run Code Online (Sandbox Code Playgroud)

EDIT1:根据 Chandan,我发现这个堆栈问题Spark sql how toexplode without Losing null Values并且可以理解可用于 Spark2 的爆炸 API。但我找不到关于为什么删除该行的正确解释。

apache-spark apache-spark-sql

1
推荐指数
1
解决办法
4123
查看次数

在 Spark 中读取最后一列作为值数组的 CSV(值位于括号内并以逗号分隔)

我有一个 CSV 文件,其中最后一列位于括号内,并且值以逗号分隔。最后一列中值的数量是可变的。当我将它们读为带有一些列名称的 Dataframe 时,如下所示,我得到了Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match. 我的 CSV 文件如下所示

a1,b1,true,2017-05-16T07:00:41.0000000,2.5,(c1,d1,e1)
a2,b2,true,2017-05-26T07:00:42.0000000,0.5,(c2,d2,e2,f2,g2)
a2,b2,true,2017-05-26T07:00:42.0000000,0.5,(c2)
a2,b2,true,2017-05-26T07:00:42.0000000,0.5,(c2,d2)
a2,b2,true,2017-05-26T07:00:42.0000000,0.5,(c2,d2,e2)
a2,b2,true,2017-05-26T07:00:42.0000000,0.5,(c2,d2,e2,k2,f2)
Run Code Online (Sandbox Code Playgroud)

我最终想要的是这样的:

root
 |-- MId: string (nullable = true)
 |-- PId: string (nullable = true)
 |-- IsTeacher: boolean(nullable = true)
 |-- STime: datetype(nullable = true)
 |-- TotalMinutes: double(nullable = true)
 |-- SomeArrayHeader: array<string>(nullable = true)
Run Code Online (Sandbox Code Playgroud)

到目前为止我已经编写了以下代码:

val infoDF =
  sqlContext.read.format("csv")
    .option("header", "false")
    .load(inputPath)
    .toDF(
      "MId",
      "PId",
      "IsTeacher",
      "STime",
      "TotalMinutes",
      "SomeArrayHeader")
Run Code Online (Sandbox Code Playgroud)

我想在不给出列名的情况下阅读它们,然后将第五列之后的列转换为数组类型。但后来我遇到了括号的问题。有没有一种方法可以在阅读并告知括号内的字段实际上是数组类型的一个字段时执行此操作。

scala apache-spark apache-spark-sql spark-csv

1
推荐指数
1
解决办法
4774
查看次数

动态传递查询字符串以在 PySpark Dataframe 方法 selectExpr() 中选择列

我按如下方式动态生成查询字符串并将其传递给 selectExpr()。

queryString=''''category_id as cat_id','category_department_id as cat_dpt_id','category_name as cat_name''''
df.selectExpr(queryString)
Run Code Online (Sandbox Code Playgroud)

根据文件

selectExpr(*expr) :投影一组 SQL 表达式并返回一个新的 DataFrame。这是 select() 的一个变体,它接受 SQL 表达式。

问题在于变量“queryString”被视为单个字符串而不是三个单独的列(正确的是)。以下是错误:

:org.apache.spark.sql.catalyst.parser.ParseException:…………

== SQL ==

“category_id 作为 cat_id”、“category_department_id 作为 cat_dpt_id”、“category_name 作为 cat_name”

------------------------^^^

有什么方法可以将动态生成的“queryString”作为 selectExpr() 的参数传递。

apache-spark-sql pyspark

1
推荐指数
1
解决办法
3610
查看次数