相关疑难解决方法(0)

如何计算合并的最佳numberOfPartitions?

所以,据我所知,一般情况下应该使用coalesce():

由于某个filter或其他操作可能导致减少原始数据集(RDD,DF),分区数量减少.coalesce()过滤大型数据集后,可以更有效地运行操作.

我也明白它比repartition通过仅在必要时移动数据来减少混乱更便宜.我的问题是如何定义coalesce带(idealPartionionNo)的参数.我正在研究一个项目,该项目是从另一位工程师传递给我的,他使用下面的计算来计算该参数的值.

// DEFINE OPTIMAL PARTITION NUMBER
implicit val NO_OF_EXECUTOR_INSTANCES = sc.getConf.getInt("spark.executor.instances", 5)
implicit val NO_OF_EXECUTOR_CORES = sc.getConf.getInt("spark.executor.cores", 2)

val idealPartionionNo = NO_OF_EXECUTOR_INSTANCES * NO_OF_EXECUTOR_CORES * REPARTITION_FACTOR
Run Code Online (Sandbox Code Playgroud)

然后将其与partitioner对象一起使用:

val partitioner = new HashPartitioner(idealPartionionNo)
Run Code Online (Sandbox Code Playgroud)

但也用于:

RDD.filter(x=>x._3<30).coalesce(idealPartionionNo)
Run Code Online (Sandbox Code Playgroud)

这是正确的方法吗?idealPartionionNo价值计算背后的主要思想是什么?什么是REPARTITION_FACTOR?我一般如何定义它?

此外,由于纱线负责确定对飞可用执行人有获得该号(的方式AVAILABLE_EXECUTOR_INSTANCES在运行),并利用它来进行计算idealPartionionNo(如更换NO_OF_EXECUTOR_INSTANCESAVAILABLE_EXECUTOR_INSTANCES)?

理想情况下,表单的一些实际示例:

  • 这是一个数据集(大小);
  • 这是RDD/DF的一些转换和可能的重用.
  • 这是你应该重新分配/合并的地方.
  • 假设您有n 执行程序,其m 核心分区因子 …

scala apache-spark rdd

16
推荐指数
3
解决办法
5165
查看次数

Spark:将大型 MySQL 表读入 DataFrame 失败

我想提前告诉您,以下几个相关问题不能解决我的问题:

这个很接近,但堆栈跟踪不同,无论如何它都没有解决。所以请放心,我在几天(失败的)解决方案搜索后发布了这个问题。


我正在尝试编写一个作业,将数据(每天一次)从MySQL表移动到Hive存储为Parquet/ORC文件的表Amazon S3。一些表非常大:~ 300M 记录大小200 GB 以上(如 所报告phpMyAdmin)。

目前我们正在sqoop为此使用,但Spark由于以下原因,我们想转向:

  • 为了利用它的功能DataFrame API(将来,我们将在移动数据的同时执行转换
  • 我们已经为组织中其他地方使用的工作编写了一个相当大的框架ScalaSpark

我已经能够在 MySQL桌子上实现这一点而没有任何问题。但是,如果我尝试一次获取超过1.5-2M 条记录,那么Spark作业(从MySQLinto读取数据DataFrame)就会失败。我在下面展示了堆栈跟踪的相关部分,您可以在此处找到完整的堆栈跟踪。

...
javax.servlet.ServletException: java.util.NoSuchElementException: None.get
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:489)
    at …
Run Code Online (Sandbox Code Playgroud)

mysql apache-spark

7
推荐指数
1
解决办法
2921
查看次数

标签 统计

apache-spark ×2

mysql ×1

rdd ×1

scala ×1