我知道在Spark中我可以通过使用多个分区来分割我的计算.如果说我可以将输入RDD拆分为1000个分区并且我的机器数量为100,那么Spark会将计算分成1000个任务,并以某种智能方式将它们动态分配到我的100台机器中.
现在假设我最初可以将数据拆分为仅2个分区,但我仍然有100台机器.当然,我的98台机器将闲置.但是当我处理每个任务时,我可能会将其拆分为可能在不同机器上执行的子任务.它可以在带有队列的普通Java中轻松实现,但我不确定在Apache Spark中攻击它的最佳方法是什么.
考虑以下Java伪代码:
BlockingQueue<Task> q = new LinkedBlockingQueue<Task>();
q.push(myInitialTask);
...
//On each thread:
while (!queue.isEmpty()) {
Task nextTask = queue.take();
List<Task> newTasks = process_task_and_split_to_sub_tasks(nextTask);
queue.pushAll(newTasks);
}
Run Code Online (Sandbox Code Playgroud)
假设方法' process_task_and_split_to_sub_tasks() '可以将任何大型任务拆分为多个较小的任务,上述Java代码将使我的所有100个线程保持忙碌.
有没有办法在Spark中实现相同的功能,可能与其他工具结合使用?
更新:已经正确地指出,攻击它的方法之一就是
我想这是解决这个问题的"经典"方法,但它要求我能够正确估计每个键的工作量以正确分区.如果我没有提前知道每个密钥的工作量怎么办?当我的大部分机器都闲置等待一些不幸的机器时,我可能最终会遇到非常不幸的分区.
示例:我们以简化的频繁项集挖掘为例.
假设我的文件包含带有字母a到j(10个字母)的行,每行中的所有字母都按字母顺序排序而不重复,例如'abcf',任务是找到所有行中50%存在的所有字母组合.例如,如果许多行匹配模式'ab.*f',那么输出将包含{'a','b','f','ab','af','bf','abf'}.
实现它的方法之一是将所有以'a'开头的行发送到一个映射器(机器),所有行以'b'开头到另一个映射器.顺便说一下,这是在Spark中实现频繁模式挖掘的方式.现在假设我有100台机器(但只有10个字母).然后我的90台机器将闲置.
使用更精细的密钥解决方案,我可以生成10,000个4个字母的前缀,然后根据每个前缀的估计工作以某种方式对它们进行分区.但是我的分区可能非常错误:如果大多数行以'abcd'开头,那么所有的工作都将由负责此前缀的机器完成(除了它之外可能还有其他前缀),再次产生一个当我的大多数机器闲置等待一些不幸的机器时的情况.
在这种情况下,动态负载平衡将是这样的:接收到以"a"开头的行的映射器可能希望进一步分割其行 - 以'ab','ac','ad'开头, ...然后将它们发送给其他10台机器,这些机器可能会决定将其工作进一步分解为更多任务.
我知道标准的Apache Spark没有开箱即用的答案,但我想知道是否有办法实现这一目标.
Kafka(即上面的队列)+ Spark Streaming看起来很有前途,您认为我能够以相对简单的方式使用这些工具来实现动态负载平衡吗?你能推荐其他工具吗?
我很难理解 Spark 中的循环分区。考虑以下示例。我将大小为 3 的 Seq 拆分为 3 个分区:
val df = Seq(0,1,2).toDF().repartition(3)
df.explain
== Physical Plan ==
Exchange RoundRobinPartitioning(3)
+- LocalTableScan [value#42]
Run Code Online (Sandbox Code Playgroud)
现在,如果我检查分区,我会得到:
df
.rdd
.mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}
.toDF("partition_index","number_of_records")
.show
+---------------+-----------------+
|partition_index|number_of_records|
+---------------+-----------------+
| 0| 0|
| 1| 2|
| 2| 1|
+---------------+-----------------+
Run Code Online (Sandbox Code Playgroud)
如果我对大小为 8 的 Seq 执行相同操作并将其拆分为 8 个分区,则会出现更严重的偏差:
(0 to 7).toDF().repartition(8)
.rdd
.mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}
.toDF("partition_index","number_of_records")
.show
+---------------+-----------------+
|partition_index|number_of_records|
+---------------+-----------------+
| 0| 0|
| 1| 0|
| 2| 0|
| 3| 0|
| 4| 0| …Run Code Online (Sandbox Code Playgroud) 假设我有一个带有列的DataFrame partition_id:
n_partitions = 2
df = spark.sparkContext.parallelize([
[1, 'A'],
[1, 'B'],
[2, 'A'],
[2, 'C']
]).toDF(('partition_id', 'val'))
Run Code Online (Sandbox Code Playgroud)
我如何重新分区DataFrame以保证每个值partition_id都转到一个单独的分区,并且实际分区的数量与不同的值完全相同partition_id?
如果我执行散列分区,即df.repartition(n_partitions, 'partition_id')保证分区数量正确,但某些分区可能为空,而其他分区可能包含多个partition_id由于散列冲突引起的值.
我正在使用Spark Streaming v2.0.0从Kafka检索日志并进行一些操作。我正在使用该功能mapWithState以保存和更新与设备相关的某些字段。我想知道此功能如何在群集中工作。确实,到目前为止,我只是使用独立模式,但稍后将在Yarn群集中尝试使用它。
但是,假设我有一个包含多个节点的群集,如果一个节点更新了设备的状态,他是否会立即将此更新通知所有其他节点?如果否,则mapWithState需要设置集群功能。我该怎么办呢?
我的RDD为36个元素.我有一个3个节点的集群,每个节点有4个核心.我已经将RDD重新划分为36个部分,以便每个分区可能有一个要处理的元素,但是整个36个元素被分区,这样只有4个部分每个有9个元素,其余的部分都是空的,因此无需处理和服务器资源未得到充分利用.
如何重新分区数据以确保每个部分都有一些数据需要处理?如何确保每个零件都有3个要处理的元素?