所以,据我所知,一般情况下应该使用coalesce():
由于某个
filter或其他操作可能导致减少原始数据集(RDD,DF),分区数量减少.coalesce()过滤大型数据集后,可以更有效地运行操作.
我也明白它比repartition通过仅在必要时移动数据来减少混乱更便宜.我的问题是如何定义coalesce带(idealPartionionNo)的参数.我正在研究一个项目,该项目是从另一位工程师传递给我的,他使用下面的计算来计算该参数的值.
// DEFINE OPTIMAL PARTITION NUMBER
implicit val NO_OF_EXECUTOR_INSTANCES = sc.getConf.getInt("spark.executor.instances", 5)
implicit val NO_OF_EXECUTOR_CORES = sc.getConf.getInt("spark.executor.cores", 2)
val idealPartionionNo = NO_OF_EXECUTOR_INSTANCES * NO_OF_EXECUTOR_CORES * REPARTITION_FACTOR
Run Code Online (Sandbox Code Playgroud)
然后将其与partitioner对象一起使用:
val partitioner = new HashPartitioner(idealPartionionNo)
Run Code Online (Sandbox Code Playgroud)
但也用于:
RDD.filter(x=>x._3<30).coalesce(idealPartionionNo)
Run Code Online (Sandbox Code Playgroud)
这是正确的方法吗?idealPartionionNo价值计算背后的主要思想是什么?什么是REPARTITION_FACTOR?我一般如何定义它?
此外,由于纱线负责确定对飞可用执行人有获得该号(的方式AVAILABLE_EXECUTOR_INSTANCES在运行),并利用它来进行计算idealPartionionNo(如更换NO_OF_EXECUTOR_INSTANCES用AVAILABLE_EXECUTOR_INSTANCES)?
理想情况下,表单的一些实际示例:
n 执行程序,其m 核心和分区因子 …我想提前告诉您,以下几个相关问题不能解决我的问题:
这个很接近,但堆栈跟踪不同,无论如何它都没有解决。所以请放心,我在几天(失败的)解决方案搜索后发布了这个问题。
我正在尝试编写一个作业,将数据(每天一次)从MySQL表移动到Hive存储为Parquet/ORC文件的表Amazon S3。一些表非常大:~ 300M 记录,大小为200 GB 以上(如 所报告phpMyAdmin)。
目前我们正在sqoop为此使用,但Spark由于以下原因,我们想转向:
DataFrame API(将来,我们将在移动数据的同时执行转换)ScalaSpark我已经能够在小 MySQL桌子上实现这一点而没有任何问题。但是,如果我尝试一次获取超过1.5-2M 条记录,那么Spark作业(从MySQLinto读取数据DataFrame)就会失败。我在下面展示了堆栈跟踪的相关部分,您可以在此处找到完整的堆栈跟踪。
...
javax.servlet.ServletException: java.util.NoSuchElementException: None.get
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:489)
at …Run Code Online (Sandbox Code Playgroud)