Jay*_*ahu 6 scala persist dataframe apache-spark apache-spark-sql
我在网上搜索的任何论坛都找不到关于以下主题的讨论.这可能是因为我是Spark和Scala的新手,我不是在问一个有效的问题.如果有任何现有的线程讨论相同或类似的主题,链接将非常有用.:)
我正在开发一个使用Spark和Scala的过程,并通过读取大量表来创建一个文件,并通过将逻辑应用于从表中获取的数据来获取大量字段.所以,我的代码结构是这样的:
val driver_sql = "SELECT ...";
var df_res = spark.sql(driver_sql)
var df_res = df_res.withColumn("Col1", <logic>)
var df_res = df_res.withColumn("Col2", <logic>)
var df_res = df_res.withColumn("Col3", <logic>)
.
.
.
var df_res = df_res.withColumn("Col20", <logic>)
Run Code Online (Sandbox Code Playgroud)
基本上,有一个驱动程序查询,它创建"驱动程序"数据帧.之后,基于驱动程序数据帧中的一个或多个键执行单独的逻辑(函数)以添加新的列/字段."逻辑"部分并不总是单行代码,有时,它是一个单独的函数,它运行另一个查询并在df_res上进行某种连接并添加一个新列.记录计数也发生了变化,因为在某些情况下我使用"内部"连接与其他表/数据帧.
所以,这是我的问题:
df_res任何时间点吗?df_res添加列后,我可以一次又一次地坚持下去吗?我的意思是,它增加了价值吗?df_res每次添加新列时我仍然存在(仅磁盘),是否更换了磁盘中的数据?或者它是否df_res在磁盘中创建了新的副本/版本?小智 1
首先,当您要在数据帧上应用迭代操作时,持久化数据帧会有所帮助。
您在这里所做的是对数据帧应用转换操作。此处无需保留这些数据帧。
例如:- 如果您正在做这样的事情,坚持下去会很有帮助。
val df = spark.sql("select * from ...").persist
df.count
val df1 = df.select("..").withColumn("xyz",udf(..))
df1.count
val df2 = df.select("..").withColumn("abc",udf2(..))
df2.count
Run Code Online (Sandbox Code Playgroud)
现在,如果您在这里坚持 df 那么它将有利于计算 df1 和 df2。这里还要注意的一件事是,我执行 df.count 的原因是因为数据帧仅在对其应用操作时才会被持久化。来自 Spark 文档:“第一次在操作中计算它时,它将保存在节点的内存中”。这也回答了你的第二个问题。
每次坚持时都会创建一个新副本,但您应该先取消坚持前一个副本。
| 归档时间: |
|
| 查看次数: |
4490 次 |
| 最近记录: |