如何在pyspark.sql中选择创建表

Kab*_*ard 3 python apache-spark pyspark pyspark-sql

是否可以使用select语句在spark上创建表?

我做以下

import findspark
findspark.init()
import pyspark
from pyspark.sql import SQLContext

sc = pyspark.SparkContext()
sqlCtx = SQLContext(sc)

spark_df = sqlCtx.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("./data/documents_topics.csv")
spark_df.registerTempTable("my_table")

sqlCtx.sql("CREATE TABLE my_table_2 AS SELECT * from my_table")
Run Code Online (Sandbox Code Playgroud)

但我得到了错误

/ Users / user / anaconda / bin / python /Users/user/workspace/Outbrain-Click-Prediction/test.py使用Spark的默认log4j配置文件:org / apache / spark / log4j-defaults.properties将默认日志级别设置为“ WARN” ”。要调整日志记录级别,请使用sc.setLogLevel(newLevel)。17/01/21 17:19:43 WARN NativeCodeLoader:无法在适用的平台上使用内置的Java类为您的平台加载本机Hadoop库。Traceback(最近一次调用为最新):File“ / Users / user / spark- 2.0.2-bin-hadoop2.7 / python / pyspark / sql / utils.py“,第63行,在装饰返回f(* a,** kw)文件“ /Users/user/spark-2.0.2-bin”中-hadoop2.7 / python / lib / py4j-0.10.3-src.zip / py4j / protocol.py“,第319行,位于get_return_value py4j.protocol.Py4JJavaError中:调用o19.sql时发生错误。:org.apache.spark.sql.AnalysisException:my_table_2 创建时间:2017年1月21日星期六星期六:EST上次访问:1969年12月31日星期三星期三类型:MANAGED Storage(InputFormat:org.apache.hadoop.mapred.TextInputFormat,OutputFormat:org.apache.hadoop。 hive.ql.io.HiveIgnoreKeyTextOutputFormat)),false ;; 'CreateHiveTableAsSelectLogicalPlan CatalogTable(表:my_table_2 创建时间:2017年1月21日星期六最后访问:星期三12月31日星期三18:59:59 EST 1969类型:MANAGED Storage(InputFormat:org.apache.hadoop.mapred.TextInputFormat,OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat)),否:+-项目[document_id#0,topic_id#1,confidence_level#2]:+-SubqueryAlias my_table:+-Relation [document_id#0,topic_id# 1,confidence_level#2] csv

在org.org.apache.spark.sql.catalyst.analysis.CheckAnalysis $ class.failAnalysis(CheckAnalysis.scala:40)在org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)在org .apache.spark.sql.catalyst.analysis.CheckAnalysis $$ anonfun $ checkAnalysis $ 1.apply(CheckAnalysis.scala:374)在org.apache.spark.sql.catalyst.analysis.CheckAnalysis $$ anonfun $ checkAnalysis $ 1.apply(位于org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)的CheckAnalysis.scala:67),位于org.apache.spark.sql.catalyst.analysis.CheckAnalysis $ class.checkAnalysis(CheckAnalysis。 scala:67)at org.org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49) .apache.spark.sql.Dataset $ .ofRows(Dataset.scala:64)在org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)在sun.reflect.NativeMethodAccessorImpl.invoke0(本机方法)在sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ref4.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)上的py4j.reflection.MethodInvoker(Method.java:498)。 py4j.Gateway上的ReflectionEngine.invoke(ReflectionEngine.java:357)。py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)上的py4j.commands.CallCommand.execute(CallCommand.java)上的ReflectionEngine.invoke(Gateway.java:280) :79),位于py4j.GatewayConnection.run(GatewayConnection.java:214),位于java.lang.Thread.run(Thread.java:745)sql(SparkSession.scala:582)位于sun.reflect.NativeMethodAccessorImpl.invoke0(本地方法)位于sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)位于sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)处py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)处java.lang.reflect.Method.invoke(Method.java:498)处。 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)上的Gateway.invoke(Gateway.java:280),py4j.GatewayConnection.run(GatewayConnection.java)上py4j.commands.CallCommand.execute(CallCommand.java:79) :214),位于java.lang.Thread.run(Thread.java:745)sql(SparkSession.scala:582)位于sun.reflect.NativeMethodAccessorImpl.invoke0(本地方法)位于sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)位于sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)处py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)处java.lang.reflect.Method.invoke(Method.java:498)处。 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)上的Gateway.invoke(Gateway.java:280),py4j.GatewayConnection.run(GatewayConnection.java)上py4j.commands.CallCommand.execute(CallCommand.java:79) :214),位于java.lang.Thread.run(Thread.java:745)在sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)处invoke0(本机方法)在java.lang.reflect.Method.invoke(Method.java:处)sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)处498)在py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)在py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)在py4j.Gateway.invoke(Gateway.java:280)在py4j.commands。 py4j.commands.CallCommand.execute(CallCommand.java:79)处的py4j.AbstractCommand.invokeMethod(AbstractCommand.java:132)py4j.GatewayConnection.run(GatewayConnection.java:214)处的java.lang.Thread.run(Thread.java) :745)在sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)处invoke0(本机方法)在java.lang.reflect.Method.invoke(Method.java:处)sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)处498)在py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)在py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)在py4j.Gateway.invoke(Gateway.java:280)在py4j.commands。 py4j.commands.CallCommand.execute(CallCommand.java:79)处的py4j.AbstractCommand.invokeMethod(AbstractCommand.java:132)py4j.GatewayConnection.run(GatewayConnection.java:214)处的java.lang.Thread.run(Thread.java) :745)在py4j.reflection.ReflectionEngine.invoke(ReflectionEngine)处在java.lang.reflect.Method.invoke(Method.java:498)在py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)处调用(DelegatingMethodAccessorImpl.java:43) .java:357)在py4j.Gateway.invoke(Gateway.java:280)在py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)在py4j.commands.CallCommand.execute(CallCommand.java:79)在py4j .GatewayConnection.run(GatewayConnection.java:214)在java.lang.Thread.run(Thread.java:745)在py4j.reflection.ReflectionEngine.invoke(ReflectionEngine)处在java.lang.reflect.Method.invoke(Method.java:498)在py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)处调用(DelegatingMethodAccessorImpl.java:43) .java:357)在py4j.Gateway.invoke(Gateway.java:280)在py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)在py4j.commands.CallCommand.execute(CallCommand.java:79)在py4j .GatewayConnection.run(GatewayConnection.java:214)在java.lang.Thread.run(Thread.java:745)在py4j.commands.CallCommand.execute(CallCommand.java:79)在py4j.GatewayConnection.run(GatewayConnection.java:214)在java.lang.Thread.run(Thread.java:745)上invokeMethod(AbstractCommand.java:132) )在py4j.commands.CallCommand.execute(CallCommand.java:79)在py4j.GatewayConnection.run(GatewayConnection.java:214)在java.lang.Thread.run(Thread.java:745)上invokeMethod(AbstractCommand.java:132) )

在处理上述异常期间,发生了另一个异常:

追溯(最近一次通话最后一次):文件“ /Users/user/user/workspace/Outbrain-Click-Prediction/test.py”,在sqlCtx.sql中的第16行(“从my_table创建表my_table_2 AS SELECT * *”)文件“ /用户/用户/spark-2.0.2-bin-hadoop2.7/python/pyspark/sql/context.py”,行360,在sql中返回self.sparkSession.sql(sqlQuery)文件“ / Users / user / spark- 2.0.2-bin-hadoop2.7 / python / pyspark / sql / session.py“,第543行,在sql中返回DataFrame(self._jsparkSession.sql(sqlQuery),self._wrapped)文件” / Users / user / spark -2.0.2-bin-hadoop2.7 / python / lib / py4j-0.10.3-src.zip / py4j / java_gateway.py“,第1133行,正在调用 在装饰性引发AnalysisException(s.split(':',1)[1]中的文件“ /Users/user/spark-2.0.2-bin-hadoop2.7/python/pyspark/sql/utils.py”的第69行],stackTrace)pyspark.sql.utils.AnalysisException:“未解决的运算符'CreateHiveTableAsSelectLogicalPlan CatalogTable(\ n \ tTable: my_table_2\ n \ tCreated:Sat Jan 21 17:19:53 EST 2017 \ n \ t最后访问时间:12月31日星期三: 59:59 EST 1969 \ n \ t类型:MANAGED \ n \ tStorage(InputFormat:org.apache.hadoop.mapred.TextInputFormat,OutputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat)),false ;; \ n'CreateHiveTableAsSelectLogicalPlan CatalogTable(\ n \ tTable: my_table_2\ n \ t创建时间:美国东部时间2017年1月21日星期六17:19:53 \ n \ t最后访问时间:美国东部标准时间1969年12月31日星期三18:59:59 \ n \ t类型:MANAGED \ n \ tStorage(InputFormat:org.apache.hadoop。 mapred.TextInputFormat,OutputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat)),false \ n:+-项目[document_id#0,topic_id#1,confidence_level#2] \ n:+-SubqueryAlias my_table \ n:+-关系[document_id#0,topic_id#1,confidence_level#2] csv \ n“

Kab*_*ard 5

我通过使用HiveContext而不是SQLContext如下更正了此问题:

import findspark
findspark.init()
import pyspark
from pyspark.sql import HiveContext

sqlCtx= HiveContext(sc)

spark_df = sqlCtx.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("./data/documents_topics.csv")
spark_df.registerTempTable("my_table")

sqlCtx.sql("CREATE TABLE my_table_2 AS SELECT * from my_table")
Run Code Online (Sandbox Code Playgroud)

  • 现在值得注意的是,`HiveContext()` 已[弃用](http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.HiveContext) 作为Spark 2.2。正确答案现在使用 `SparkSession.enableHiveSupport().getOrCreate()` 代替。 (4认同)