我读到了有关检查点的内容,它看起来很适合我的需求,但我找不到如何使用它的好例子。
我的问题是:
我应该指定检查点目录吗?是否可以这样做:
df.检查点()
有什么我应该注意的可选参数吗?
是否有默认检查点目录,或者我必须指定一个作为默认检查点目录?
当我检查数据帧并重用它时 - 它会自动从我们编写文件的目录中读取数据吗?
如果您能与我分享在 pyspark 中使用检查点的示例并附上一些解释,那就太好了。谢谢!
我了解广播优化的概念。
当连接中的一侧数据较小时,最好只针对较小的一侧进行洗牌。
但为什么不能只使用执行器来进行这种洗牌呢?为什么我们需要使用驱动程序?
如果每个执行器都保存哈希表来映射执行器之间的记录,我认为它应该可以工作。
在 Spark 广播的当前实现中 - 它将数据收集到驱动程序,然后对其进行洗牌,并且对驱动程序的收集操作是我想避免的瓶颈。
关于如何在没有驱动程序内存瓶颈的情况下实现类似的优化有什么想法吗?
我有代码表明他的目标是从 1.5B 记录中取出 10M 最旧的记录。
我尝试用它来做orderBy,但它从未完成,然后我尝试用窗口函数来做,15 分钟后完成。
我理解orderBy每个执行器都会获取部分数据,对其进行排序并将前 10M 传递给最终执行器。因为 10M> 分区大小,我们将所有数据传递给最终执行器,然后需要很长时间才能完成。
我不明白窗口解决方案是如何工作的?在单个执行器开始运行之前,shuffle 中发生了什么?这种洗牌如何帮助单个执行器中的排序更快地工作?我将不胜感激任何有助于理解窗口函数在这种情况下如何在后台工作的帮助。
这是窗口函数的代码:
df = sess.sql("select * from table")
last_update_window = Window.orderBy(f.col("last_update").asc())
df = df.withColumn('row_number', f.row_number().over(last_update_window))
df = df.where(f.col('row_number') <= 1000000000)
Run Code Online (Sandbox Code Playgroud)
这是代码orderBy:
df = sess.sql("select * from table")
df = df.orderBy(f.col('last_update').asc()).limit(100000000)
Run Code Online (Sandbox Code Playgroud)
下面是执行窗口函数时的计划图: