How do I convert all the column names of a Spark DataFrame into a Seq variable?

Input data and schema:
val dataset1 = Seq(("66", "a", "4"), ("67", "a", "0"), ("70", "b", "4"), ("71", "d", "4")).toDF("KEY1", "KEY2", "ID")
dataset1.printSchema()
root
|-- KEY1: string (nullable = true)
|-- KEY2: string (nullable = true)
|-- ID: string (nullable = true)
I need to store all the column names in a variable using Scala. I tried the following, but it does not work:
val selectColumns = dataset1.schema.fields.toSeq
selectColumns: Seq[org.apache.spark.sql.types.StructField] = WrappedArray(StructField(KEY1,StringType,true),StructField(KEY2,StringType,true),StructField(ID,StringType,true))
Expected output:
val selectColumns = Seq(
col("KEY1"),
col("KEY2"),
col("ID")
)
selectColumns: Seq[org.apache.spark.sql.Column] = List(KEY1, KEY2, ID)
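One way that should produce the expected Seq[Column] (a minimal sketch, using the stock col helper from org.apache.spark.sql.functions) is to map it over dataset1.columns:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

// dataset1.columns returns Array[String]; mapping col over it yields the Column objects
val selectColumns: Seq[Column] = dataset1.columns.toSeq.map(col)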
I need to implement the SQL logic below on a Spark DataFrame:
SELECT KEY,
CASE WHEN tc in ('a','b') THEN 'Y'
WHEN tc in ('a') AND amt > 0 THEN 'N'
ELSE NULL END REASON
FROM dataset1;
My input DataFrame is as follows:
val dataset1 = Seq((66, "a", "4"), (67, "a", "0"), (70, "b", "4"), (71, "d", "4")).toDF("KEY", "tc", "amt")
dataset1.show()
+---+---+---+
|KEY| tc|amt|
+---+---+---+
| 66| a| 4|
| 67| a| 0|
| 70| b| 4|
| 71| d| 4|
+---+---+---+
I implemented it as a nested case-when expression:
dataset1.withColumn("REASON", when(col("tc").isin("a", "b"), "Y")
.otherwise(when(col("tc").equalTo("a") && col("amt").geq(0), "N")
.otherwise(null))).show()
+---+---+---+------+ …
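As a sketch, the same CASE expression can also be written as a chain of when clauses, which mirrors the SQL more directly than nesting otherwise(when(...)); when no branch matches, the result defaults to null, matching ELSE NULL:

import org.apache.spark.sql.functions.{col, when}

dataset1.select(
  col("KEY"),
  when(col("tc").isin("a", "b"), "Y")                 // CASE WHEN tc in ('a','b') THEN 'Y'
    .when(col("tc") === "a" && col("amt") > 0, "N")   // WHEN tc in ('a') AND amt > 0 THEN 'N'
    .alias("REASON")                                  // ELSE NULL is the default
).show()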
I am implementing a cumulative sum in Spark using a Window function, but the input order of the records is not preserved when the window partition function is applied.

Input data:
val base = List(List("10", "MILLER", "1300", "2017-11-03"), List("10", "Clark", "2450", "2017-12-9"), List("10", "King", "5000", "2018-01-28"),
List("30", "James", "950", "2017-10-18"), List("30", "Martin", "1250", "2017-11-21"), List("30", "Ward", "1250", "2018-02-05"))
.map(row => (row(0), row(1), row(2), row(3)))
val DS1 = base.toDF("dept_no", "emp_name", "sal", "date")
DS1.show()
+-------+--------+----+----------+
|dept_no|emp_name| sal| date|
+-------+--------+----+----------+
| 10| MILLER|1300|2017-11-03|
| 10| Clark|2450| 2017-12-9|
| 10| King|5000|2018-01-28|
| 30| James| 950|2017-10-18|
| 30| Martin|1250|2017-11-21|
| 30| Ward|1250|2018-02-05|
+-------+--------+----+----------+
Expected output:
+-------+--------+----+----------+-----------+
|dept_no|emp_name| sal| date|Dept_CumSal|
+-------+--------+----+----------+-----------+
| 10| MILLER|1300|2017-11-03| 1300.0|
| …
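A minimal sketch that should keep the running total in the expected order (assuming ordering by the date column is what "input order" means here): add an orderBy to the window and an explicit running frame:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

// order each department's rows by date and sum everything up to the current row;
// sal is a string column in this example, so cast it before summing
val w = Window.partitionBy(col("dept_no"))
  .orderBy(col("date"))
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

DS1.withColumn("Dept_CumSal", sum(col("sal").cast("double")).over(w)).show()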
I need to know the file name of each input file streamed from the input directory.

Below is the Spark file-streaming code, written in Scala:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object FileStreamExample {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder.master("local").getOrCreate()

    val input_dir = "src/main/resources/stream_input"
    val ck = "src/main/resources/chkpoint_dir"

    // create a stream from the folder
    val fileStreamDf = sparkSession.readStream.csv(input_dir)

    def fileNames() = fileStreamDf.inputFiles.foreach(println(_))

    println("Streaming Started...\n")
    //fileNames() // even here it throws the same exception

    val query = fileStreamDf.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .option("checkpointLocation", ck)
      .start()

    fileNames()
    query.awaitTermination()
  }
}
But it throws the following exception while streaming:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[src/main/resources/stream_input]
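Calling inputFiles triggers evaluation of the streaming DataFrame, which is not allowed before writeStream.start(); that is what the exception says. A common workaround, sketched here, is to carry the source file path along as a column via input_file_name() and let the sink show it:

import org.apache.spark.sql.functions.input_file_name

// add the originating file path of each record as a column, then write the stream as before
val withFileName = fileStreamDf.withColumn("file_name", input_file_name())

val query = withFileName.writeStream
  .format("console")
  .outputMode(OutputMode.Append())
  .option("checkpointLocation", ck)
  .start()

query.awaitTermination()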
I need to update the row-number column of a DataFrame for delta data. I have implemented the row number for the base load as below.
Input data:
val base = List(List("001", "a", "abc"), List("001", "a", "123"),List("003", "c", "456") ,List("002", "b", "dfr"), List("003", "c", "ytr"))
.map(row => (row(0), row(1), row(2)))
val DS1 = base.toDF("KEY1", "KEY2" ,"VAL")
DS1.show()
+----+----+---+
|KEY1|KEY2|VAL|
+----+----+---+
| 001| a|abc|
| 001| a|123|
| 003| c|456|
| 002| b|dfr|
| 003| c|ytr|
+----+----+---+
Now I have added the row number using a window function, as shown below:
val baseDF = DS1.select(
  col("KEY1"), col("KEY2"), col("VAL"),
  row_number()
    .over(Window.partitionBy(col("KEY1"), col("KEY2"))
      .orderBy(col("KEY1"), col("KEY2").asc))
    .alias("Row_Num"))
baseDF.show()
+----+----+---+-------+
|KEY1|KEY2|VAL|Row_Num|
+----+----+---+-------+
|001 |a |abc|1 |
|001 |a |123|2 |
|002 |b |dfr|1 |
|003 |c |456|1 | …
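The rest of this question is truncated above, but assuming the goal is to continue numbering the incoming delta rows from the highest Row_Num already assigned per (KEY1, KEY2), one speculative sketch (deltaDF is a hypothetical DataFrame with the same KEY1, KEY2, VAL columns):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, lit, max, row_number}

val deltaDF: DataFrame = ??? // hypothetical delta input, same schema as DS1

// highest row number already assigned per key in the base load
val maxPerKey = baseDF.groupBy("KEY1", "KEY2").agg(max("Row_Num").alias("max_rn"))

// number the delta rows within each key and offset them by the base maximum
val w = Window.partitionBy(col("KEY1"), col("KEY2")).orderBy(col("VAL"))
val deltaWithRowNum = deltaDF
  .join(maxPerKey, Seq("KEY1", "KEY2"), "left_outer")
  .withColumn("Row_Num", row_number().over(w) + coalesce(col("max_rn"), lit(0)))
  .drop("max_rn")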
I need to implement NVL functionality in Spark when joining two DataFrames.

Input DataFrames:
ds1.show()
---------------
|key | Code |
---------------
|2 | DST |
|3 | CPT |
|null | DTS |
|5 | KTP |
---------------
ds2.show()
------------------
|key | PremAmt |
------------------
|2 | 300 |
|-1 | -99 |
|5 | 567 |
------------------
I need to implement "LEFT JOIN ON NVL(DS1.key, -1) = DS2.key". I wrote it as below, but the NVL / coalesce part is missing, so it returns the wrong values.

How can I do an "NVL" join on Spark DataFrames?
// nvl function is missing, so wrong output
ds1.join(ds1,Seq("key"),"left_outer")
-------------------------
|key | Code |PremAmt |
-------------------------
|2 …
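A minimal sketch of the NVL-style join condition, using coalesce to substitute -1 for a null key before comparing (this also assumes the second operand of the join above was meant to be ds2, not ds1):

import org.apache.spark.sql.functions.{coalesce, lit}

// LEFT JOIN ON NVL(ds1.key, -1) = ds2.key
val joined = ds1.join(ds2, coalesce(ds1("key"), lit(-1)) === ds2("key"), "left_outer")
  .select(ds1("key"), ds1("Code"), ds2("PremAmt"))

joined.show()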
I need to create a Spark UDF that takes 11 arguments. Is there any way to achieve this?

I know we can create a UDF with at most 10 arguments.
Below is the code for 10 arguments. It works:
val testFunc1 = (one: String, two: String, three: String, four: String,
five: String, six: String, seven: String, eight: String, nine: String, ten: String) => {
if (isEmpty(four)) false
else four match {
case "RDIS" => three == "ST"
case "TTSC" => nine == "UT" && eight == "RR"
case _ => false
}
}
import org.apache.spark.sql.functions.udf
udf(testFunc1)
Below is the code for 11 arguments. It fails with an "Unspecified value parameter dataType" error:
val testFunc2 = (one: String, two: String, three: …
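The typed udf(...) helpers only go up to 10 parameters, which is why the 11-argument call falls through to an overload that also expects a dataType. Two possible workarounds, sketched under the assumption that testFunc2 returns a Boolean like testFunc1:

import org.apache.spark.sql.functions.{array, col, udf}
import org.apache.spark.sql.types.BooleanType

// 1) Use the untyped overload and supply the return type explicitly
//    (this is the "dataType" value parameter the error refers to)
val testUdf11 = udf(testFunc2, BooleanType)

// 2) Or collapse the inputs into one array column and accept a Seq[String]
val testFuncSeq = (args: Seq[String]) => args.length == 11 // placeholder logic
val testUdfSeq = udf(testFuncSeq)
// usage with hypothetical column names:
//   df.withColumn("flag", testUdfSeq(array(col("c1"), col("c2") /* ... */, col("c11"))))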
I need to parse a string value and load it into a Python dictionary.

Input:
attributes = "LIFETIME=203421,ID=acr44,SCOPE=[open,basic.operation:read,common.operation:write],USER=b611-410e,CLAIMS_"
Expected output:
attributesDictionary = { "LIFETIME" : "203421",
                         "ID" : "acr44",
                         "SCOPE" : "[open,basic.operation:read,common.operation:write]",
                         "USER" : "b611-410e",
                         "CLAIMS_" : None
                       }
attributesDictionary["ID"]
>>> 'acr44'
attributesDictionary["SCOPE"]
>>> '[open,basic.operation:read,common.operation:write]'
I am new to Python programming. How can we achieve this?
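A minimal Python sketch, assuming the bracketed SCOPE value never nests: split on commas that are not inside [...], then split each piece on the first = (a key with no value, like CLAIMS_, maps to None):

import re

attributes = "LIFETIME=203421,ID=acr44,SCOPE=[open,basic.operation:read,common.operation:write],USER=b611-410e,CLAIMS_"

# split on commas that are not inside square brackets
parts = re.split(r',(?![^\[]*\])', attributes)

attributesDictionary = {}
for part in parts:
    key, sep, value = part.partition('=')
    attributesDictionary[key] = value if sep else None

print(attributesDictionary["ID"])     # acr44
print(attributesDictionary["SCOPE"])  # [open,basic.operation:read,common.operation:write]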
I am writing a Spark Scala UDF and am running into "java.lang.UnsupportedOperationException: Schema for type Any is not supported".
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
val aBP = udf((bG: String, pS: String, bP: String, iOne: String, iTwo: String) => {
if (bG != "I") {"NA"}
else if (pS == "D")
{if (iTwo != null) iOne else "NA"}
else if (pS == "U")
{if (bP != null) bP else "NA"}
})
This throws the error "java.lang.UnsupportedOperationException: Schema for type Any is not supported".
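The if / else if chain has no final else, so Scala infers the function's result type as Any (the missing branch would return Unit), and Spark cannot derive a schema for Any. A sketch of the fix, assuming "NA" is an acceptable fallback for the unhandled case (annotating the return type as String works just as well, provided every branch returns a String):

val aBP = udf((bG: String, pS: String, bP: String, iOne: String, iTwo: String) => {
  if (bG != "I") "NA"
  else if (pS == "D") { if (iTwo != null) iOne else "NA" }
  else if (pS == "U") { if (bP != null) bP else "NA" }
  else "NA" // the missing else branch is what forced the inferred type to Any
})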
I have a timestamp input and, based on certain conditions, I need to subtract 1 second or subtract 3 months from it using Scala.
Input:
val date :String = "2017-10-31T23:59:59.000"
Output:

Minus 1 second:
val lessOneSec = "2017-10-31T23:59:58.000"
Minus 3 months:
val less3Mon = "2017-07-31T23:59:58.000"
How do I convert the string value to a timestamp and perform operations such as these subtractions in Scala?
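A minimal sketch with java.time, assuming the ISO-like pattern shown above; LocalDateTime offers minusSeconds / minusMonths directly and the result can be formatted back to the same string form:

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS")
val ts = LocalDateTime.parse("2017-10-31T23:59:59.000", fmt)

val lessOneSec = ts.minusSeconds(1).format(fmt) // 2017-10-31T23:59:58.000
val less3Mon = ts.minusMonths(3).format(fmt)    // 2017-07-31T23:59:59.000 when applied to the original value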
Tags: apache-spark ×9, scala ×8, sql ×2, dataframe ×1, filestream ×1, python ×1, python-2.7 ×1, python-3.x ×1