Kiran · Tags: scala, tableau-api, hdinsight, apache-spark, apache-spark-sql
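I want to use Tableau to display data from HDInsight Spark. I'm following this video, which describes how to connect the two systems and expose the data.
At the moment my script is very simple, as shown below: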
/* csvLines is an RDD of strings, each string being one line of the CSV file */
val csvLines = sc.textFile("wasb://mycontainer@mysparkstorage.blob.core.windows.net/*/*/*/mydata__000000.csv")
// Define a schema
case class MyData(Timestamp: String, TimezoneOffset: String, SystemGuid: String, TagName: String, NumericValue: Double, StringValue: String)
// Map the values in the .csv file to the schema, skipping the header row
val myData = csvLines.map(s => s.split(",")).filter(s => s(0) != "Timestamp").map(
  s => MyData(s(0),
    s(1),
    s(2),
    s(3),
    s(4).toDouble,
    s(5)
  )
).toDF()
// Register as a temporary table called "test_table", then try to persist it
myData.registerTempTable("test_table")
myData.saveAsTable("test_table")
Unfortunately, I got the following error:
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
org.apache.spark.sql.AnalysisException: Table `test_table` already exists.;
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:209)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:198)
I also tried to overwrite the table if it already exists, using the following code:
import org.apache.spark.sql.SaveMode
myData.saveAsTable("test_table", SaveMode.Overwrite)
But it still fails, this time with a different error:
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
java.lang.RuntimeException: Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.SparkStrategies$DDLStrategy$.apply(SparkStrategies.scala:416)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
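Can anyone help me fix this?
I know this was my own mistake, but I'll leave it as an answer because it isn't easy to find in any blog or forum answer. I hope it helps people like me who are just getting started with Spark.
I found that .toDF() actually creates a DataFrame backed by sqlContext rather than hiveContext, and, as the second error says, tables created through a plain SQLContext can only be temporary. So I updated my code as follows: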
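import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext

// saveAsTable needs a HiveContext to create a persistent (non-temporary) table.
// If your environment does not already provide `hiveContext`, create one from the SparkContext:
val hiveContext = new HiveContext(sc)

// Map the values in the .csv file to the schema, skipping the header row
// (split limit -1 keeps a trailing empty StringValue as the sixth field)
val myData = csvLines.map(s => s.split(",", -1)).filter(s => s(0) != "Timestamp").map(
  s => MyData(s(0),
    s(1),
    s(2),
    s(3),
    s(4).toDouble,
    s(5)
  )
)
// Build the DataFrame through hiveContext and register it as a temporary table called "mydata_stored"
val myDataFrame = hiveContext.createDataFrame(myData)
myDataFrame.registerTempTable("mydata_stored")
// Persist it as a table, replacing any existing table with the same name
myDataFrame.write.mode(SaveMode.Overwrite).saveAsTable("mydata_stored")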
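The line-to-record mapping can be checked in plain Scala without a cluster. A minimal sketch, assuming the same six-column layout as MyData; parseLine is a hypothetical helper, not part of Spark. One detail worth knowing: String.split(",") drops trailing empty fields, so a row whose last column (StringValue) is empty yields only five elements and s(5) throws, whereas split(",", -1) keeps them.

```scala
// Mirrors the MyData case class from the question
case class MyData(Timestamp: String, TimezoneOffset: String, SystemGuid: String,
                  TagName: String, NumericValue: Double, StringValue: String)

// Hypothetical helper: parses one CSV line, returning None for the header row
def parseLine(line: String): Option[MyData] = {
  // A negative limit keeps trailing empty fields, so an empty StringValue
  // still produces six elements instead of five
  val s = line.split(",", -1)
  if (s(0) == "Timestamp") None
  else Some(MyData(s(0), s(1), s(2), s(3), s(4).toDouble, s(5)))
}
```

In the Spark job the same function could replace the filter/map pair, for example csvLines.flatMap(line => parseLine(line).toList).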
Also make sure s(4) always holds a valid double value; otherwise add a try/catch to handle it. I did something like this:
def parseDouble(s: String): Double = try { s.toDouble } catch { case _: NumberFormatException => 0.0 }
parseDouble(s(4)) // used in place of s(4).toDouble when building MyData
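The same fallback can also be written with scala.util.Try. A minimal sketch; the names parseDoubleOrZero and parseDoubleOpt are hypothetical. The Option variant is an alternative to defaulting to 0.0, which can silently mix "missing" with a real zero in the stored table.

```scala
import scala.util.Try

// Same idea as parseDouble: fall back to 0.0 when the field is not a number.
// Try catches only non-fatal exceptions (e.g. NumberFormatException), whereas
// a bare `case _ =>` pattern would catch every Throwable.
def parseDoubleOrZero(s: String): Double = Try(s.trim.toDouble).getOrElse(0.0)

// Alternative: return None so the caller can skip or flag the row
// instead of storing 0.0
def parseDoubleOpt(s: String): Option[Double] = Try(s.trim.toDouble).toOption
```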
Regards, Kiran