Posts by Emi*_*l44

How does round-robin partitioning work in Spark?

I'm having trouble understanding round-robin partitioning in Spark. Consider the following example: I split a Seq of size 3 into 3 partitions:

val df = Seq(0,1,2).toDF().repartition(3)

df.explain

== Physical Plan ==
Exchange RoundRobinPartitioning(3)
+- LocalTableScan [value#42]

Now, if I check the partitions, I get:

df
  .rdd
  .mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}
  .toDF("partition_index","number_of_records")
  .show

+---------------+-----------------+
|partition_index|number_of_records|
+---------------+-----------------+
|              0|                0|
|              1|                2|
|              2|                1|
+---------------+-----------------+

If I do the same with a Seq of size 8, split into 8 partitions, the skew is even worse:

(0 to 7).toDF().repartition(8)
  .rdd
  .mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}
  .toDF("partition_index","number_of_records")
  .show

+---------------+-----------------+
|partition_index|number_of_records|
+---------------+-----------------+
|              0|                0|
|              1|                0|
|              2|                0|
|              3|                0|
|              4|                0| …
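My working theory, from skimming ShuffleExchangeExec, is that each input partition starts its round-robin counter at an offset seeded by its own partition index, so when each input partition holds only a row or two, many target partitions receive nothing. Here is a minimal sketch of that assignment logic (my simplification, not Spark's actual code):

import scala.util.Random

// Simplified model of RoundRobinPartitioning: every input partition picks a
// start offset seeded by its index, then hands rows to consecutive targets.
def roundRobinAssign(
    inputPartitions: Seq[Seq[Int]], // rows grouped by input partition
    numTargetPartitions: Int): Map[Int, Int] = {
  val counts = scala.collection.mutable.Map.empty[Int, Int].withDefaultValue(0)
  inputPartitions.zipWithIndex.foreach { case (rows, inputIdx) =>
    // The start position depends on the input partition, not on the data.
    var position = new Random(inputIdx).nextInt(numTargetPartitions)
    rows.foreach { _ =>
      position = (position + 1) % numTargetPartitions
      counts(position) += 1
    }
  }
  counts.toMap
}

// 8 rows spread over 8 input partitions (one row each): most target
// partitions end up empty, matching the skew observed above.
println(roundRobinAssign(Seq.tabulate(8)(i => Seq(i)), 8))

Is that roughly what happens?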

scala partitioning apache-spark

Score: 5 · 1 answer · 3,100 views

How can I optimize a window aggregation over large windows?

I'm using window functions with large windows in Spark 2.4.4, e.g.:

Window
  .partitionBy("id")
  .orderBy("timestamp")

In my tests I have about 70 distinct IDs, but I may have around 200,000 rows per ID. Without further configuration, I have to allocate a lot of memory to my executors to avoid this OOM:

org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 16384 bytes of memory, got 0
at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)
at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:161)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:128)
at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.add(ExternalAppendOnlyUnsafeRowArray.scala:115)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.fetchNextPartition(WindowExec.scala:345)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.next(WindowExec.scala:371)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.next(WindowExec.scala:303)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage15.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$1.hasNext(WholeStageCodegenExec.scala:631)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.fetchNextRow(WindowExec.scala:314)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.<init>(WindowExec.scala:323)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11.apply(WindowExec.scala:303)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11.apply(WindowExec.scala:302)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

Looking at the source code, I found this parameter, which is not documented at all:

spark.sql.windowExec.buffer.in.memory.threshold

Giving it a large value (e.g. 1,000,000), I no longer need nearly as much memory. As I understand it, this is the number of rows buffered; I suppose increasing this parameter does not duplicate the rows in executor memory, but that isn't really clear to me.
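For completeness, this is how I'm setting it at the moment. Note that the neighbouring spill threshold is my assumption from reading the same internal SQLConf group, not something I've seen documented:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("window-tuning")
  // Rows a window partition may buffer in memory before falling back to an
  // external (spillable) sorter -- the undocumented parameter from above.
  .config("spark.sql.windowExec.buffer.in.memory.threshold", 1000000)
  // Companion knob (my assumption): rows after which the external buffer
  // spills to disk.
  .config("spark.sql.windowExec.buffer.spill.threshold", 1000000)
  .getOrCreate()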

Can someone explain exactly how windows are processed on the executor side? Why is the data duplicated? How can I avoid this duplication and make the process faster when each window contains many rows? Which parameters can be used?

Thanks.

scala apache-spark apache-spark-sql

Score: 5 · 1 answer · 2,415 views

How do I instantiate a materializer for Akka Streams when I have a reference to a typed actor system?

The code below does not compile; it says ActorMaterializer is missing an implicit ActorRefFactory. How can I provide it?

import akka.Done
import akka.actor.typed.{ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.{ActorMaterializer, Materializer}

val guardian: Behavior[Done] = Behaviors.setup(_ => {
  Behaviors.receiveMessage {
    case Done => Behaviors.stopped
  }
})
implicit val sys = ActorSystem(guardian, "sys")
implicit val materializer: Materializer = ActorMaterializer() // does not compile
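A sketch of what I think might work, untested: convert the typed system to a classic one with the typed-classic adapter (toClassic in Akka 2.6, toUntyped in 2.5) and pass that to ActorMaterializer explicitly. In Akka 2.6+ I believe Materializer(sys) also accepts the typed system directly:

import akka.Done
import akka.actor.typed.{ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._ // typed <-> classic conversions
import akka.stream.{ActorMaterializer, Materializer}

val guardian: Behavior[Done] = Behaviors.setup(_ => {
  Behaviors.receiveMessage {
    case Done => Behaviors.stopped
  }
})
implicit val sys: ActorSystem[Done] = ActorSystem(guardian, "sys")

// ActorMaterializer wants a classic ActorRefFactory, so hand it the
// classic view of the typed system.
implicit val materializer: Materializer = ActorMaterializer()(sys.toClassic)

Is the adapter the intended way, or is there a more direct one?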

scala akka akka-stream akka-typed

Score: 3 · 2 answers · 1,419 views