tri*_*oid 6 scala apache-spark spark-streaming apache-spark-sql
我正在研究一个有趣的案例,该案例涉及对慢速RDD或数据集(例如,由以下代码定义的数据集)进行广泛的转换(例如,重新分区和联接):
val ds = sqlContext.createDataset(1 to 100)
.repartition(1)
.mapPartitions { itr =>
itr.map { ii =>
Thread.sleep(100)
println(f"skewed - ${ii}")
ii
}
}
Run Code Online (Sandbox Code Playgroud)
慢速数据集很重要,因为它类似于远程数据源的视图,并且分区迭代器派生自单线程网络协议(http,jdbc等),在这种情况下,下载速度>单个速度线程处理,但是<<分布式处理的速度。
不幸的是,传统的Spark计算模型在慢速数据集上效率不高,因为我们仅限于以下选项之一:
仅使用狭窄的转换(flatMap-ish)在单个线程中以端到端的方式对流进行数据处理流,显然,数据处理将成为瓶颈,资源利用率较低。
使用广泛的操作(包括重新分区)来平衡RDD /数据集,尽管这对于并行数据处理效率至关重要,但是Spark粗粒度调度程序要求完全完成下载,这成为另一个瓶颈。
实验
以下程序代表了这种情况的简单模拟:
val mapped = ds
val mapped2 = mapped
.repartition(10)
.map { ii =>
println(f"repartitioned - ${ii}")
ii
}
mapped2.foreach { _ =>
}
Run Code Online (Sandbox Code Playgroud)
当执行上述程序时,可以观察到在RDD依赖关系中println(f"repartitioned - ${ii}")不会在行之前执行该行println(f"skewed - ${ii}")。
我想指示Spark调度程序在任务完成之前(通过诸如microbatch或stream之类的机制)开始分发/运送由分区迭代器生成的数据条目。有一个简单的方法吗?例如,将慢速数据集转换为结构化流会很好,但是应该有更好集成的替代方案。
非常感谢您的意见
更新:为了使您的实验更容易,我添加了可以立即使用的scala测试:
package com.tribbloids.spookystuff.spike
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.scalatest.{FunSpec, Ignore}
@Ignore
class SlowRDDSpike extends FunSpec {
lazy val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
lazy val sc: SparkContext = spark.sparkContext
lazy val sqlContext: SQLContext = spark.sqlContext
import sqlContext.implicits._
describe("is repartitioning non-blocking?") {
it("dataset") {
val ds = sqlContext
.createDataset(1 to 100)
.repartition(1)
.mapPartitions { itr =>
itr.map { ii =>
Thread.sleep(100)
println(f"skewed - $ii")
ii
}
}
val mapped = ds
val mapped2 = mapped
.repartition(10)
.map { ii =>
Thread.sleep(400)
println(f"repartitioned - $ii")
ii
}
mapped2.foreach { _ =>
}
}
}
it("RDD") {
val ds = sc
.parallelize(1 to 100)
.repartition(1)
.mapPartitions { itr =>
itr.map { ii =>
Thread.sleep(100)
println(f"skewed - $ii")
ii
}
}
val mapped = ds
val mapped2 = mapped
.repartition(10)
.map { ii =>
Thread.sleep(400)
println(f"repartitioned - $ii")
ii
}
mapped2.foreach { _ =>
}
}
}
Run Code Online (Sandbox Code Playgroud)
首先感谢您的实验代码。这个问题取决于数据源(请参阅下面为什么有关数据源的信息至关重要部分)。
话虽这么说,这里的主要问题是创建更多分区,同时避免 shuffle。不幸的是,重新分区是需要 shuffle 的操作之一。
在您的示例中,您可以使用 增加分区数量而无需进行随机播放union。
var ds: Dataset[Int] = Seq[Int]().toDS()
val sequences = (1 to 100).grouped(10)
sequences.map(sequence => {
println(sequence)
sqlContext.createDataset(sequence)
}).foreach(sequenceDS => {
ds = ds.union(sequenceDS)
})
Run Code Online (Sandbox Code Playgroud)
使用联合数据集的结果: 运行时间: 24980 毫秒 分区数量: 41
没有联合,总时间是34493 ms,所以我们看到本地机器上有显着的改进。
这避免了随机播放,但会创建与给定 http 端点或数据库连接的多个连接。这是用于管理并行性的常见做法。
无需将数据集转换为流式传输,因为流式传输适用于数据集。如果您的数据源支持流式处理,您可以使用它来生成数据集,而无需从批处理转换为流式处理。如果您的数据源不支持流式传输,您可以考虑使用自定义接收器。
为什么有关数据源的信息至关重要:
完整逻辑:
it("dataset_with_union") {
val start = System.nanoTime()
var ds: Dataset[Int] = Seq[Int]().toDS()
val sequences = (1 to 100).grouped(10)
sequences.map(sequence => {
println(sequence)
sqlContext.createDataset(sequence)
}).foreach(sequenceDS => {
ds = ds.union(sequenceDS)
})
ds.mapPartitions { itr =>
itr.map { ii =>
Thread.sleep(100)
ii
}
}
// Number of partitions here is 41
println(f"dataset number or partitions: ${ds.rdd.getNumPartitions}")
val mapped = ds
val mapped2 = mapped
.repartition(10)
.map { ii =>
Thread.sleep(400)
println(f"repartitioned - $ii")
ii
}
mapped2.foreach { _ =>
}
val end = System.nanoTime()
println("Elapsed time: " + (end - start) + "ns")
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
204 次 |
| 最近记录: |