在Apache Spark中,如何将慢速的RDD /数据集转换为流?

tri*_*oid 6 scala apache-spark spark-streaming apache-spark-sql

我正在研究一个有趣的案例,该案例涉及对慢速RDD或数据集(例如,由以下代码定义的数据集)进行广泛的转换(例如,重新分区和联接):

val ds = sqlContext.createDataset(1 to 100)
  .repartition(1)
  .mapPartitions { itr =>
    itr.map { ii =>
      Thread.sleep(100)
      println(f"skewed - ${ii}")
      ii
    }
  }
Run Code Online (Sandbox Code Playgroud)

慢速数据集很重要,因为它类似于远程数据源的视图,并且分区迭代器派生自单线程网络协议(http,jdbc等),在这种情况下,下载速度>单个速度线程处理,但是<<分布式处理的速度。

不幸的是,传统的Spark计算模型在慢速数据集上效率不高,因为我们仅限于以下选项之一:

  1. 仅使用狭窄的转换(flatMap-ish)在单个线程中以端到端的方式对流进行数据处理流,显然,数据处理将成为瓶颈,资源利用率较低。

  2. 使用广泛的操作(包括重新分区)来平衡RDD /数据集,尽管这对于并行数据处理效率至关重要,但是Spark粗粒度调度程序要求完全完成下载,这成为另一个瓶颈。

实验

以下程序代表了这种情况的简单模拟:

val mapped = ds

val mapped2 = mapped
  .repartition(10)
  .map { ii =>
    println(f"repartitioned - ${ii}")
    ii
  }

mapped2.foreach { _ =>
  }
Run Code Online (Sandbox Code Playgroud)

当执行上述程序时,可以观察到在RDD依赖关系中println(f"repartitioned - ${ii}")不会在行之前执行该行println(f"skewed - ${ii}")

我想指示Spark调度程序在任务完成之前(通过诸如microbatch或stream之类的机制)开始分发/运送由分区迭代器生成的数据条目。有一个简单的方法吗?例如,将慢速数据集转换为结构化流会很好,但是应该有更好集成的替代方案。

非常感谢您的意见

更新:为了使您的实验更容易,我添加了可以立即使用的scala测试:

package com.tribbloids.spookystuff.spike

import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.scalatest.{FunSpec, Ignore}

@Ignore
class SlowRDDSpike extends FunSpec {

  lazy val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()

  lazy val sc: SparkContext = spark.sparkContext
  lazy val sqlContext: SQLContext = spark.sqlContext

  import sqlContext.implicits._

  describe("is repartitioning non-blocking?") {

    it("dataset") {

      val ds = sqlContext
        .createDataset(1 to 100)
        .repartition(1)
        .mapPartitions { itr =>
          itr.map { ii =>
            Thread.sleep(100)
            println(f"skewed - $ii")
            ii
          }
        }

      val mapped = ds

      val mapped2 = mapped
        .repartition(10)
        .map { ii =>
          Thread.sleep(400)
          println(f"repartitioned - $ii")
          ii
        }

      mapped2.foreach { _ =>
        }
    }
  }

  it("RDD") {
    val ds = sc
      .parallelize(1 to 100)
      .repartition(1)
      .mapPartitions { itr =>
        itr.map { ii =>
          Thread.sleep(100)
          println(f"skewed - $ii")
          ii
        }
      }

    val mapped = ds

    val mapped2 = mapped
      .repartition(10)
      .map { ii =>
        Thread.sleep(400)
        println(f"repartitioned - $ii")
        ii
      }

    mapped2.foreach { _ =>
      }

  }
}
Run Code Online (Sandbox Code Playgroud)

Yos*_*ari 1

首先感谢您的实验代码。这个问题取决于数据源(请参阅下面为什么有关数据源的信息至关重要部分)。

话虽这么说,这里的主要问题是创建更多分区,同时避免 shuffle不幸的是,重新分区是需要 shuffle 的操作之一。

在您的示例中,您可以使用 增加分区数量而无需进行随机播放union

var ds: Dataset[Int] = Seq[Int]().toDS()
val sequences = (1 to 100).grouped(10)
sequences.map(sequence => {
  println(sequence)
  sqlContext.createDataset(sequence)
}).foreach(sequenceDS =>  {
  ds = ds.union(sequenceDS)
})
Run Code Online (Sandbox Code Playgroud)

使用联合数据集的结果: 运行时间: 24980 毫秒 分区数量: 41

没有联合,总时间是34493 ms,所以我们看到本地机器上有显着的改进。

这避免了随机播放,但会创建与给定 http 端点或数据库连接的多个连接。这是用于管理并行性的常见做法。

无需将数据集转换为流式传输,因为流式传输适用于数据集。如果您的数据源支持流式处理,您可以使用它来生成数据集,而无需从批处理转换为流式处理。如果您的数据源不支持流式传输,您可以考虑使用自定义接收器


为什么有关数据源的信息至关重要:

  1. 从给定数据源读取数据时,您可以控制初始数据集的分区数量吗?
  2. 您的数据源可接受的请求率或连接数是多少?
  3. 涉及多少数据?随机播放是一种选择吗?
  4. 您的数据源支持 Spark Streaming 吗?一些数据源(kinesis、Kafka、文件系统、ElasticSearch)支持流式传输,有些则不支持。

完整逻辑:

  it("dataset_with_union") {
    val start = System.nanoTime()
    var ds: Dataset[Int] = Seq[Int]().toDS()
    val sequences = (1 to 100).grouped(10)
    sequences.map(sequence => {
      println(sequence)
      sqlContext.createDataset(sequence)
    }).foreach(sequenceDS =>  {
      ds = ds.union(sequenceDS)
    })

    ds.mapPartitions { itr =>
      itr.map { ii =>
        Thread.sleep(100)
        ii
      }
    }

    // Number of partitions here is 41
    println(f"dataset number or partitions: ${ds.rdd.getNumPartitions}")
    val mapped = ds

    val mapped2 = mapped
      .repartition(10)
      .map { ii =>
        Thread.sleep(400)
        println(f"repartitioned - $ii")
        ii
      }

    mapped2.foreach { _ =>
    }

    val end = System.nanoTime()
    println("Elapsed time: " + (end - start) + "ns")
  }
Run Code Online (Sandbox Code Playgroud)