使用 Python SDK 在 Spark 上运行 Apache Beam wordcount 管道时并行度低

hir*_*ryu 6 python apache-spark apache-beam

我在 Spark 集群配置和运行 Pyspark 管道方面非常有经验,但我才刚刚开始使用 Beam。因此,我正在尝试在 Spark PortableRunner(在同一个小型 Spark 集群上运行,4 个工作人员,每个工作人员具有 4 个内核和 8GB RAM)上的 Pyspark 和 Beam python SDK 之间进行逐个比较,并且我'已经决定为一个相当大的数据集进行 wordcount 作业,将结果存储在 Parquet 表中。

因此,我下载了 50GB 的 Wikipedia 文本文件,分成大约 100 个未压缩的文件,并将它们存储在目录中/mnt/nfs_drive/wiki_files//mnt/nfs_drive是安装在所有工作人员上的 NFS 驱动器)。

首先,我正在运行以下 Pyspark wordcount 脚本:

from pyspark.sql import SparkSession, Row
from operator import add
wiki_files = '/mnt/nfs_drive/wiki_files/*'

spark = SparkSession.builder.appName("WordCountSpark").getOrCreate()

spark_counts = spark.read.text(wiki_files).rdd.map(lambda r: r['value']) \
    .flatMap(lambda x: x.split(' ')) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(add) \
    .map(lambda x: Row(word=x[0], count=x[1]))

spark.createDataFrame(spark_counts).write.parquet(path='/mnt/nfs_drive/spark_output', mode='overwrite')
Run Code Online (Sandbox Code Playgroud)

该脚本运行良好,并在大约 8 分钟内在所需位置输出镶木地板文件。主要阶段(读取和拆分令牌)被划分为合理数量的任务,以便有效地使用集群: 在此处输入图片说明

我现在正在尝试使用 Beam 和便携式跑步机实现相同的目标。首先,我使用以下命令启动了 Spark 作业服务器(在 Spark 主节点上):

docker run --rm --net=host -e SPARK_EXECUTOR_MEMORY=8g apache/beam_spark_job_server:2.25.0 --spark-master-url=spark://localhost:7077
Run Code Online (Sandbox Code Playgroud)

然后,在主节点和工作节点上,我按如下方式运行 SDK Harness:

 docker run --net=host -d --rm -v /mnt/nfs_drive:/mnt/nfs_drive apache/beam_python3.6_sdk:2.25.0 --worker_pool
Run Code Online (Sandbox Code Playgroud)

现在 Spark 集群已设置为运行 Beam 管道,我可以提交以下脚本:

import apache_beam as beam
import pyarrow
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import fileio

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000",
    "--job_name=WordCountBeam"
])


wiki_files = '/mnt/nfs_drive/wiki_files/*'

p = beam.Pipeline(options=options)
beam_counts = (
    p
    | fileio.MatchFiles(wiki_files)
    | beam.Map(lambda x: x.path)
    | beam.io.ReadAllFromText()
    | 'ExtractWords' >> beam.FlatMap(lambda x: x.split(' '))
    | beam.combiners.Count.PerElement()
    | beam.Map(lambda x: {'word': x[0], 'count': x[1]})
)


_ = beam_counts | 'Write' >> beam.io.WriteToParquet('/mnt/nfs_drive/beam_output',
      pyarrow.schema(
          [('word', pyarrow.binary()), ('count', pyarrow.int64())]
      )
)

result = p.run().wait_until_finish()
Run Code Online (Sandbox Code Playgroud)

代码提交成功,我可以在 Spark UI 上看到作业并且工作人员正在执行它。但是,即使运行超过 1 小时,它也不会产生任何输出!

因此,我想确保我的设置没有问题,所以我在较小的数据集(只有 1 个 Wiki 文件)上运行了完全相同的脚本。这在大约 3.5 分钟内成功完成(同一数据集上的 Spark 字数需要 16 秒!)。

我想知道 Beam 怎么会这么慢,所以我开始查看 Beam 管道通过作业服务器提交给 Spark 的 DAG。我注意到 Spark 作业大部分时间都花在以下阶段: 在此处输入图片说明

这只是分成 2 个任务,如下所示: 在此处输入图片说明

打印调试行表明此任务是执行“繁重工作”(即从 wiki 文件中读取行并拆分令牌)的地方 - 然而,由于这仅发生在 2 个任务中,因此工作最多将分配给 2 个工人. 同样有趣的是,在 50GB 的大型数据集上运行会在完全相同的 DAG 上运行完全相同的任务数量。

我非常不确定如何进一步进行。似乎 Beam 管道减少了并行度,但我不确定这是否是由于作业服务器对管道的次优转换,或者我是否应该以其他方式指定我的 PTransforms 以增加 Spark 上的并行度.

任何建议表示赞赏!

hir*_*ryu 1

我花了一段时间,但我找出了问题所在以及解决方法。

根本问题在于 Beam 的便携式运行程序,特别是 Beam 作业转换为 Spark 作业的地方。

翻译代码(由作业服务器执行)根据对 的调用将阶段拆分为任务sparkContext().defaultParallelism()。作业服务器不会显式配置默认并行度(并且不允许用户通过管道选项进行设置),因此理论上它会根据执行器的数量配置并行度(请参阅此处的说明https:// /spark.apache.org/docs/latest/configuration.html#execution-behavior)。这似乎是调用时翻译代码的目标defaultParallelism()

现在,在实践中,众所周知,当依赖回退机制时,调用sparkContext().defaultParallelism()太早可能会导致数量低于预期,因为执行器可能尚未在上下文中注册。特别是,defaultParallelism()太早调用将给出 2 结果,并且阶段将仅分为 2 个任务。

因此,我的“肮脏黑客”解决方法包括修改作业服务器的源代码,只需在实例化之后SparkContext和执行其他操作之前添加 3 秒的延迟:

$ git diff                                                                                                                                                                                                                                                                                                                         v2.25.0
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index aa12192..faaa4d3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -95,7 +95,13 @@ public final class SparkContextFactory {
       conf.setAppName(contextOptions.getAppName());
       // register immutable collections serializers because the SDK uses them.
       conf.set("spark.kryo.registrator", SparkRunnerKryoRegistrator.class.getName());
-      return new JavaSparkContext(conf);
+      JavaSparkContext jsc = new JavaSparkContext(conf);
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+      }
+      return jsc;
     }
   }
 }
Run Code Online (Sandbox Code Playgroud)

重新编译作业服务器并通过此更改启动它后,所有调用都是在执行程序注册defaultParallelism()完成的,并且各个阶段很好地分为 16 个任务(与执行程序的数量相同)。正如预期的那样,现在作业的完成速度要快得多,因为有更多的工作人员在执行这项工作(但它仍然比纯 Spark 字数统计慢 3 倍)。

虽然这可行,但这当然不是一个很好的解决方案。更好的解决方案是以下之一:

  • 更改翻译引擎,使其能够以更稳健的方式根据可用执行器的数量推断任务数量;
  • 允许用户通过管道选项配置作业服务器用于翻译作业的默认并行度(这是 Flink 可移植运行程序所做的)。

在出现更好的解决方案之前,它显然会阻止在生产集群中使用 Beam Spark 作业服务器。我会将问题发布到 Beam 的票务队列中,以便可以实施更好的解决方案(希望很快)。