Posts by RaA*_*aAm

Getting the list of columns of a Spark DataFrame

How can I convert all the column names of a Spark DataFrame into a Seq variable?

Input data and schema:

val dataset1 = Seq(("66", "a", "4"), ("67", "a", "0"), ("70", "b", "4"), ("71", "d", "4")).toDF("KEY1", "KEY2", "ID")

dataset1.printSchema()
root
|-- KEY1: string (nullable = true)
|-- KEY2: string (nullable = true)
|-- ID: string (nullable = true)

I need to store all the column names in a variable using Scala. I tried the following, but it does not give what I need.

val selectColumns = dataset1.schema.fields.toSeq

selectColumns: Seq[org.apache.spark.sql.types.StructField] = WrappedArray(StructField(KEY1,StringType,true),StructField(KEY2,StringType,true),StructField(ID,StringType,true))

Expected output:

val selectColumns = Seq(
  col("KEY1"),
  col("KEY2"),
  col("ID")
)

selectColumns: Seq[org.apache.spark.sql.Column] = List(KEY1, KEY2, ID)
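
For reference, a minimal sketch of one way to build that Seq (assuming the goal is simply a Seq[Column] made from the DataFrame's column names):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

// columns returns the column names as Array[String]; mapping col over it
// yields Column objects, and toSeq gives the desired Seq[Column].
val selectColumns: Seq[Column] = dataset1.columns.map(col).toSeq

// Usage example:
dataset1.select(selectColumns: _*).show()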

scala apache-spark apache-spark-sql spark-dataframe

11 votes · 3 answers · 20k views

Spark DataFrame nested CASE WHEN statement

I need to implement the SQL logic below on a Spark DataFrame:

SELECT KEY,
    CASE WHEN tc in ('a','b') THEN 'Y'
         WHEN tc in ('a') AND amt > 0 THEN 'N'
         ELSE NULL END REASON
FROM dataset1;

My input DataFrame is as follows:

val dataset1 = Seq((66, "a", "4"), (67, "a", "0"), (70, "b", "4"), (71, "d", "4")).toDF("KEY", "tc", "amt")

dataset1.show()
+---+---+---+
|KEY| tc|amt|
+---+---+---+
| 66|  a|  4|
| 67|  a|  0|
| 70|  b|  4|
| 71|  d|  4|
+---+---+---+

I implemented the nested case-when logic like this:

dataset1.withColumn("REASON", when(col("tc").isin("a", "b"), "Y")
  .otherwise(when(col("tc").equalTo("a") && col("amt").geq(0), "N")
    .otherwise(null))).show()
+---+---+---+------+ …
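
For comparison, a minimal sketch of the same logic written as a single chained when expression rather than nested otherwise(when(...)) calls; the branch order mirrors the SQL above:

import org.apache.spark.sql.functions.{col, lit, when}

// when(...).when(...).otherwise(...) evaluates the conditions in order, like CASE WHEN branches.
dataset1.withColumn("REASON",
  when(col("tc").isin("a", "b"), "Y")
    .when(col("tc") === "a" && col("amt") > 0, "N")
    .otherwise(lit(null)))
  .show()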

sql dataframe apache-spark apache-spark-sql

9 votes · 1 answer · 20k views

Spark Scala: getting a cumulative sum (running total) using an analytic function

I am implementing a cumulative sum in Spark using a window function, but the order in which the records were entered is not preserved when the window partition function is applied.

Input data:

val base = List(List("10", "MILLER", "1300", "2017-11-03"), List("10", "Clark", "2450", "2017-12-9"), List("10", "King", "5000", "2018-01-28"),
  List("30", "James", "950", "2017-10-18"), List("30", "Martin", "1250", "2017-11-21"), List("30", "Ward", "1250", "2018-02-05"))
  .map(row => (row(0), row(1), row(2), row(3)))

val DS1 = base.toDF("dept_no", "emp_name", "sal", "date")
DS1.show()
+-------+--------+----+----------+
|dept_no|emp_name| sal|      date|
+-------+--------+----+----------+
|     10|  MILLER|1300|2017-11-03|
|     10|   Clark|2450| 2017-12-9|
|     10|    King|5000|2018-01-28|
|     30|   James| 950|2017-10-18|
|     30|  Martin|1250|2017-11-21|
|     30|    Ward|1250|2018-02-05|
+-------+--------+----+----------+

Expected output:

+-------+--------+----+----------+-----------+
|dept_no|emp_name| sal|      date|Dept_CumSal|
+-------+--------+----+----------+-----------+
|     10|  MILLER|1300|2017-11-03|     1300.0|
| …
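
A minimal sketch of one way to keep the running total tied to the date order rather than the physical row order (assuming the date strings are meant to be ordered chronologically):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

// Partition by department, order by date, and restrict the frame to
// "unbounded preceding .. current row" so each row sees a running total.
val w = Window
  .partitionBy(col("dept_no"))
  .orderBy(col("date"))
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

DS1.withColumn("Dept_CumSal", sum(col("sal").cast("double")).over(w)).show()

Note that ordering by the raw string happens to work for this sample, but converting the column to a proper date type before ordering would be more robust.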

sql scala window-functions apache-spark apache-spark-sql

7 votes · 2 answers · 6493 views

Spark file streaming: getting the file name

I need to know the file names of the input files being streamed from the input directory.

Below is the Spark file-streaming code, written in Scala:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object FileStreamExample {
  def main(args: Array[String]): Unit = {

    val sparkSession = SparkSession.builder.master("local").getOrCreate()

    val input_dir = "src/main/resources/stream_input"
    val ck = "src/main/resources/chkpoint_dir"

    //create stream from folder
    val fileStreamDf = sparkSession.readStream.csv(input_dir)

    def fileNames() = fileStreamDf.inputFiles.foreach(println(_))

    println("Streaming Started...\n")
    //fileNames() //even here it is throwing the same exception
    val query = fileStreamDf.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .option("checkpointLocation", ck)
      .start()

    fileNames();

    query.awaitTermination()

  }}

But I am facing the following exception while streaming:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[src/main/resources/stream_input]
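
inputFiles cannot be called on a streaming DataFrame, which is what the exception is complaining about. One common workaround (a sketch, not necessarily the accepted answer) is to attach the source file path to each row with input_file_name() instead of calling fileNames():

import org.apache.spark.sql.functions.input_file_name

// Add the originating file path as a column; it then shows up in the console sink output.
val withFileName = fileStreamDf.withColumn("file_name", input_file_name())

val query = withFileName.writeStream
  .format("console")
  .outputMode(OutputMode.Append())
  .option("checkpointLocation", ck)
  .start()

query.awaitTermination()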

scala filestream apache-spark spark-streaming

5 votes · 1 answer · 2974 views

Updating a Spark DataFrame's row_number window-function column for delta data

I need to update the row-number column of a DataFrame for delta data. I have implemented the row number for the base load as shown below.

Input data:

val base = List(List("001", "a", "abc"), List("001", "a", "123"),List("003", "c", "456") ,List("002", "b", "dfr"), List("003", "c", "ytr"))
  .map(row => (row(0), row(1), row(2)))

val DS1 = base.toDF("KEY1", "KEY2" ,"VAL")

DS1.show()

+----+----+---+
|KEY1|KEY2|VAL|
+----+----+---+
| 001|   a|abc|
| 001|   a|123|
| 003|   c|456|
| 002|   b|dfr|
| 003|   c|ytr|
+----+----+---+

Now I have added the row number using a window function, as shown below:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val baseDF = DS1.select(
  col("KEY1"), col("KEY2"), col("VAL"),
  row_number()
    .over(Window.partitionBy(col("KEY1"), col("KEY2")).orderBy(col("KEY1"), col("KEY2").asc))
    .alias("Row_Num"))
baseDF.show()

+----+----+---+-------+
|KEY1|KEY2|VAL|Row_Num|
+----+----+---+-------+
|001 |a   |abc|1      |
|001 |a   |123|2      |
|002 |b   |dfr|1      |
|003 |c   |456|1      | …
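
The delta-handling part of the question is cut off above. Purely as an illustration, one way to refresh Row_Num once new rows arrive is to union the delta with the base data and recompute row_number() over the same window (the delta DataFrame and its ordering column below are hypothetical):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Hypothetical delta batch with the same layout as DS1.
val delta = List(("001", "a", "xyz"), ("004", "d", "kjh")).toDF("KEY1", "KEY2", "VAL")

// Recompute the row numbers over base + delta; ordering by VAL is illustrative only,
// since ordering by the partition keys themselves does not give a deterministic order.
val refreshed = DS1.union(delta).withColumn("Row_Num",
  row_number().over(Window.partitionBy(col("KEY1"), col("KEY2")).orderBy(col("VAL"))))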

scala apache-spark apache-spark-sql

4 votes · 1 answer · 10k views

Spark DataFrame - implementing Oracle's NVL function when joining

I need to implement NVL functionality when joining two DataFrames in Spark.

Input DataFrames:

ds1.show()
---------------
|key  | Code  |
---------------
|2    | DST   |
|3    | CPT   |
|null | DTS   |
|5    | KTP   |
---------------

ds2.show()
------------------
|key  | PremAmt |
------------------
|2     | 300   |
|-1    | -99   |
|5     | 567   |
------------------

I need to implement "LEFT JOIN ON NVL(DS1.key, -1) = DS2.key". I wrote it as shown below, but the NVL/coalesce logic is missing, so it returns the wrong values.

How can I do an "NVL" join on Spark DataFrames?

// nvl function is missing, so wrong output
ds1.join(ds2, Seq("key"), "left_outer")

-------------------------
|key  | Code  |PremAmt  |
-------------------------
|2 …
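
Spark has no NVL function of its own, but coalesce with a literal gives the same effect. A sketch of the join condition (assuming the intent is NVL(DS1.key, -1) = DS2.key):

import org.apache.spark.sql.functions.{coalesce, lit}

// Replace a null key in ds1 with -1 before comparing, mirroring NVL(DS1.key, -1) = DS2.key.
val joined = ds1.join(ds2, coalesce(ds1("key"), lit(-1)) === ds2("key"), "left_outer")
joined.show()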

scala apache-spark apache-spark-sql spark-dataframe

3 votes · 2 answers · 20k views

Spark Scala UDF parameter limit of 10

I need to create a Spark UDF with 11 parameters. Is there any way to achieve this? As far as I know, a UDF can be created with at most 10 parameters.

Below is the code for 10 parameters; it works:

val testFunc1 = (one: String, two: String, three: String, four: String,
                 five: String, six: String, seven: String, eight: String, nine: String, ten: String) => {
    if (isEmpty(four)) false
    else four match {
        case "RDIS" => three == "ST"
        case "TTSC" => nine == "UT" && eight == "RR"
        case _ => false
    }
}
import org.apache.spark.sql.functions.udf    
udf(testFunc1)

Below is the code for 11 parameters; it fails with an "Unspecified value parameter: dataType" error:

val testFunc2 = (one: String, two: String, three: …
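
The 11-parameter code is truncated above, but a common way around the 10-argument limit of the typed udf helpers (a sketch only, assuming Spark 2.x; the DataFrame and column names are placeholders) is to pack the inputs into a struct and let the UDF take a Row:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, struct, udf}

// The UDF receives one struct argument, so any number of columns can be passed through it.
val testFunc11 = udf((r: Row) => {
  val three = r.getAs[String]("three")
  val four  = r.getAs[String]("four")
  val eight = r.getAs[String]("eight")
  val nine  = r.getAs[String]("nine")
  if (four == null || four.isEmpty) false
  else four match {
    case "RDIS" => three == "ST"
    case "TTSC" => nine == "UT" && eight == "RR"
    case _      => false
  }
})

// Usage (df and the column list stand in for the real 11 input columns):
// df.withColumn("flag", testFunc11(struct(col("one"), col("two"), /* ... */ col("eleven"))))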

scala user-defined-functions apache-spark apache-spark-sql

2 votes · 1 answer · 1992 views

Python: parsing a string value and loading it into a dictionary

I need to parse a string value and load it into a Python dictionary.

Input:

attributes = "LIFETIME=203421,ID=acr44,SCOPE=[open,basic.operation:read,common.operation:write],USER=b611-410e,CLAIMS_"

Expected output:

attributesDictionary = { "LIFETIME" : "203421",
                         "ID" : "acr44",
                         "SCOPE" : "[open,basic.operation:read,common.operation:write]",
                         "USER" : "b611-410e",
                         "CLAIMS_" : None
                         }

attributesDictionary["ID"]
>>> 'acr44'

attributesDictionary["SCOPE"]
>>> '[open,basic.operation:read,common.operation:write]'

I am new to Python programming. How can this be achieved?

python python-2.7 python-3.x

1 vote · 1 answer · 53 views

Spark DataFrame UDF - Schema for type Any is not supported

I am writing a Spark Scala UDF and hitting "java.lang.UnsupportedOperationException: Schema for type Any is not supported".

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

val aBP = udf((bG: String, pS: String, bP: String, iOne: String, iTwo: String) => {
  if (bG != "I") {"NA"}
  else if (pS == "D")
    {if (iTwo != null) iOne else "NA"}
  else if (pS == "U")
    {if (bP != null) bP else "NA"}
})
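
The if/else-if chain above has no final else, so some paths return Unit and Scala infers the function's result type as Any, which Spark cannot map to a SQL type. A minimal sketch of the likely fix (assuming "NA" is the intended fallback):

import org.apache.spark.sql.functions.udf

val aBP = udf((bG: String, pS: String, bP: String, iOne: String, iTwo: String) => {
  if (bG != "I") "NA"
  else if (pS == "D") { if (iTwo != null) iOne else "NA" }
  else if (pS == "U") { if (bP != null) bP else "NA" }
  else "NA" // the missing branch: with it, every path returns String
})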

This throws the error "java.lang.UnsupportedOperationException: Schema for type Any is not supported".

scala user-defined-functions apache-spark spark-dataframe

0 votes · 1 answer · 4066 views

Scala operations on timestamp values

I have a timestamp input and, depending on certain conditions, I need to subtract 1 second or subtract 3 months from it using Scala.

Input:

val date :String = "2017-10-31T23:59:59.000"

Output:

Minus 1 second:

val lessOneSec = "2017-10-31T23:59:58.000"

Minus 3 months:

val less3Mon   = "2017-07-31T23:59:58.000"
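
A minimal sketch using java.time (one of several possible approaches; the exact expected values above are the asker's):

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS")
val ts  = LocalDateTime.parse("2017-10-31T23:59:59.000", fmt)

// Subtract 1 second, or 3 months, and format back to the original string layout.
val lessOneSec = ts.minusSeconds(1).format(fmt) // "2017-10-31T23:59:58.000"
val less3Mon   = ts.minusMonths(3).format(fmt)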

How can I convert the string value to a timestamp and perform operations such as these subtractions in Scala?

scala apache-spark spark-dataframe

0 votes · 1 answer · 1607 views