Spark:加入数组

acl*_*kay 6 scala apache-spark apache-spark-sql

我需要将一个带有字符串列的数据帧连接到一个带有字符串数组的数据帧,这样如果数组中的一个值匹配,则行将加入.

我试过了,但我想这不是支持.还有其他办法吗?

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("test")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()

import spark.implicits._

val left = spark.sparkContext.parallelize(Seq(1, 2, 3)).toDF("col1")
val right = spark.sparkContext.parallelize(Seq((Array(1, 2), "Yes"),(Array(3),"No"))).toDF("col1", "col2")

left.join(right,"col1")
Run Code Online (Sandbox Code Playgroud)

抛出:

org.apache.spark.sql.AnalysisException:由于数据类型不匹配,无法解析'(col1 = col1)':'(col1=

col1)'(int和array).;;

Dan*_*ula 9

一种选择是创建一个UDF来构建连接条件:

import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray

val left = spark.sparkContext.parallelize(Seq(1, 2, 3)).toDF("col1")
val right = spark.sparkContext.parallelize(Seq((Array(1, 2), "Yes"),(Array(3),"No"))).toDF("col1", "col2")

val checkValue = udf { 
  (array: WrappedArray[Int], value: Int) => array.contains(value) 
}
val result = left.join(right, checkValue(right("col1"), left("col1")), "inner")

result.show

+----+------+----+
|col1|  col1|col2|
+----+------+----+
|   1|[1, 2]| Yes|
|   2|[1, 2]| Yes|
|   3|   [3]|  No|
+----+------+----+
Run Code Online (Sandbox Code Playgroud)


ran*_*l25 7

最简洁的方法是使用如下所示的 array_contains spark sql 表达式,也就是说我已经将它的性能与执行爆炸和连接的性能进行了比较,如前一个答案所示,爆炸似乎性能更高.

import org.apache.spark.sql.functions.expr
import spark.implicits._

val left = Seq(1, 2, 3).toDF("col1")

val right = Seq((Array(1, 2), "Yes"),(Array(3),"No")).toDF("col1", "col2").withColumnRenamed("col1", "col1_array")

val joined = left.join(right, expr("array_contains(col1_array, col1)")).show

+----+----------+----+
|col1|col1_array|col2|
+----+----------+----+
|   1|    [1, 2]| Yes|
|   2|    [1, 2]| Yes|
|   3|       [3]|  No|
+----+----------+----+
Run Code Online (Sandbox Code Playgroud)

请注意,您不能直接使用 org.apache.spark.sql.functions.array_contains 函数,因为它要求第二个参数是文字而不是列表达式。


Fab*_*ich 5

您可以explode在连接之前在Array列上使用。爆炸为数组中的每个元素创建新行:

right = right.withColumn("exploded_col",explode(right("col1")))
right.show()

+------+----+--------------+
|  col1|col2|exploded_col_1|
+------+----+--------------+
|[1, 2]| Yes|             1|
|[1, 2]| Yes|             2|
|   [3]|  No|             3|
+------+----+--------------+
Run Code Online (Sandbox Code Playgroud)

然后,您可以轻松加入第一个数据集。