将Spark DataSet行值映射到新的哈希列

Jes*_*eta 7 scala apache-spark spark-dataframe apache-spark-dataset

给出以下DataSetinputData:

column0 column1 column2 column3
A       88      text    99
Z       12      test    200
T       120     foo     12
Run Code Online (Sandbox Code Playgroud)

在星火,什么是计算一个新的有效途径hash列,并将它添加到一个新的DataSet,hashedData其中hash被定义为应用MurmurHash3过的每一行的值inputData.

具体来说,hashedData如下:

column0 column1 column2 column3 hash
A       88      text    99      MurmurHash3.arrayHash(Array("A", 88, "text", 99))
Z       12      test    200     MurmurHash3.arrayHash(Array("Z", 12, "test", 200))
T       120     foo     12      MurmurHash3.arrayHash(Array("T", 120, "foo", 12))
Run Code Online (Sandbox Code Playgroud)

如果需要更多细节,请告诉我.

任何帮助表示赞赏.谢谢!

soo*_*ote 9

一种方法是使用该withColumn功能:

import org.apache.spark.sql.functions.hash
dataset.withColumn("hash", hash(dataset.columns.map(col):_*))
Run Code Online (Sandbox Code Playgroud)

  • @JesúsZazueta 只是说我不认为他的解决方案只做了列名。此外,还有一个简洁的函数可以获取多个列并生成一个包含其内容的新列: `df.withColumn("concat", concat(df.columns.map(col):_*))` 他们还有一些其他方法太,例如[指定连接分隔符](https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/functions.html#concat_ws-java.lang。字符串-org.apache.spark.sql.Column...-)。 (2认同)

Jes*_*eta 5

事实证明,Spark 已经将其实现为hash包内的函数org.apache.spark.sql.functions

/**
 * Calculates the hash code of given columns, and returns the result as an int column.
 *
 * @group misc_funcs
 * @since 2.0
 */
@scala.annotation.varargs
def hash(cols: Column*): Column = withExpr {
  new Murmur3Hash(cols.map(_.expr))
}
Run Code Online (Sandbox Code Playgroud)

在我的情况下,申请为:

import org.apache.spark.sql.functions.{col, hash}

val newDs = typedRows.withColumn("hash", hash(typedRows.columns.map(col): _*))
Run Code Online (Sandbox Code Playgroud)

我真的有很多关于 Spark sql 的知识 :(。

留在这里以防其他人需要它。谢谢!