Spark DataFrame: selecting distinct rows

Him*_*dav · java · sql · dataframe · apache-spark · apache-spark-sql

I have tried two ways to get the distinct rows from a Parquet file, but neither seems to work.
Attempt 1:

    Dataset<Row> df = sqlContext.read().parquet("location.parquet").distinct();

but it throws:

Cannot have map type columns in DataFrame which calls set operations
(intersect, except, etc.), 
but the type of column canvasHashes is map<string,string>;;

Attempt 2, running a SQL query:

Dataset<Row> df = sqlContext.read().parquet("location.parquet");
df.createOrReplaceTempView("df");
Dataset<Row> landingDF = sqlContext.sql("SELECT distinct on timestamp * from df");

The error I get:

== SQL ==
SELECT distinct on timestamp * from df
-----------------------------^^^

Is there any way to get distinct records while reading the Parquet file? Any read option I could use?

use*_*362

The problem you are facing is stated explicitly in the exception message: because MapType columns are neither hashable nor orderable, they cannot be used as part of a grouping or partitioning expression.
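
For completeness: if the map column does not need to take part in the comparison at all, the simplest workaround is to drop it before calling distinct. A minimal sketch, assuming canvasHashes is the only map-typed column and is not needed downstream:

    // distinct() works once no MapType column remains in the Dataset
    Dataset<Row> distinctRows = sqlContext.read()
        .parquet("location.parquet")
        .drop("canvasHashes")
        .distinct();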

Also, the SQL query you tried is not logically equivalent to distinct on a Dataset. If you want to deduplicate data based on a set of compatible columns, you should use dropDuplicates:

df.dropDuplicates("timestamp")

This is equivalent to the following SQL (note that first picks an arbitrary value from each group, so the surviving row is not deterministic):

SELECT timestamp, first(c1) AS c1, first(c2) AS c2,  ..., first(cn) AS cn,
       first(canvasHashes) AS canvasHashes
FROM df GROUP BY timestamp

Unfortunately, if your goal is an actual DISTINCT, it won't be so easy. One possible solution is to use the hash of the Scala* Map. You can define a Scala udf like this (## is Scala's null-safe equivalent of hashCode):

spark.udf.register("scalaHash", (x: Map[String, String]) => x.##)

and then use it in your Java code to derive a surrogate column that can be passed to dropDuplicates:

df
  .selectExpr("*", "scalaHash(canvasHashes) AS hash_of_canvas_hashes")
  .dropDuplicates(
    // All columns excluding canvasHashes / hash_of_canvas_hashes
    "timestamp", "c1", "c2", ..., "cn",
    // Hash used as a surrogate of canvasHashes
    "hash_of_canvas_hashes"
  )

which is equivalent to the following SQL:

SELECT 
  timestamp, c1, c2, ..., cn,   -- All columns excluding canvasHashes
  first(canvasHashes) AS canvasHashes
FROM df GROUP BY
  timestamp, c1, c2, ..., cn    -- All columns excluding canvasHashes
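
Spelling out c1, ..., cn by hand is error-prone; the column list for dropDuplicates can also be derived from the schema. A sketch under the same assumptions (the Scala udf is registered as scalaHash, and canvasHashes is the only map-typed column):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Deduplication key: every column except canvasHashes, plus its hash surrogate
    List<String> dedupCols = new ArrayList<>(Arrays.asList(df.columns()));
    dedupCols.remove("canvasHashes");
    dedupCols.add("hash_of_canvas_hashes");

    Dataset<Row> deduped = df
        .selectExpr("*", "scalaHash(canvasHashes) AS hash_of_canvas_hashes")
        .dropDuplicates(dedupCols.toArray(new String[0]))
        .drop("hash_of_canvas_hashes");  // drop the surrogate afterwards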

* Note that java.util.Map with its hashCode won't work here, because hashCode is not consistent.