Quantity redistribution logic - MapGroups with an external dataset

Mic*_*ael 5 algorithm functional-programming scala apache-spark

I am working on a complex piece of logic where I need to redistribute a quantity from one dataset to another.

In the example we have Owner and Invoice. We need to subtract the Invoice quantity from the Owner record that matches exactly (same car at the same pcode). The subtracted quantity then needs to be redistributed back to the other pcodes where the same car appears. The complication is that we should avoid distributing to a pcode where the same car also appears in the Invoice table at that other pcode.

Finally, if the subtraction or the redistribution would produce a negative value, we should skip the transformation for the given Invoice.

Here is an example with the numbers:

[screenshot with the worked numbers - not reproduced here]
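Since the screenshot is not available here, here is the arithmetic for car A, reconstructed from the sample data below (it also matches the expected figures quoted in the accepted answer):

Invoice (A, 666, 15) exactly matches Owner (A, 666, 80), so that owner becomes 80 - 15 = 65.
Invoice (A, 888, 12) exactly matches Owner (A, 888, 100), so that owner becomes 100 - 12 = 88.
The removed 15 + 12 = 27 goes back to the pcodes of car A that never appear in the invoices, i.e. 444 (qtty 50) and 222 (qtty 20), proportionally to their total of 70:
  444: 50 + 27 * 50 / 70 ≈ 69.29
  222: 20 + 27 * 20 / 70 ≈ 27.71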

Below is a code version, but unfortunately it does not work as expected. More specifically, I don't know how to skip records for a car that appears more than once in the invoices. In the first example (red), I don't know how to skip the record Owner(A, 888, 100).

package playground

import org.apache.spark.sql.SparkSession


object basic extends App {
  val spark = SparkSession
    .builder()
    .appName("Sample app")
    .master("local")
    .getOrCreate()

  import spark.implicits._

  final case class Owner(car: String, pcode: String, qtty: Double)
  final case class Invoice(car: String, pcode: String, qtty: Double)

  val sc = spark.sparkContext

  val data = Seq(
    Owner("A", "666", 80),
    Owner("B", "555", 20),
    Owner("A", "444", 50),
    Owner("A", "222", 20),
    Owner("C", "444", 20),
    Owner("C", "666", 80),
    Owner("C", "555", 120),
    Owner("A", "888", 100)
  )

  val fleet = Seq(
    Invoice("A", "666", 15),
    Invoice("C", "444", 10),
    Invoice("A", "888", 12),
    Invoice("B", "555", 200)
  )

  val owners = spark.createDataset(data)
  val invoices = spark.createDataset(fleet)

  val actual = owners
    .joinWith(invoices, owners("Car") === invoices("Car"), joinType = "right")
    .groupByKey(_._2)
    .flatMapGroups {
      case (invoice, group) =>
        val subOwner: Vector[Owner] = group.toVector.map(_._1)
        val householdToBeInvoiced: Vector[Owner] =
          subOwner.filter(_.pcode == invoice.pcode)
        val modifiedOwner: Vector[Owner] = if (householdToBeInvoiced.nonEmpty) {
          // negative compensation (remove the quantity from Invoice for the exact match)
          val neg: Owner = householdToBeInvoiced.head
          val calculatedNeg: Owner = neg.copy(qtty = neg.qtty - invoice.qtty)

          // positive compensation (redistribute the "removed" quantity proportionally but not for pcode existing in
          // invoice for the same car
          val pos = subOwner.filter(s => s.pcode != invoice.pcode)
          val totalQuantityOwner = pos.map(_.qtty).sum
          val calculatedPos: Vector[Owner] =
            pos.map(
              c =>
                c.copy(
                  qtty = c.qtty + invoice.qtty * c.qtty / (totalQuantityOwner - neg.qtty)
              )
            )

          (calculatedPos :+ calculatedNeg)
        } else {
          subOwner
        }

        modifiedOwner
    }
}

This code produces:

+---+-----+------------------+
|car|pcode|              qtty|
+---+-----+------------------+
|  A|  888|116.66666666666667|
|  A|  222|23.333333333333332|
|  A|  444|58.333333333333336|
|  A|  666|              65.0|
|  C|  555|126.66666666666667|
|  C|  666| 84.44444444444444|
|  C|  444|              10.0|
|  B|  555|            -180.0|
|  A|  222|              24.8|
|  A|  444|              62.0|
|  A|  666|              99.2|
|  A|  888|              88.0|
+---+-----+------------------+

Any help would be greatly appreciated! Thanks.


After giving the problem some more thought, I managed to improve the code, but I am still unable to make it iterative (use the previous computation to calculate the next one, e.g. take the result of the red records to produce the blue records, and so on). A plain-Scala sketch of the per-car logic I am trying to express follows the code below.

package playground

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Dataset, KeyValueGroupedDataset, SparkSession}

object basic extends App {

  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)

  val spark = SparkSession
    .builder()
    .appName("Spark Optimization Playground")
    .master("local")
    .getOrCreate()

  import spark.implicits._

  final case class Owner(car: String, pcode: String, qtty: Double)
  final case class Invoice(car: String, pcode: String, qtty: Double)

  val data = Seq(
    Owner("A", "666", 80),
    Owner("B", "555", 20),
    Owner("A", "444", 50),
    Owner("A", "222", 20),
    Owner("C", "444", 20),
    Owner("C", "666", 80),
    Owner("C", "555", 120),
    Owner("A", "888", 100)
  )

  val fleet = Seq(
    Invoice("A", "666", 15),
    Invoice("C", "444", 10),
    Invoice("A", "888", 12),
    Invoice("B", "555", 200)
  )

  val owners = spark.createDataset(data)
  val invoices = spark.createDataset(fleet)

  val secondFleets = invoices.map(identity)

  val fleetPerCar =
    invoices
      .joinWith(secondFleets, invoices("car") === secondFleets("car"), "inner")
      .groupByKey(_._1)
      .flatMapGroups {
        case (value, iter) => Iterator((value, iter.toArray))
      }

  val gb
    : KeyValueGroupedDataset[(Invoice, Array[(Invoice, Invoice)]),
                             (Owner, (Invoice, Array[(Invoice, Invoice)]))] =
    owners
      .joinWith(fleetPerCar, owners("car") === fleetPerCar("_1.car"), "right")
      .groupByKey(_._2)

  val x: Dataset[Owner] =
    gb.flatMapGroups {
      case (fleet, group) =>
        val subOwner: Vector[Owner] = group.toVector.map(_._1)
        val householdToBeInvoiced: Vector[Owner] =
          subOwner.filter(_.pcode == fleet._1.pcode)
        val modifiedOwner: Vector[Owner] = if (householdToBeInvoiced.nonEmpty) {
          // negative compensation (remove the quantity from Invoice for the exact match)
          val neg: Owner = householdToBeInvoiced.head
          val calculatedNeg: Owner = neg.copy(qtty = neg.qtty - fleet._1.qtty)

          // positive compensation (redistribute the "removed" quantity proportionally but not for pcode existing in
          // invoice for the same car
          val otherPCode =
            fleet._2.filter(_._2.pcode != fleet._1.pcode).map(_._2.pcode)

          val pos = subOwner.filter(
            s => s.pcode != fleet._1.pcode && !otherPCode.contains(s.pcode)
          )
          val totalQuantityOwner = pos.map(_.qtty).sum + neg.qtty
          val calculatedPos: Vector[Owner] =
            pos.map(
              c =>
                c.copy(
                  qtty = c.qtty + fleet._1.qtty * c.qtty / (totalQuantityOwner - neg.qtty)
              )
            )
          // if pos or neg compensation produce negative quantity, skip the computation
          val res = (calculatedPos :+ calculatedNeg)
          if (res.exists(_.qtty < 0)) {
            subOwner
          } else {
            res
          }
        } else {
          subOwner
        }

        modifiedOwner
    }
  x.show()
}
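Conceptually, per car, this is what I am trying to express: fold over the invoices of a single car, feeding the owners produced by one step into the next. The following is only a sketch of that logic in plain Scala (outside Spark), using the case classes above and a hypothetical helper applyInvoice; it is not a working Spark job.

// A minimal sketch of the iterative idea (assumption: one group = all Owner/Invoice rows of one car).
def applyInvoice(owners: Vector[Owner], inv: Invoice, invoicedPcodes: Set[String]): Vector[Owner] =
  owners.find(_.pcode == inv.pcode) match {
    case None => owners // no exact (car, pcode) match: nothing to subtract
    case Some(neg) =>
      val calculatedNeg = neg.copy(qtty = neg.qtty - inv.qtty)
      // only pcodes that never appear in an invoice for this car receive the removed quantity
      val receivers = owners.filter(o => !invoicedPcodes.contains(o.pcode))
      val untouched = owners.filter(o => o.pcode != inv.pcode && invoicedPcodes.contains(o.pcode))
      val total     = receivers.map(_.qtty).sum
      val calculatedPos =
        receivers.map(o => o.copy(qtty = o.qtty + inv.qtty * o.qtty / total))
      val res = (calculatedNeg +: calculatedPos) ++ untouched
      // skip the whole transformation if anything would go negative (or nothing can receive)
      if (total == 0 || res.exists(_.qtty < 0)) owners else res
  }

// intended usage inside flatMapGroups, per car:
//   val invoicedPcodes = invoicesForCar.map(_.pcode).toSet
//   invoicesForCar.foldLeft(ownersForCar)((acc, inv) => applyInvoice(acc, inv, invoicedPcodes))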

Che*_*ema 2

The first solution is based on Spark Datasets and SparkSQL and produces the expected result.


There are many ways to set up this approach, even considering performance concerns, which may be discussed later.

import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}

object basic {

  val spark = SparkSession
    .builder()
    .appName("Sample app")
    .master("local")
    .config("spark.sql.shuffle.partitions","200") //Change to a more reasonable default number of partitions for our data
    .getOrCreate()

  val sc = spark.sparkContext

  case class Owner(car: String, pcode: String, qtty: Double)
  case class Invoice(car: String, pcode: String, qtty: Double)

  def main(args: Array[String]): Unit = {

    val data = Seq(
      Owner("A", "666", 80),
      Owner("B", "555", 20),
      Owner("A", "444", 50),
      Owner("A", "222", 20),
      Owner("C", "444", 20),
      Owner("C", "666", 80),
      Owner("C", "555", 120),
      Owner("A", "888", 100)
    )

    val fleet = Seq(
      Invoice("A", "666", 15),
      Invoice("C", "666", 10),
      Invoice("A", "888", 12),
      Invoice("B", "555", 200)
    )

    val expected = Seq(
      Owner("A", "666", 65),
      Owner("B", "555", 20), // not redistributed because produce a negative value
      Owner("A", "444", 69.29),
      Owner("A", "222", 27.71),
      Owner("C", "444", 21.43),
      Owner("C", "666", 70),
      Owner("C", "555", 128.57),
      Owner("A", "888", 88)
    )

    Logger.getRootLogger.setLevel(Level.ERROR)

    try {
      import spark.implicits._

      val owners = spark.createDataset(data).as[Owner].cache()
      val invoices = spark.createDataset(fleet).as[Invoice].cache()

      owners.createOrReplaceTempView("owners")
      invoices.createOrReplaceTempView("invoices")

      /**
        * this part fetch car and pcode from owner with the substracted quantity from invoice
        */
      val p1 = spark.sql(
        """SELECT i.car,i.pcode,
          |CASE WHEN (o.qtty - i.qtty) < 0 THEN o.qtty ELSE (o.qtty - i.qtty) END AS qtty,
          |CASE WHEN (o.qtty - i.qtty) < 0 THEN 0 ELSE i.qtty END AS to_distribute
          |FROM owners o
          |INNER JOIN invoices i  ON(i.car = o.car AND i.pcode = o.pcode)
          |""".stripMargin)
        .cache()
      p1.createOrReplaceTempView("p1")

      /**
        * this part fetch all the car and pcode that we have to redistribute their quantity
        */
      val p2 = spark.sql(
        """SELECT o.car, o.pcode, o.qtty
          |FROM owners o
          |LEFT OUTER JOIN invoices i  ON(i.car = o.car AND i.pcode = o.pcode)
          |WHERE i.car IS NULL
          |""".stripMargin)
        .cache()
      p2.createOrReplaceTempView("p2")

      /**
        * this part fetch the quantity to distribute
        */
      val distribute = spark.sql(
        """
          |SELECT car, SUM(to_distribute) AS to_distribute
          |FROM p1
          |GROUP BY car
          |""".stripMargin)
        .cache()
      distribute.createOrReplaceTempView("distribute")

      /**
        * this part fetch the proportion to distribute proportionally
        */
      val proportion = spark.sql(
        """
          |SELECT car, SUM(qtty) AS proportion
          |FROM p2
          |GROUP BY car
          |""".stripMargin)
          .cache()
      proportion.createOrReplaceTempView("proportion")


      /**
        * this part join p1 and p2 with the distribution calculated
        */
      val result = spark.sql(
        """
          |SELECT p2.car, p2.pcode, ROUND(((to_distribute / proportion) * qtty) + qtty, 2) AS qtty
          |FROM p2
          |JOIN distribute d ON(p2.car = d.car)
          |JOIN proportion p ON(d.car = p.car)
          |UNION ALL
          |SELECT car, pcode, qtty
          |FROM p1
          |""".stripMargin)

      result.show(truncate = false)
/*
+---+-----+------+
|car|pcode|qtty  |
+---+-----+------+
|A  |444  |69.29 |
|A  |222  |27.71 |
|C  |444  |21.43 |
|C  |555  |128.57|
|A  |666  |65.0  |
|B  |555  |20.0  |
|C  |666  |70.0  |
|A  |888  |88.0  |
+---+-----+------+
*/

      expected
        .toDF("car","pcode","qtty")
        .show(truncate = false)
/*
+---+-----+------+
|car|pcode|qtty  |
+---+-----+------+
|A  |666  |65.0  |
|B  |555  |20.0  |
|A  |444  |69.29 |
|A  |222  |27.71 |
|C  |444  |21.43 |
|C  |666  |70.0  |
|C  |555  |128.57|
|A  |888  |88.0  |
+---+-----+------+
*/

    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

Using the Dataset API


Another way to solve this problem, with the same result, is to use Datasets and their great API. As an example:

import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel

object basic2 {

  val spark = SparkSession
    .builder()
    .appName("Sample app")
    .master("local")
    .config("spark.sql.shuffle.partitions","200") //Change to a more reasonable default number of partitions for our data
    .getOrCreate()

  val sc = spark.sparkContext

  final case class Owner(car: String, pcode: String, o_qtty: Double)
  final case class Invoice(car: String, pcode: String, i_qtty: Double)

  def main(args: Array[String]): Unit = {

    val data = Seq(
      Owner("A", "666", 80),
      Owner("B", "555", 20),
      Owner("A", "444", 50),
      Owner("A", "222", 20),
      Owner("C", "444", 20),
      Owner("C", "666", 80),
      Owner("C", "555", 120),
      Owner("A", "888", 100)
    )

    val fleet = Seq(
      Invoice("A", "666", 15),
      Invoice("C", "666", 10),
      Invoice("A", "888", 12),
      Invoice("B", "555", 200)
    )

    val expected = Seq(
      Owner("A", "666", 65),
      Owner("B", "555", 20), // not redistributed because produce a negative value
      Owner("A", "444", 69.29),
      Owner("A", "222", 27.71),
      Owner("C", "444", 21.43),
      Owner("C", "666", 70),
      Owner("C", "555", 128.57),
      Owner("A", "888", 88)
    )

    Logger.getRootLogger.setLevel(Level.ERROR)

    try {
      import spark.implicits._

      val owners = spark.createDataset(data)
        .as[Owner]
        .cache()

      val invoices = spark.createDataset(fleet)
        .as[Invoice]
        .cache()

      val p1 = owners
        .join(invoices,Seq("car","pcode"),"inner")
        .selectExpr("car","pcode","IF(o_qtty-i_qtty < 0,o_qtty,o_qtty - i_qtty) AS qtty","IF(o_qtty-i_qtty < 0,0,i_qtty) AS to_distribute")
        .persist(StorageLevel.MEMORY_ONLY)

      val p2 = owners
        .join(invoices,Seq("car","pcode"),"left_outer")
        .filter(row => row.anyNull == true)
        .drop(col("i_qtty"))
        .withColumnRenamed("o_qtty","qtty")
        .persist(StorageLevel.MEMORY_ONLY)

      val distribute = p1
        .groupBy(col("car"))
        .agg(sum(col("to_distribute")).as("to_distribute"))
        .persist(StorageLevel.MEMORY_ONLY)

      val proportion = p2
          .groupBy(col("car"))
          .agg(sum(col("qtty")).as("proportion"))
          .persist(StorageLevel.MEMORY_ONLY)

      val result = p2
        .join(distribute, "car")
        .join(proportion, "car")
        .withColumn("qtty",round( ((col("to_distribute") / col("proportion")) * col("qtty")) + col("qtty"), 2 ))
        .drop("to_distribute","proportion")
        .union(p1.drop("to_distribute"))

      result.show()
/*
+---+-----+------+
|car|pcode|  qtty|
+---+-----+------+
|  A|  444| 69.29|
|  A|  222| 27.71|
|  C|  444| 21.43|
|  C|  555|128.57|
|  A|  666|  65.0|
|  B|  555|  20.0|
|  C|  666|  70.0|
|  A|  888|  88.0|
+---+-----+------+
*/

      expected
        .toDF("car","pcode","qtty")
        .show(truncate = false)
/*
+---+-----+------+
|car|pcode|qtty  |
+---+-----+------+
|A  |666  |65.0  |
|B  |555  |20.0  |
|A  |444  |69.29 |
|A  |222  |27.71 |
|C  |444  |21.43 |
|C  |666  |70.0  |
|C  |555  |128.57|
|A  |888  |88.0  |
+---+-----+------+
*/

    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

Some general considerations regarding performance and tuning.


It always depends on your particular use case, but in general, the first thing is that if you can filter and clean your data early, you will probably see some improvement.


One key point of using the high-level declarative API is isolating yourself from low-level implementation details. Optimization is the job of the Catalyst optimizer. It is a complex engine, and I really doubt anyone could easily improve it without diving much deeper into its internals.


Default number of partitions: set the spark.sql.shuffle.partitions property correctly.


By default, Spark SQL uses spark.sql.shuffle.partitions partitions for aggregations and joins, i.e. 200 by default. This often leads to an explosion of partitions without helping the performance of the query, since all 200 of these tasks (one per partition) need to be started and finished before you get the result.


Consider how many partitions your query really needs.
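As a minimal sketch (the value 8 is only an illustration for this small sample, not a recommendation):

// change it on an existing session at runtime
spark.conf.set("spark.sql.shuffle.partitions", "8")
// or at build time, as in the code above:
// .config("spark.sql.shuffle.partitions", "8")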


Spark can only run 1 concurrent task per partition of an RDD, up to the number of cores in your cluster. So if you have a cluster with 50 cores, you want your RDDs to have at least 50 partitions. As for choosing a "good" number of partitions, you generally want at least as many as the number of parallel executors. You can obtain this computed value by calling


sc.defaultParallelism


or check the number of RDD partitions with


df.rdd.partitions.size


Repartition: increase the number of partitions and rebalance them after a filter, to increase parallelism: repartition(numPartitions: Int)


Coalesce: decrease the number of partitions without a shuffle, e.g. before writing output to HDFS or another external sink: coalesce(numPartitions: Int, shuffle: Boolean = false). A small sketch of both follows.
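A sketch only, assuming df is any of the DataFrames above (with org.apache.spark.sql.functions._ imported); the output path is just a placeholder:

val filtered = df.filter(col("qtty") > 0)  // a selective filter can leave many almost-empty partitions
val rebalanced = filtered.repartition(50)  // full shuffle: rebalances and can increase the number of partitions
rebalanced
  .coalesce(1)                             // no shuffle: only reduces partitions, e.g. before writing out
  .write.parquet("/tmp/owners_out")        // hypothetical output path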


You can follow this link: Managing Spark Partitions with Coalesce and Repartition


Cache data to avoid re-computation: dataFrame.cache()
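For example, a sketch using the StorageLevel import from the code above (on a Dataset, cache() is shorthand for persist(StorageLevel.MEMORY_AND_DISK)):

val cachedOwners = owners.persist(StorageLevel.MEMORY_ONLY) // explicit storage level
// ... reuse cachedOwners across several actions ...
cachedOwners.unpersist()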


Analyzer - Logical Query Plan Analyzer


The Analyzer is Spark SQL's logical query plan analyzer, which semantically validates an unresolved logical plan and transforms it into an analyzed logical plan.


You can access a Dataset's analyzed logical plan using explain (with the extended flag enabled):

dataframe.explain(extended = true)

For more performance options, see the documentation: Performance Tuning


There are many possibilities for tuning a Spark process, but it always depends on your use case.


Batch or streaming? DataFrames or plain RDDs? Hive or no Hive? Is the data being shuffled? And so on...


I strongly recommend Jacek Laskowski's The Internals of Spark SQL.


Finally, you will have to run some experiments with different values and benchmarks to see how long processing a sample of your data takes.

  val start = System.nanoTime()

  // my process

  val end = System.nanoTime()

  val time = end - start
  println(s"My App takes: $time")

Hope this helps.


  • @Michael Hi, you can get the same result using Datasets and their great API; I have added an example. On the other hand, for an iterative process I would use plain RDDs, but I don't see the advantage of that approach for this problem. (2 upvotes)