如何在 PySpark 中使用 Spark 的 registerDataFrameAsTable?

sim*_*ing 2 apache-spark pyspark

使用时遇到问题registerDataFrameAsTable。根据文档,它看起来在类中sqlContext,所以我自然地尝试了这个:

df = spark.registerDataFrameAsTable(mydf, "table1")
Run Code Online (Sandbox Code Playgroud)

但这导致了这个错误:

AttributeError: 'SparkSession' object has no attribute 'registerDataFrameAsTable' 
Run Code Online (Sandbox Code Playgroud)

我也尝试过这个:

from pyspark.sql import SQLContext
df = SQLContext.registerDataFrameAsTable(mydf, "table1")
Run Code Online (Sandbox Code Playgroud)

但这导致了这个奇怪的错误:

类型错误:registerDataFrameAsTable() 缺少 1 个必需的位置参数:“tableName”

这似乎是使用该函数的错误方法,因为看起来我必须显式命名参数,并且它也需要该self参数。

roh*_*roh 5

我建议将您的应用程序迁移到 pyspark 2.x,或者如果您从 2.x 开始学习。我在下面提供了 2.x 和 1.x 的代码。

火花2.X

如果你有一个 Spark DataFrame df

df.show(5)
#+---+---+---+---+---+------+
#|_c0|_c1|_c2|_c3|_c4|   _c5|
#+---+---+---+---+---+------+
#|  1|5.1|3.5|1.4|0.2|setosa|
#|  2|4.9|  3|1.4|0.2|setosa|
#|  3|4.7|3.2|1.3|0.2|setosa|
#|  4|4.6|3.1|1.5|0.2|setosa|
#|  5|  5|3.6|1.4|0.2|setosa|
#+---+---+---+---+---+------+
Run Code Online (Sandbox Code Playgroud)

您可以将createOrReplaceTempView其注册为表格:

df.createOrReplaceTempView("people")
spark.sql("select * from people").show(n=5)
#+---+---+---+---+---+------+
#|_c0|_c1|_c2|_c3|_c4|   _c5|
#+---+---+---+---+---+------+
#|  1|5.1|3.5|1.4|0.2|setosa|
#|  2|4.9|  3|1.4|0.2|setosa|
#|  3|4.7|3.2|1.3|0.2|setosa|
#|  4|4.6|3.1|1.5|0.2|setosa|
#|  5|  5|3.6|1.4|0.2|setosa|
#+---+---+---+---+---+------+
Run Code Online (Sandbox Code Playgroud)

或者,您可以使用createGlobalTempView

df.createGlobalTempView("people_global")
tempdf=spark.sql("select * from people_global")
tempdf.show(n=5)
#+---+---+---+---+---+------+
#|_c0|_c1|_c2|_c3|_c4|   _c5|
#+---+---+---+---+---+------+
#|  1|5.1|3.5|1.4|0.2|setosa|
#|  2|4.9|  3|1.4|0.2|setosa|
#|  3|4.7|3.2|1.3|0.2|setosa|
#|  4|4.6|3.1|1.5|0.2|setosa|
#|  5|  5|3.6|1.4|0.2|setosa|
#+---+---+---+---+---+------+
Run Code Online (Sandbox Code Playgroud)

TempTableAlreadyExistsException但如果该名称已经存在,则会抛出异常。

火花1.X

您可以使用pyspark.sql.SQLContext.registerDataFrameAsTable

from pyspark.sql import Row
df = sc.parallelize(
    [
        Row(name='Alice', age=5, height=80),
        Row(name='Alice', age=5, height=80),
        Row(name='Alice', age=10, height=80)
    ]
).toDF()
df.show()
#+---+------+-----+
#|age|height| name|
#+---+------+-----+
#|  5|    80|Alice|
#|  5|    80|Alice|
#| 10|    80|Alice|
#+---+------+-----+

sqlContext.registerDataFrameAsTable(df, "table1")
dftemp=sqlContext.sql("select * from table1")
dftemp.show()
#+---+------+-----+
#|age|height| name|
#+---+------+-----+
#|  5|    80|Alice|
#|  5|    80|Alice|
#| 10|    80|Alice|
#+---+------+-----+
Run Code Online (Sandbox Code Playgroud)