I want to compare two files and load any extra, non-matching records into a separate file of mismatched records. The comparison should cover every field in each record as well as the record counts.
Suppose you have two files:
scala> val a = spark.read.option("header", "true").csv("a.csv").alias("a"); a.show
+---+-----+
|key|value|
+---+-----+
| a| b|
| b| c|
+---+-----+
a: org.apache.spark.sql.DataFrame = [key: string, value: string]
scala> val b = spark.read.option("header", "true").csv("b.csv").alias("b"); b.show
+---+-----+
|key|value|
+---+-----+
| b| c|
| c| d|
+---+-----+
b: org.apache.spark.sql.DataFrame = [key: string, value: string]
It's not entirely clear which kind of mismatched records you're looking for, but they are easy to find under any definition with a join:
scala> a.join(b, Seq("key")).show
+---+-----+-----+
|key|value|value|
+---+-----+-----+
| b| c| c|
+---+-----+-----+
scala> a.join(b, Seq("key"), "left_outer").show
+---+-----+-----+
|key|value|value|
+---+-----+-----+
| a| b| null|
| b| c| c|
+---+-----+-----+
scala> a.join(b, Seq("key"), "right_outer").show
+---+-----+-----+
|key|value|value|
+---+-----+-----+
| b| c| c|
| c| null| d|
+---+-----+-----+
scala> a.join(b, Seq("key"), "outer").show
+---+-----+-----+
|key|value|value|
+---+-----+-----+
| c| null| d|
| b| c| c|
| a| b| null|
+---+-----+-----+
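If all you need are the rows on one side that have no match on the other, an anti join (available since Spark 2.0) expresses that directly, without a null filter; a sketch using the same two DataFrames:

```scala
// "left_anti" keeps only the left DataFrame's columns, and only the rows
// whose key has no match on the right.
scala> a.join(b, Seq("key"), "left_anti").show
+---+-----+
|key|value|
+---+-----+
|  a|    b|
+---+-----+

// Swap the sides to get the rows of b with no match in a.
scala> b.join(a, Seq("key"), "left_anti").show
+---+-----+
|key|value|
+---+-----+
|  c|    d|
+---+-----+
```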
If you are looking for the records in b.csv that are not present in a.csv:
scala> val diff = a.join(b, Seq("key"), "right_outer").filter($"a.value".isNull).drop($"a.value")
scala> diff.show
+---+-----+
|key|value|
+---+-----+
| c| d|
+---+-----+
scala> diff.write.csv("diff.csv")
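Since the question also asks to compare every field (not just the key) and the record counts, `Dataset.except` gives full-row differences directly. A minimal sketch, assuming the same `a` and `b` DataFrames as above:

```scala
// Rows that appear in one DataFrame but not the other, compared on ALL columns.
scala> val onlyInA = a.except(b)   // rows of a.csv missing from b.csv
scala> val onlyInB = b.except(a)   // rows of b.csv missing from a.csv

// All mismatched rows from both sides, ready to write out to one file.
scala> onlyInA.union(onlyInB).write.csv("mismatched.csv")

// Record counts for the comparison report.
scala> println(s"a: ${a.count}, b: ${b.count}, mismatched: ${onlyInA.count + onlyInB.count}")
```

Note that `except` deduplicates like SQL's EXCEPT DISTINCT, so duplicate rows within one file are collapsed; if duplicates matter, the anti-join approach on all columns is the safer route.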