Joining more than two tables in Spark SQL

SPr*_*ram 3 apache-spark-sql

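I am trying to write a Spark SQL query that joins three tables, but the query returns an empty result. It works for a single table. My join query is correct, since I have already run it against an Oracle database. What do I need to correct here? The Spark version is 2.0.0.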

from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

lines = sc.textFile("/Users/Hadoop_IPFile/purchase")
lines2 = sc.textFile("/Users/Hadoop_IPFile/customer")
lines3 = sc.textFile("/Users/Hadoop_IPFile/book")

parts = lines.map(lambda l: l.split("\t"))
purchase = parts.map(lambda p: Row(year=p[0],cid=p[1],isbn=p[2],seller=p[3],price=int(p[4])))
schemapurchase = sqlContext.createDataFrame(purchase)
schemapurchase.registerTempTable("purchase")


parts2 = lines.map(lambda l: l.split("\t"))
customer = parts2.map(lambda p: Row(cid=p[0],name=p[1],age=p[2],city=p[3],sex=p[4]))
schemacustomer = sqlContext.createDataFrame(customer)
schemacustomer.registerTempTable("customer")

parts3 = lines.map(lambda l: l.split("\t"))
book = parts3.map(lambda p: Row(isbn=p[0],name=p[1]))
schemabook = sqlContext.createDataFrame(book)
schemabook.registerTempTable("book")

result_purchase = sqlContext.sql("""SELECT DISTINCT customer.name AS name FROM purchase JOIN book ON purchase.isbn = book.isbn JOIN customer ON customer.cid = purchase.cid WHERE customer.name != 'Harry Smith' AND purchase.isbn IN (SELECT purchase.isbn FROM customer JOIN purchase ON customer.cid = purchase.cid WHERE customer.name = 'Harry Smith')""")

result = result_purchase.rdd.map(lambda p: "name: " + p.name).collect()
for name in result:
    print(name)


DataSet
---------
Purchase
1999    C1  B1  Amazon  90
2001    C1  B2  Amazon  20
2008    C2  B2  Barnes Noble    30
2008    C3  B3  Amazon  28
2009    C2  B1  Borders 90
2010    C4  B3  Barnes Noble    26


Customer
C1  Jackie Chan 50  Dayton  M
C2  Harry Smith 30  Beavercreek M
C3  Ellen Smith 28  Beavercreek F
C4  John Chan   20  Dayton  M

Book
B1  Novel
B2  Drama
B3  Poem

I found the following on some web pages, but it still does not work:

schemapurchase.join(schemabook, schemapurchase.isbn == schemabook.isbn)
schemapurchase.join(schemacustomer, schemapurchase.cid == schemacustomer.cid)

phe*_*poo 7

Given the input DataFrames in this example (sorry if some column names are wrong, I guessed them):

Purchase:

+----+---+----+------------+-----+
|year|cid|isbn|        shop|price|
+----+---+----+------------+-----+
|1999| C1|  B1|      Amazon|   90|
|2001| C1|  B2|      Amazon|   20|
|2008| C2|  B2|Barnes Noble|   30|
|2008| C3|  B3|      Amazon|   28|
|2009| C2|  B1|     Borders|   90|
|2010| C4|  B3|Barnes Noble|   26|
+----+---+----+------------+-----+

Customer:

+---+-----------+---+-----------+-----+
|cid|       name|age|       city|genre|
+---+-----------+---+-----------+-----+
| C1|Jackie Chan| 50|     Dayton|    M|
| C2|Harry Smith| 30|Beavercreek|    M|
| C3|Ellen Smith| 28|Beavercreek|    F|
| C4|  John Chan| 20|     Dayton|    M|
+---+-----------+---+-----------+-----+

Book:

+----+-----+
|isbn|genre|
+----+-----+
|  B1|Novel|
|  B2|Drama|
|  B3| Poem|
+----+-----+

You can translate that SQL query using DataFrame functions as follows:

val result = purchase.join(book, purchase("isbn")===book("isbn"))
                     .join(customer, customer("cid")===purchase("cid"))
                     .where(customer("name") =!= "Harry Smith")  // =!= replaces !==, which is deprecated since Spark 2.0
                     .join(temp, purchase("isbn")===temp("purchase_isbn"))
                     .select(customer("name").as("NAME")).distinct()

Here "temp" is the result of the "SELECT ... IN" subquery, which can itself be written as another join:

val temp = customer.join(purchase, customer("cid")===purchase("cid") )
                   .where(customer("name")==="Harry Smith")
                   .select(purchase("isbn").as("purchase_isbn"))    


+-------------+
|purchase_isbn|
+-------------+
|           B2|
|           B1|
+-------------+

So the final result is:

+-----------+
|       NAME|
+-----------+
|Jackie Chan|
+-----------+
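
This result can be cross-checked outside Spark. The sketch below is not part of the original answer: it loads the sample rows from the question into an in-memory SQLite database and runs the question's SQL unchanged. SQLite stands in for Spark SQL here, which assumes the two dialects agree on this particular query. Column names follow the Row definitions in the question.

```python
import sqlite3

# In-memory database used only as a stand-in for Spark SQL (an assumption).
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE purchase (year TEXT, cid TEXT, isbn TEXT, seller TEXT, price INTEGER);
CREATE TABLE customer (cid TEXT, name TEXT, age TEXT, city TEXT, sex TEXT);
CREATE TABLE book (isbn TEXT, name TEXT);
""")

# Sample data copied from the question.
conn.executemany("INSERT INTO purchase VALUES (?, ?, ?, ?, ?)", [
    ("1999", "C1", "B1", "Amazon", 90),
    ("2001", "C1", "B2", "Amazon", 20),
    ("2008", "C2", "B2", "Barnes Noble", 30),
    ("2008", "C3", "B3", "Amazon", 28),
    ("2009", "C2", "B1", "Borders", 90),
    ("2010", "C4", "B3", "Barnes Noble", 26),
])
conn.executemany("INSERT INTO customer VALUES (?, ?, ?, ?, ?)", [
    ("C1", "Jackie Chan", "50", "Dayton", "M"),
    ("C2", "Harry Smith", "30", "Beavercreek", "M"),
    ("C3", "Ellen Smith", "28", "Beavercreek", "F"),
    ("C4", "John Chan", "20", "Dayton", "M"),
])
conn.executemany("INSERT INTO book VALUES (?, ?)", [
    ("B1", "Novel"), ("B2", "Drama"), ("B3", "Poem"),
])

# The query from the question, unchanged.
query = """
SELECT DISTINCT customer.name AS name
FROM purchase
JOIN book ON purchase.isbn = book.isbn
JOIN customer ON customer.cid = purchase.cid
WHERE customer.name != 'Harry Smith'
  AND purchase.isbn IN (
      SELECT purchase.isbn
      FROM customer JOIN purchase ON customer.cid = purchase.cid
      WHERE customer.name = 'Harry Smith')
"""
names = [row[0] for row in conn.execute(query)]
print(names)
```

If the three tables are loaded correctly, the query text itself should not be what produces an empty result.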

Treat this answer as a starting point for your own thinking (for example, too many joins can hurt performance).
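
One more thing that may be worth checking in the question's code, offered as an observation rather than a confirmed diagnosis: parts2 and parts3 are both built from lines (the purchase file) rather than from lines2 and lines3. If that is not just a typo in the post, the customer and book tables would hold purchase rows, and the joins would match nothing. The plain-Python sketch below mimics that mapping on the sample data, assuming the files are tab-separated as the split("\t") calls imply. The helper names parse_customers and matching_rows are hypothetical and exist only for this illustration.

```python
# Sample rows from the question, assumed to be tab-separated.
purchase_lines = [
    "1999\tC1\tB1\tAmazon\t90",
    "2001\tC1\tB2\tAmazon\t20",
    "2008\tC2\tB2\tBarnes Noble\t30",
    "2008\tC3\tB3\tAmazon\t28",
    "2009\tC2\tB1\tBorders\t90",
    "2010\tC4\tB3\tBarnes Noble\t26",
]
customer_lines = [
    "C1\tJackie Chan\t50\tDayton\tM",
    "C2\tHarry Smith\t30\tBeavercreek\tM",
    "C3\tEllen Smith\t28\tBeavercreek\tF",
    "C4\tJohn Chan\t20\tDayton\tM",
]


def parse_customers(lines):
    """Mimic the question's customer mapping: cid=p[0], name=p[1]."""
    return [{"cid": p[0], "name": p[1]} for p in (l.split("\t") for l in lines)]


# Customer ids that actually occur in the purchase data (second field).
purchase_cids = {l.split("\t")[1] for l in purchase_lines}


def matching_rows(customers):
    """Customer rows whose cid occurs in purchase, i.e. rows that survive the join."""
    return [c for c in customers if c["cid"] in purchase_cids]


# As written in the question: customer rows parsed from the purchase file.
from_purchase_file = parse_customers(purchase_lines)
# Parsed from the customer file instead (lines2 in the question).
from_customer_file = parse_customers(customer_lines)

print(from_purchase_file[0])                   # cid holds a year, name holds a customer id
print(len(matching_rows(from_purchase_file)))  # rows that would survive the join
print(len(matching_rows(from_customer_file)))
```

If this is the cause, changing parts2 and parts3 to map over lines2 and lines3 respectively would be the first thing to try.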