如何在Spark SQL中执行冗长的多行Hive查询?像下面的查询:
val sqlContext = new HiveContext (sc)
val result = sqlContext.sql ("
select ...
from ...
");
Run Code Online (Sandbox Code Playgroud) 最近我看到了Spark的一些奇怪行为.
我在我的应用程序中有一个管道,我正在操作一个大数据集 - 伪代码:
val data = spark.read (...)
data.join(df1, "key") //etc, more transformations
data.cache(); // used to not recalculate data after save
data.write.parquet() // some save
val extension = data.join (..) // more transformations - joins, selects, etc.
extension.cache(); // again, cache to not double calculations
extension.count();
// (1)
extension.write.csv() // some other save
extension.groupBy("key").agg(some aggregations) //
extension.write.parquet() // other save, without cache it will trigger recomputation of whole dataset
Run Code Online (Sandbox Code Playgroud)
但是当我调用data.unpersist()
ie时(1)
,Spark会从存储中删除所有数据集,也就是extension
数据集,它不是我试图取消的数据集.
这是预期的行为吗?如何unpersist …
有一个带有一些分类字符串值的DataFrame(例如uuid | url | browser).
我想将它转换为double来执行接受双矩阵的ML算法.
作为转换方法,我使用StringIndexer(spark 1.4)将我的字符串值映射到double值,所以我定义了一个这样的函数:
def str(arg: String, df:DataFrame) : DataFrame =
(
val indexer = new StringIndexer().setInputCol(arg).setOutputCol(arg+"_index")
val newDF = indexer.fit(df).transform(df)
return newDF
)
Run Code Online (Sandbox Code Playgroud)
现在问题是我将迭代df的foreach列,调用此函数并在解析的双列中添加(或转换)原始字符串列,因此结果将是:
初始df:
[String: uuid|String: url| String: browser]
Run Code Online (Sandbox Code Playgroud)
最终df:
[String: uuid|Double: uuid_index|String: url|Double: url_index|String: browser|Double: Browser_index]
Run Code Online (Sandbox Code Playgroud)
提前致谢
有没有办法在Spark SQL语句中使用广播?
例如:
SELECT
Column
FROM
broadcast (Table 1)
JOIN
Table 2
ON
Table1.key = Table2.key
Run Code Online (Sandbox Code Playgroud)
在我的例子中,表1也是一个子查询.
最近我正在使用带有 JDBC 数据源的 Spark。考虑以下片段:
val df = spark.read.(options).format("jdbc").load();
val newDF = df.where(PRED)
Run Code Online (Sandbox Code Playgroud)
PRED 是谓词列表。
如果 PRED 是一个简单的谓词,比如x = 10
,查询会快得多。但是,如果有一些不相等的条件,例如date > someOtherDate or date < someOtherDate2
,查询比没有谓词下推要慢得多。您可能知道,数据库引擎对此类谓词的扫描速度非常慢,在我的情况下甚至慢 10 倍(!)。
为了防止不必要的谓词下推,我使用了:
val cachedDF = df.cache()
val newDF = cachedDF.where(PRED)
Run Code Online (Sandbox Code Playgroud)
但它需要大量内存并且 - 由于这里提到的问题 - Spark' Dataset unpersist 行为- 我无法 unpersist cachedDF
。
还有其他选择可以避免下推谓词吗?没有缓存也没有编写自己的数据源?
注意:即使有关闭谓词下推的选项,它也仅适用于其他查询可能仍在使用它。所以,如果我写:
// some fancy option set to not push down predicates
val df1 = ...
// predicate pushdown works again
val df2 = ...
df1.join(df2)// where …
Run Code Online (Sandbox Code Playgroud) 我在spark中运行了一个字数统计程序,但是我收到了以下错误 scala-xml_2.11-1.0.2.jar
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/12/16 05:14:02 INFO SparkContext: Running Spark version 2.0.2
16/12/16 05:14:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/12/16 05:14:03 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.59.132 instead (on interface ens33)
16/12/16 05:14:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
16/12/16 05:14:04 INFO SecurityManager: Changing view acls to: hadoopusr
16/12/16 05:14:04 INFO SecurityManager: Changing …
Run Code Online (Sandbox Code Playgroud) 我已按照链接http://ampcamp.berkeley.edu/big-data-mini-course/movie-recommendation-with-mllib.html中的指南进行操作
但这已经过时,因为它使用了spark Mlib RDD方法.New Spark 2.0具有DataFrame方法.现在我的问题是我有更新的代码
val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
.map(parseRating)
.toDF()
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
// Build the recommendation model using ALS on the training data
val als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating")
val model = als.fit(training)
// Evaluate the model by computing the RMSE on the test data
val predictions = model.transform(test)
Run Code Online (Sandbox Code Playgroud)
现在问题是,在旧代码中,获得的模型是MatrixFactorizationModel,现在它有自己的模型(ALSModel)
在MatrixFactorizationModel中你可以直接做
val recommendations = bestModel.get
.predict(userID)
Run Code Online (Sandbox Code Playgroud)
这将给出用户喜欢它们的概率最高的产品列表.
但现在没有.predict方法.任何想法如何推荐给定用户ID的产品列表
据我所知,Lombok使用Java的Annotation Processors来生成其他方法.
与Maven 3.5它完全无需添加任何额外的配置,只需添加dependecy龙目岛,并把一些注解一样@Getter
,@Setter
.
但是,如果我在IntelliJ IDEA 2018.2中打开此项目,则生成的getter/setter的所有用法都会突出显示为错误.我打开了Annotation Processing,我尝试在IntelliJ中构建项目或在Maven中构建然后在IntelliJ中使用,但它仍然需要Lombok插件来避免错误.
这是某种虫子吗?工作流程出错?或者也许龙目岛不仅使用注释处理器,还有其他一些我不知道的东西,这就是为什么IntelliJ + javac无法弄清楚如何处理它?这很奇怪,因为javac本身编译这些文件没有错误
我知道有很多问题"我在使用Lombok时遇到错误"和"使用插件"之类的答案.我不是在问我是否应该使用插件,但为什么我应该使用它,为什么IntelliJ无法在没有插件的情况下处理它而javac呢
java intellij-idea annotation-processing lombok intellij-lombok-plugin
我有Maven依赖spark-sql_2.1.0
和spark-hive_2.1.0
.但是,当我尝试时import org.apache.spark.sql.DataFrame
,有一个错误.但导入
org.apache.spark.sql.SQLContext
没问题,没有错误.为什么?