相关疑难解决方法(0)

Spark - repartition()vs coalesce()

根据Learning Spark的说法

请记住,重新分区数据是一项相当昂贵的操作.Spark还有一个优化版本的repartition(),称为coalesce(),它允许避免数据移动,但前提是你减少了RDD分区的数量.

我得到的一个区别是,使用repartition()可以增加/减少分区数量,但是使用coalesce()时,只能减少分区数量.

如果分区分布在多台机器上并运行coalesce(),它如何避免数据移动?

distributed-computing apache-spark rdd

208
推荐指数
13
解决办法
15万
查看次数

为什么一项工作因"设备上没有剩余空间"而失败,但df说不然?

当执行shuffle时,我的Spark工作失败并说"设备上没有剩余空间",但是当我运行df -h它时说我有剩余空间!为什么会发生这种情况,我该如何解决?

apache-spark

43
推荐指数
5
解决办法
3万
查看次数

什么时候在 Spark 数据帧上使用 persist() 性能不实用?

在致力于提高代码性能时,因为我有许多作业失败(中止),persist()所以每当我需要在许多其他操作中使用相同的数据帧时,我都会考虑使用 Spark Dataframe 上的函数。在执行此操作并跟踪 Spark 应用程序 UI 中的作业、阶段时,我觉得这样做并不总是最佳,这取决于分区数量和数据大小。我不确定直到我因为坚持阶段失败而中止工作。

我想知道在数据帧上执行许多操作时使用的最佳实践是否始终有效?persist() 如果不是,什么时候不是?怎么判断?

更具体地说,我将展示我的代码和中止作业的详细信息:

#create a dataframe from another one df_transf_1 on which I made a lot of transformations but no actions
spark_df = df_transf_1.select('user_id', 'product_id').dropDuplicates()
#persist
spark_df.persist()
products_df = spark_df[['product_id']].distinct()
df_products_indexed = products_df.rdd.map(lambda r: r.product_id).zipWithIndex().toDF(['product_id', 'product_index'])
Run Code Online (Sandbox Code Playgroud)

你可能会问我为什么坚持spark_dfproducts_df这是因为我将像 with和 in 一样多次使用它joins(例如:spark_df = spark_df.join(df_products_indexed,"product_id")

工作阶段

第三阶段失败原因详情:

由于阶段失败而中止作业:阶段 3.0 中的任务 40458 失败了 4 次,最近一次失败:阶段 3.0 中丢失任务 40458.3(TID 60778,xx.xx.yyyy.com,执行器 91):ExecutorLostFailure(执行器 91 因一而退出)正在运行的任务)原因:从站丢失驱动程序堆栈跟踪:

输入数据的大小(4 TB …

python-2.7 apache-spark pyspark apache-spark-2.3

9
推荐指数
2
解决办法
6815
查看次数

广播哈希联接-迭代

当我们有一个足够小以适合内存的数据帧时,我们在Spark中使用广播哈希联接。当小数据框的大小小于以下时,spark.sql.autoBroadcastJoinThreshold 我对此几乎没有疑问。

我们提示广播的小数据帧的生命周期是多少?它会在内存中保留多长时间?我们如何控制它?

例如,如果我使用广播哈希连接将大型数据框与小型数据框连接了两次。第一次执行联接时,它将把小数据帧广播到工作节点并执行联接,同时避免大数据帧数据的混排。

我的问题是,执行者将保留广播数据帧的副本多长时间?它会保留在内存中直到会话结束吗?否则,一旦我们采取任何措施,它将被清除。我们可以控制还是清除它?或者我只是在错误的方向上思考...

apache-spark apache-spark-sql pyspark

8
推荐指数
1
解决办法
755
查看次数

用于有效连接Spark数据帧/数据集的分区数据

我需要join基于一些共享键列来组合许多DataFrame.对于键值RDD,可以指定分区器,以便将具有相同键的数据点混洗到同一个执行器,因此加入更有效(如果在之前有一个shuffle相关操作join).可以在Spark DataFrames或DataSet上完成同样的事情吗?

partitioning apache-spark apache-spark-sql spark-dataframe apache-spark-dataset

6
推荐指数
2
解决办法
1万
查看次数

高效的pyspark加入

我已经阅读了很多有关如何在pyspark中进行有效联接的内容。我发现实现高效联接的方法基本上是:

  • 如果可以,请使用广播加入。(我通常不能,因为数据帧太大)
  • 考虑使用非常大的群集。(我宁愿不是因为$$$)。
  • 使用相同的分区程序

最后一个是我想尝试的,但是我找不到在pyspark中实现它的方法。我试过了:

df.repartition(numberOfPartitions,['parition_col1','partition_col2'])
Run Code Online (Sandbox Code Playgroud)

但这无济于事,直到我停止它仍需要花费很长时间,因为在最后的几项工作中卡住了火花。

因此,如何在pyspark中使用相同的分区程序并加快连接速度,甚至摆脱永远需要的时间?我需要使用哪个代码?

PD:即使在stackoverflow上,我也查看了其他文章,但是我仍然看不到代码。

apache-spark pyspark

3
推荐指数
1
解决办法
1419
查看次数