标签: apache-spark-sql

如何在保留整行的同时获得具有最大值的单行?

我想为每个 id 获取单行,其中仅存在 Charge 列的最大值。

示例输入数据:

id  name charge 
11  hg   10    
11  mm   20
22  aa   40
22  bb   40
Run Code Online (Sandbox Code Playgroud)

我试过的代码:

df.agg(max("charge"))
Run Code Online (Sandbox Code Playgroud)

我只得到最大值,如下所示:

charge
40   
Run Code Online (Sandbox Code Playgroud)

但是,我想保留整行:

id  name charge
11  mm   20
22  aa   40
22  bb   40
Run Code Online (Sandbox Code Playgroud)

如何保留前两列?name 列对于相同的 id 可以有不同的值,因此不可能groupBy在这两个列上使用并聚合结果。

如果两行具有相同的 id 并收费,则应保留两行。

scala apache-spark apache-spark-sql spark-dataframe

0
推荐指数
1
解决办法
9456
查看次数

Spark scala:从其他数据框中选择列名

有两个 json,第一个 json 有更多列,并且总是超级集。

val df1 = spark.read.json(sqoopJson)
val df2 = spark.read.json(kafkaJson)
Run Code Online (Sandbox Code Playgroud)

除了操作:

我喜欢在 df1 和 df2 上应用 except 操作,但是 df1 有 10 列而 df2 只有 8 列。如果我手动删除 df1 中的 2 列,则 except 将起作用。但是我有 50 多个表/json,并且需要对所有 50 组表/json 执行 EXCEPT。

题 :

如何仅从 DF1 中选择 DF2 (8) 列中可用的列并创建新的 df3?因此 df3 将拥有来自 df1 的有限列的数据,并且它将与 df2 列匹配。

hadoop scala dataframe apache-spark apache-spark-sql

0
推荐指数
1
解决办法
2495
查看次数

如何以递归方式获取 Spark DataFrame 中的所有列

我想获取 DataFrame 的所有列。如果 DataFrame 具有平面结构(没有嵌套的 StructTypes),则会df.columns产生正确的结果。我也想返回所有嵌套的列名,例如

给定的

val schema = StructType(
  StructField("name", StringType) ::
  StructField("nameSecond", StringType) ::
  StructField("nameDouble", StringType) ::
  StructField("someStruct", StructType(
    StructField("insideS", StringType)::
    StructField("insideD", DoubleType)::
    Nil
  )) ::
  Nil
)
val rdd = spark.sparkContext.emptyRDD[Row]
val df = spark.createDataFrame(rdd, schema)
Run Code Online (Sandbox Code Playgroud)

我想得到

Seq("name", "nameSecond", "nameDouble", "someStruct", "insideS", "insideD")
Run Code Online (Sandbox Code Playgroud)

scala dataframe apache-spark apache-spark-sql

0
推荐指数
1
解决办法
1268
查看次数

为什么加入两个火花数据帧会失败,除非我将“.as('alias)”添加到两者中?

假设有 2 个 Spark DataFrame 我们想加入,无论出于何种原因:

val df1 = Seq(("A", 1), ("B", 2), ("C", 3)).toDF("agent", "in_count")
val df2 = Seq(("A", 2), ("C", 2), ("D", 2)).toDF("agent", "out_count")
Run Code Online (Sandbox Code Playgroud)

可以用这样的代码来完成:

val joinedDf = df1.as('d1).join(df2.as('d2), ($"d1.agent" === $"d2.agent"))

// Result:
val joinedDf.show

+-----+--------+-----+---------+
|agent|in_count|agent|out_count|
+-----+--------+-----+---------+
|    A|       1|    A|        2|
|    C|       3|    C|        2|
+-----+--------+-----+---------+
Run Code Online (Sandbox Code Playgroud)

现在,我不明白的是,为什么它只在我使用别名df1.as(d1)和时才起作用df2.as(d2)?我可以想象,如果我直截了当地写,列之间会出现名称冲突

val joinedDf = df1.join(df2, ($"df1.agent" === $"df2.agent")) // fails
Run Code Online (Sandbox Code Playgroud)

但是......我不明白为什么我不能.as(alias) 使用两者中的一个 DF

df1.as('d1).join(df2, ($"d1.agent" === $"df2.agent")).show()
Run Code Online (Sandbox Code Playgroud)

失败

org.apache.spark.sql.AnalysisException: cannot …
Run Code Online (Sandbox Code Playgroud)

scala join apache-spark apache-spark-sql spark-dataframe

0
推荐指数
1
解决办法
309
查看次数

在列表中定义的列上过滤数据框

我有一个数据框 df

+----------+----+----+----+---+---+----+---+---+-------+-------+
|      WEEK|DIM1|DIM2|  T1| T2| T3|  T1| T2| T3|T1_diff|T2_diff|
+----------+----+----+----+---+---+----+---+---+-------+-------+
|2016-04-02|  14|NULL|9874|880| 23|9879|820| 45|     -5|     60|
|2016-04-30|  14|  FR|9875| 13| 34|9785|  9| 67|     90|      4|
+----------+----+----+----+---+---+----+---+---+-------+-------+
Run Code Online (Sandbox Code Playgroud)

我想在这个数据框上做两件事:

  1. 仅选择 WEEK、DIM1、DIM2、T1_diff、T2_diff
  2. 过滤器 T1_diff 或 T2_diff > 3

我目前正在这样做 -

val selectColumns = Seq("WEEK", "DIM1", "DIM2","T1_diff","T2_diff")
df.select(selectColumns.head, selectColumns.tail: _*).filter($"T1_diff" > 3 or $"T2_diff" > 3).show()
Run Code Online (Sandbox Code Playgroud)

我有一个用例,我的 targetColumns 定义如下 -

val targetColumns = Seq("T1_diff", "T2_diff")
Run Code Online (Sandbox Code Playgroud)

我需要使用上述序列将其应用到过滤器中。这是按顺序排列的,因为可以在 targetColumns 列表中添加更多列。我试过这样的事情 -

df.filter(r => !targetColumns.map(x => col(x) > 3).isEmpty).show()
Run Code Online (Sandbox Code Playgroud)

这似乎不起作用。谁能告诉我这样做的最佳方法是什么?

scala dataframe apache-spark apache-spark-sql

0
推荐指数
1
解决办法
1310
查看次数

遍历 Spark 数据帧的列并更新指定的值

为了遍历从 Hive 表创建的 Spark Dataframe 的列并更新所有出现的所需列值,我尝试了以下代码。

import org.apache.spark.sql.{DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.udf

val a: DataFrame = spark.sql(s"select * from default.table_a")

    val column_names: Array[String] = a.columns

    val required_columns: Array[String] = column_names.filter(name => name.endsWith("_date")) 

    val func = udf((value: String) => { if if (value == "XXXX" || value == "WWWW" || value == "TTTT") "NULL" else value } )

    val b = {for (column: String <- required_columns) { a.withColumn(column , func(a(column))) } a}
Run Code Online (Sandbox Code Playgroud)

在 spark shell 中执行代码时,出现以下错误。

scala> val b = {for …
Run Code Online (Sandbox Code Playgroud)

hive scala apache-spark apache-spark-sql

0
推荐指数
1
解决办法
4090
查看次数

如何在转换过程中测试数据类型转换

我们有一个将数据映射到数据帧的脚本(我们使用的是 pyspark)。数据以字符串形式出现,并且对其进行了一些其他有时昂贵的操作,但作为操作的一部分(调用 withColumn),我们对其最终数据类型进行了强制转换。

我需要判断是否发生了截断,但如果发生了截断,我们不想失败。我们只想要一个数字来知道每个翻译列(大约有 300 列)中有多少行失败。

我的第一个想法是让每一列通过一个 UDF 来进行测试,输出将是一个包含值的数组,以及一个关于它是否通过数据类型检查的值。然后我会做2个选择。一个从数组中选择原始值,另一个聚合未命中。但这似乎是一个草率的解决方案。我对 pyspark/hadoop 世界还很陌生……很想知道是否有更好的(也许是标准的?)方法来做到这一点。

hadoop apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
668
查看次数

如何使用 spark-scala 从表中获取空值的计数?

我有表名“数据”,它有 5 列,每列包含一些空值。我想计算每列的空值如何为该结果编写代码!计算一列很容易,但我如何编写代码来计算表的每一列。

样本 :

+----------------+----------------+--------+---------+-------------+
| 2              |3               |4       |  5      |6            |
+----------------+----------------+--------+---------+-------------+
|null             |1               | null   |null     |null         |
|null             |null            | null   |null     |asdc         |
|null             |23              | 23     |null     |null         |
|null             |null            | null   |23       |41           |
|24               |3               | 35     |null     |null         |
|null             |null            | null   | 1       |wef          |
|null             |32              | 54     |null     |45           |
|null             |null            | null   |123      |null         |
|w411             |31              | 12     |null     |null         |
|null …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

0
推荐指数
1
解决办法
1423
查看次数

使用 groupby spark 数据帧中的条件聚合

我有一个数据框

id lat long lag_lat lag_long detector lag_interval  gpsdt  lead_gpsdt
  1  12   13    12       13        1        [1.5,3.5]  4      4.5
  1  12   13    12       13        1        null       4.5    5
  1  12   13    12       13        1        null       5      5.5
  1  12   13    12       13        1        null       5.5    6
  1  13   14    12       13        2        null       6      6.5
  1  13   14    13       14        2        null       6.5    null
  2  13   14    13       14        2        [0.5,1.5]  2.5    3.5  
  2  13   14    13       14        2        null …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

0
推荐指数
1
解决办法
3645
查看次数

Spark 中的累积乘积?

我尝试在 Spark scala 中实现一个累积产品,但我真的不知道如何实现。我有以下数据框:

Input data:
+--+--+--------+----+
|A |B | date   | val|
+--+--+--------+----+
|rr|gg|20171103| 2  |
|hh|jj|20171103| 3  |
|rr|gg|20171104| 4  |
|hh|jj|20171104| 5  |
|rr|gg|20171105| 6  |
|hh|jj|20171105| 7  |
+-------+------+----+
Run Code Online (Sandbox Code Playgroud)

我想要以下输出

Output data:
+--+--+--------+-----+
|A |B | date   | val |
+--+--+--------+-----+
|rr|gg|20171105| 48  | // 2 * 4 * 6
|hh|jj|20171105| 105 | // 3 * 5 * 7
+-------+------+-----+
Run Code Online (Sandbox Code Playgroud)

如果您对如何做有任何想法,那将非常有帮助:)

非常感谢

scala aggregation apache-spark apache-spark-sql

0
推荐指数
1
解决办法
1551
查看次数