hir*_*ryu 6 python apache-spark apache-beam
我在 Spark 集群配置和运行 Pyspark 管道方面非常有经验,但我才刚刚开始使用 Beam。因此,我正在尝试在 Spark PortableRunner(在同一个小型 Spark 集群上运行,4 个工作人员,每个工作人员具有 4 个内核和 8GB RAM)上的 Pyspark 和 Beam python SDK 之间进行逐个比较,并且我'已经决定为一个相当大的数据集进行 wordcount 作业,将结果存储在 Parquet 表中。
因此,我下载了 50GB 的 Wikipedia 文本文件,分成大约 100 个未压缩的文件,并将它们存储在目录中/mnt/nfs_drive/wiki_files/
(/mnt/nfs_drive
是安装在所有工作人员上的 NFS 驱动器)。
首先,我正在运行以下 Pyspark wordcount 脚本:
from pyspark.sql import SparkSession, Row
from operator import add
wiki_files = '/mnt/nfs_drive/wiki_files/*'
spark = SparkSession.builder.appName("WordCountSpark").getOrCreate()
spark_counts = spark.read.text(wiki_files).rdd.map(lambda r: r['value']) \
.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(add) \
.map(lambda x: Row(word=x[0], count=x[1]))
spark.createDataFrame(spark_counts).write.parquet(path='/mnt/nfs_drive/spark_output', mode='overwrite')
Run Code Online (Sandbox Code Playgroud)
该脚本运行良好,并在大约 8 分钟内在所需位置输出镶木地板文件。主要阶段(读取和拆分令牌)被划分为合理数量的任务,以便有效地使用集群:
我现在正在尝试使用 Beam 和便携式跑步机实现相同的目标。首先,我使用以下命令启动了 Spark 作业服务器(在 Spark 主节点上):
docker run --rm --net=host -e SPARK_EXECUTOR_MEMORY=8g apache/beam_spark_job_server:2.25.0 --spark-master-url=spark://localhost:7077
Run Code Online (Sandbox Code Playgroud)
然后,在主节点和工作节点上,我按如下方式运行 SDK Harness:
docker run --net=host -d --rm -v /mnt/nfs_drive:/mnt/nfs_drive apache/beam_python3.6_sdk:2.25.0 --worker_pool
Run Code Online (Sandbox Code Playgroud)
现在 Spark 集群已设置为运行 Beam 管道,我可以提交以下脚本:
import apache_beam as beam
import pyarrow
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import fileio
options = PipelineOptions([
"--runner=PortableRunner",
"--job_endpoint=localhost:8099",
"--environment_type=EXTERNAL",
"--environment_config=localhost:50000",
"--job_name=WordCountBeam"
])
wiki_files = '/mnt/nfs_drive/wiki_files/*'
p = beam.Pipeline(options=options)
beam_counts = (
p
| fileio.MatchFiles(wiki_files)
| beam.Map(lambda x: x.path)
| beam.io.ReadAllFromText()
| 'ExtractWords' >> beam.FlatMap(lambda x: x.split(' '))
| beam.combiners.Count.PerElement()
| beam.Map(lambda x: {'word': x[0], 'count': x[1]})
)
_ = beam_counts | 'Write' >> beam.io.WriteToParquet('/mnt/nfs_drive/beam_output',
pyarrow.schema(
[('word', pyarrow.binary()), ('count', pyarrow.int64())]
)
)
result = p.run().wait_until_finish()
Run Code Online (Sandbox Code Playgroud)
代码提交成功,我可以在 Spark UI 上看到作业并且工作人员正在执行它。但是,即使运行超过 1 小时,它也不会产生任何输出!
因此,我想确保我的设置没有问题,所以我在较小的数据集(只有 1 个 Wiki 文件)上运行了完全相同的脚本。这在大约 3.5 分钟内成功完成(同一数据集上的 Spark 字数需要 16 秒!)。
我想知道 Beam 怎么会这么慢,所以我开始查看 Beam 管道通过作业服务器提交给 Spark 的 DAG。我注意到 Spark 作业大部分时间都花在以下阶段:
打印调试行表明此任务是执行“繁重工作”(即从 wiki 文件中读取行并拆分令牌)的地方 - 然而,由于这仅发生在 2 个任务中,因此工作最多将分配给 2 个工人. 同样有趣的是,在 50GB 的大型数据集上运行会在完全相同的 DAG 上运行完全相同的任务数量。
我非常不确定如何进一步进行。似乎 Beam 管道减少了并行度,但我不确定这是否是由于作业服务器对管道的次优转换,或者我是否应该以其他方式指定我的 PTransforms 以增加 Spark 上的并行度.
任何建议表示赞赏!
我花了一段时间,但我找出了问题所在以及解决方法。
根本问题在于 Beam 的便携式运行程序,特别是 Beam 作业转换为 Spark 作业的地方。
翻译代码(由作业服务器执行)根据对 的调用将阶段拆分为任务sparkContext().defaultParallelism()
。作业服务器不会显式配置默认并行度(并且不允许用户通过管道选项进行设置),因此理论上它会根据执行器的数量配置并行度(请参阅此处的说明https:// /spark.apache.org/docs/latest/configuration.html#execution-behavior)。这似乎是调用时翻译代码的目标defaultParallelism()
。
现在,在实践中,众所周知,当依赖回退机制时,调用sparkContext().defaultParallelism()
太早可能会导致数量低于预期,因为执行器可能尚未在上下文中注册。特别是,defaultParallelism()
太早调用将给出 2 结果,并且阶段将仅分为 2 个任务。
因此,我的“肮脏黑客”解决方法包括修改作业服务器的源代码,只需在实例化之后SparkContext
和执行其他操作之前添加 3 秒的延迟:
$ git diff v2.25.0
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index aa12192..faaa4d3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -95,7 +95,13 @@ public final class SparkContextFactory {
conf.setAppName(contextOptions.getAppName());
// register immutable collections serializers because the SDK uses them.
conf.set("spark.kryo.registrator", SparkRunnerKryoRegistrator.class.getName());
- return new JavaSparkContext(conf);
+ JavaSparkContext jsc = new JavaSparkContext(conf);
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ }
+ return jsc;
}
}
}
Run Code Online (Sandbox Code Playgroud)
重新编译作业服务器并通过此更改启动它后,所有调用都是在执行程序注册后defaultParallelism()
完成的,并且各个阶段很好地分为 16 个任务(与执行程序的数量相同)。正如预期的那样,现在作业的完成速度要快得多,因为有更多的工作人员在执行这项工作(但它仍然比纯 Spark 字数统计慢 3 倍)。
虽然这可行,但这当然不是一个很好的解决方案。更好的解决方案是以下之一:
在出现更好的解决方案之前,它显然会阻止在生产集群中使用 Beam Spark 作业服务器。我会将问题发布到 Beam 的票务队列中,以便可以实施更好的解决方案(希望很快)。
归档时间: |
|
查看次数: |
483 次 |
最近记录: |