下面是将打印一列 DataSet[Row] 的 spark scala 代码:
import org.apache.spark.sql.{Dataset, Row, SparkSession}
val spark: SparkSession = SparkSession.builder()
.appName("Spark DataValidation")
.config("SPARK_MAJOR_VERSION", "2").enableHiveSupport()
.getOrCreate()
val kafkaPath:String="hdfs:///landing/APPLICATION/*"
val targetPath:String="hdfs://datacompare/3"
val pk:String = "APPLICATION_ID"
val pkValues = spark
.read
.json(kafkaPath)
.select("message.data.*")
.select(pk)
.distinct()
pkValues.show()
Run Code Online (Sandbox Code Playgroud)
关于代码的输出:
+--------------+
|APPLICATION_ID|
+--------------+
| 388|
| 447|
| 346|
| 861|
| 361|
| 557|
| 482|
| 518|
| 432|
| 422|
| 533|
| 733|
| 472|
| 457|
| 387|
| 394|
| 786|
| 458|
+--------------+
Run Code Online (Sandbox Code Playgroud)
题 …
我需要为我的数据集(在 Spark scala 中)实现分页。
如果 Spark 数据集中有 100 条记录,那么我需要分成 20 个批次,每个批次 5 个元素。
请问如何将 Spark 数据集/数据框拆分为 N 行?
--NS
有两个 json,第一个 json 有更多列,并且总是超级集。
val df1 = spark.read.json(sqoopJson)
val df2 = spark.read.json(kafkaJson)
Run Code Online (Sandbox Code Playgroud)
除了操作:
我喜欢在 df1 和 df2 上应用 except 操作,但是 df1 有 10 列而 df2 只有 8 列。如果我手动删除 df1 中的 2 列,则 except 将起作用。但是我有 50 多个表/json,并且需要对所有 50 组表/json 执行 EXCEPT。
题 :
如何仅从 DF1 中选择 DF2 (8) 列中可用的列并创建新的 df3?因此 df3 将拥有来自 df1 的有限列的数据,并且它将与 df2 列匹配。