如何在不重新分区的情况下并行执行Spark UDF

Maz*_*yar 3 parallel-processing user-defined-functions stanford-nlp apache-spark apache-spark-sql

我有一个小的Hive表,通过HDFS(parquet / 1152文件-超过30GB)保存了1500万行。

我正在通过科学摘要进行LDA。因此,第一步是使用StanfordNLP提取一些名词短语/块短语,我为此编写了UDF以实现此目标。

在性能方面,现在有两种情况,每种情况都有非常不同的结果。

方案1:

val hiveTable = hivecontext.sql("""
SELECT ab AS text,
          pmid AS id
  FROM scientific.medline      
    LIMIT 15000000
""")
Run Code Online (Sandbox Code Playgroud)

然后,我通过我的UDF进行呼叫hiveTable

val postagsDF = hiveTable.withColumn("words", StanfordNLP.posPhrases(col("text")))
Run Code Online (Sandbox Code Playgroud)

现在,如果我触发任何动作/转换,例如.count()或对“ postagsDF ” 执行CountVectorizer(),我将看到两个阶段。一个具有适当数量的任务(分区),另一阶段仅具有一个任务。第一个在完成一些输入/随机写入后很快结束,但是第二个仅执行一个任务需要很长时间。看来我的UDF正在此阶段执行,只有一个任务。(需要几个小时,完成后不会进行任何资源活动)

方案2:

val hiveTable = hivecontext.sql("""
SELECT ab AS text,
          pmid AS id
  FROM scientific.medline      
    LIMIT 15000000
""")
Run Code Online (Sandbox Code Playgroud)

DataFrame根据实木复合地板的数量将分区重新划分为spark检测到的确切分区数。(我可以选择其他任何数字,但该数字似乎还可以,因为我有超过500个可用的核心-每个核心2个任务)

val repartitionedDocDF = docDF.repartition(1152)
Run Code Online (Sandbox Code Playgroud)

现在通过UDF调用我的UDF hiveTable

val postagsDF = hiveTable.withColumn("words", StanfordNLP.posPhrases(col("text")))
Run Code Online (Sandbox Code Playgroud)

但是,这次的任何动作/转换都将分为四个阶段。其中两个阶段(假设计数)是1152个任务,其中两个是单个任务。我可以看到我的UDF正在其中一个阶段执行,其中1152个任务被所有执行者正确使用了整个集群。

场景1的结果: 观察我的集群,在长时间运行的单任务阶段没有太多事情发生。没有CPU使用率,内存,网络和IO活动。只有一名执行者执行一项任务,即将我的UDF应用于每个文档/列。

基准:第一种情况需要3-4个小时才能完成100万行。(我等不及要看一千五百万行需要多少钱)

方案2的结果: 查看我的集群,我可以清楚地看到我所有的资源都在被利用。我所有的节点几乎都满负荷使用。

基准测试:方案2耗时30分钟,记录了1500万行。

在此处输入图片说明

真实的问题

  1. 刚刚发生了什么?我认为默认情况下,Dataframe上的UDF将并行运行。如果分区/任务的数量大于或小于内核总数,但至少在默认的200个分区/任务上并行,则可以重新分区。我只想了解为什么我的情况下的UDf是单个任务,却忽略了默认的200和实际分区大小。(这不仅仅是性能,它是单任务作业还是多任务作业)

  2. 是否有其他方法可以使UDF在所有执行程序上并行执行而无需调用分区。我不反对重新分区,但是这是非常昂贵的操作,我认为这不是使UDF并行运行的唯一方法。即使当我重新分区到完全相同数量的分区/文件时,我仍然必须监视超过20GB的随机读取和写入。

我已经阅读了有关重新分区和UDF的所有内容,但是我找不到类似的问题,除非它进行了重新分区,否则默认情况下无法并行运行UDF。(当您将类型从int转换为bigint的简单UDF可能不可见,但是当您执行NLP时,它确实是可见的)

我的集群大小:30个节点(16core / 32G)-Spark 1.6 Cloudera CDH 5.11.1 Spark: --driver-cores 5 --driver-memory 8g --executor-cores 5 --executor-memory 5g --num-executors 116

非常感谢,

更新

我运行了没有LIMIT子句的相同代码,它在18分钟内完成了!因此,LIMIT是原因(答案中的更多内容):

在此处输入图片说明

use*_*411 5

这里的问题与LIMIT您在查询中使用的子句特别相关,与无关UDFLIMIT子句将所有结果数据重新划分为一个分区,因此不适用于大型样本。

如果要避免此问题并以某种方式减少记录数,则最好先对数据进行采样:

val p: Double = ???

spark.sql(s"SELECT * FROM df TABLESAMPLE($p percent)")
Run Code Online (Sandbox Code Playgroud)

要么:

spark.table("df").sample(false, p)
Run Code Online (Sandbox Code Playgroud)

p所需的记录分数在哪里。

请记住,使用准确数量的值进行抽样会遇到与LIMIT条款相同的问题。

  • 是的,它不是特别直观或友好的行为。当某件事情表现不佳时,对于许多人来说,第一步就是尝试较小的组合。如果spark不希望人们使用限制,则应发出警告。 (2认同)