How to implement NOT IN for two DataFrames with different structures in Apache Spark

Iva*_*nov 6 java sql apache-spark apache-spark-sql

I am using Apache Spark in my Java application. I have two DataFrames: df1 and df2. df1 contains Rows with email, firstName and lastName. df2 contains Rows with email only.

I want to create a DataFrame, df3, that contains all rows of df1 whose email is not present in df2.

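Is there a way to do this with Apache Spark? I tried to create a JavaRDD<String> from df1 and from df2 by converting them with toJavaRDD(), mapping df1 down to its emails, and then using subtract, but I don't know how to map the resulting JavaRDD back to df1 and get a DataFrame.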

Basically, I need all the rows of df1 whose email is not in df2.

DataFrame customers = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM customer ");

DataFrame customersWhoOrderedTheProduct = sqlContext.cassandraSql("SELECT email FROM customer_bought_product " +
                            "WHERE product_id = '" + productId + "'");

JavaRDD<String> customersBoughtEmail = customersWhoOrderedTheProduct.toJavaRDD().map(row -> row.getString(0));

List<String> notBoughtEmails = customers.javaRDD()
                        .map(row -> row.getString(0))
                        .subtract(customersBoughtEmail).collect();

zer*_*323 6

Spark 2.0.0+

You can use NOT IN directly.
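As a rough illustration of what "directly" can look like on Spark 2.0+, here is a minimal sketch written in spark-shell style. It assumes a local SparkSession; the app name, the master setting, and the view name customers_who_ordered are made up for this example, and the sample rows mirror the ones used in the pre-2.0 example. It also shows a "left_anti" join, which is a different technique from NOT IN but gives the same rows here.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .appName("not-in-example")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val customers = Seq(
  ("john@example.com", "John", "Doe"),
  ("jane@example.com", "Jane", "Doe")
).toDF("email", "first_name", "last_name")

val customersWhoOrderedTheProduct = Seq("jane@example.com").toDF("email")

// Register both DataFrames so they can be referenced from SQL.
// The view names are hypothetical.
customers.createOrReplaceTempView("customers")
customersWhoOrderedTheProduct.createOrReplaceTempView("customers_who_ordered")

// Option 1: NOT IN with a subquery in the WHERE clause.
val viaNotIn = spark.sql(
  """SELECT * FROM customers
    |WHERE email NOT IN (SELECT email FROM customers_who_ordered)""".stripMargin)

// Option 2: a left anti join keeps the left-side rows that have no match on the right.
val viaAntiJoin = customers.join(customersWhoOrderedTheProduct, Seq("email"), "left_anti")

viaNotIn.show()
viaAntiJoin.show()
```

With this sample data, both queries should keep only the row for john@example.com. The two are not interchangeable in general: under standard SQL semantics, NOT IN returns no rows at all if the subquery produces a NULL email, whereas the anti join simply treats NULL as a non-match, so the choice depends on whether the email column can be NULL.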

Spark <2.0.0

It can be expressed using an outer join and a filter.

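// Already imported in spark-shell; needed elsewhere for toDF and the $"..." column syntax.
import sqlContext.implicits._

val customers = sc.parallelize(Seq(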
  ("john@example.com", "John", "Doe"),
  ("jane@example.com", "Jane", "Doe")
)).toDF("email", "first_name", "last_name")

val customersWhoOrderedTheProduct = sc.parallelize(Seq(
  Tuple1("jane@example.com")
)).toDF("email")

val customersWhoHaventOrderedTheProduct = customers.join(
    customersWhoOrderedTheProduct.select($"email".alias("email_")),
    $"email" === $"email_", "leftouter")
 .where($"email_".isNull).drop("email_")

customersWhoHaventOrderedTheProduct.show

// +----------------+----------+---------+
// |           email|first_name|last_name|
// +----------------+----------+---------+
// |john@example.com|      John|      Doe|
// +----------------+----------+---------+

Raw SQL equivalent:

customers.registerTempTable("customers")
customersWhoOrderedTheProduct.registerTempTable(
  "customersWhoOrderedTheProduct")

val query = """SELECT c.* FROM customers c LEFT OUTER JOIN  
                 customersWhoOrderedTheProduct o
               ON c.email = o.email
               WHERE o.email IS NULL"""

sqlContext.sql(query).show

// +----------------+----------+---------+
// |           email|first_name|last_name|
// +----------------+----------+---------+
// |john@example.com|      John|      Doe|
// +----------------+----------+---------+

  • Thanks. The first example worked for me. Here is the Java version: `DataFrame customersWhoHaventOrderedTheProduct = customers.join(customersWhoOrderedTheProduct.select(customersWhoOrderedTheProduct.col("email")), customers.col("email").equalTo(customersWhoOrderedTheProduct.col("email")), "leftouter").where(customersWhoOrderedTheProduct.col("email").isNull()).drop(customersWhoOrderedTheProduct.col("email"));` I tried the SQL equivalent, but it failed with `scala.MatchError: UUIDType (of class org.apache.spark.sql.cassandra.types.UUIDType$)` (2 upvotes)