什么时候在 Spark 数据帧上使用 persist() 性能不实用?

Sar*_*ata 9 python-2.7 apache-spark pyspark apache-spark-2.3

在致力于提高代码性能时,因为我有许多作业失败(中止),persist()所以每当我需要在许多其他操作中使用相同的数据帧时,我都会考虑使用 Spark Dataframe 上的函数。在执行此操作并跟踪 Spark 应用程序 UI 中的作业、阶段时,我觉得这样做并不总是最佳,这取决于分区数量和数据大小。我不确定直到我因为坚持阶段失败而中止工作。

我想知道在数据帧上执行许多操作时使用的最佳实践是否始终有效?persist() 如果不是,什么时候不是?怎么判断?

更具体地说,我将展示我的代码和中止作业的详细信息:

#create a dataframe from another one df_transf_1 on which I made a lot of transformations but no actions
spark_df = df_transf_1.select('user_id', 'product_id').dropDuplicates()
#persist
spark_df.persist()
products_df = spark_df[['product_id']].distinct()
df_products_indexed = products_df.rdd.map(lambda r: r.product_id).zipWithIndex().toDF(['product_id', 'product_index'])
Run Code Online (Sandbox Code Playgroud)

你可能会问我为什么坚持spark_dfproducts_df这是因为我将像 with和 in 一样多次使用它joins(例如:spark_df = spark_df.join(df_products_indexed,"product_id")

工作阶段

第三阶段失败原因详情:

由于阶段失败而中止作业:阶段 3.0 中的任务 40458 失败了 4 次,最近一次失败:阶段 3.0 中丢失任务 40458.3(TID 60778,xx.xx.yyyy.com,执行器 91):ExecutorLostFailure(执行器 91 因一而退出)正在运行的任务)原因:从站丢失驱动程序堆栈跟踪:

输入数据的大小(4 TB)很大,在进行持久化之前有没有办法检查数据的大小?这是选择坚持与否的一个参数吗?还有> 100,000的分区(任务)数量persist

abi*_*sis 5

以下是两种使用情况persist()

  • 使用后repartition,以避免在后续步骤使用数据帧时一次又一次地打乱数据。这仅在您为持久数据帧/RDD调用多个操作的情况下才有用,因为持久是一种转换,因此是延迟评估的。一般来说,如果您对同一个数据帧/RDD 有多个操作。

  • 迭代计算,例如当您想要在 for 循环内查询数据帧时。Sparkpersist将保存中间结果并省略在每个操作调用上重新评估相同的操作。另一个示例是使用 a 附加新列,如此join所述。


Man*_*que 2

我的经验告诉我,当你对数据帧执行多个操作时,你应该保留数据帧,这样你就可以创建临时表(同时确保如果出现故障,你有一个恢复点)。通过这样做,您可以防止巨大的 DAG 通常不会结束,例如,如果您有连接。所以我的建议是做这样的事情:

# operations
df.write.saveAsTable('database.tablename_temp')
df = spark.table('database.tablename_temp')
# more operations
Run Code Online (Sandbox Code Playgroud)