Maz*_*yar 3 parallel-processing user-defined-functions stanford-nlp apache-spark apache-spark-sql
我有一个小的Hive表,通过HDFS(parquet / 1152文件-超过30GB)保存了1500万行。
我正在通过科学摘要进行LDA。因此,第一步是使用StanfordNLP提取一些名词短语/块短语,我为此编写了UDF以实现此目标。
在性能方面,现在有两种情况,每种情况都有非常不同的结果。
方案1:
val hiveTable = hivecontext.sql("""
SELECT ab AS text,
pmid AS id
FROM scientific.medline
LIMIT 15000000
""")
Run Code Online (Sandbox Code Playgroud)
然后,我通过我的UDF进行呼叫hiveTable:
val postagsDF = hiveTable.withColumn("words", StanfordNLP.posPhrases(col("text")))
Run Code Online (Sandbox Code Playgroud)
现在,如果我触发任何动作/转换,例如.count()或对“ postagsDF ” 执行CountVectorizer(),我将看到两个阶段。一个具有适当数量的任务(分区),另一阶段仅具有一个任务。第一个在完成一些输入/随机写入后很快结束,但是第二个仅执行一个任务需要很长时间。看来我的UDF正在此阶段执行,只有一个任务。(需要几个小时,完成后不会进行任何资源活动)
方案2:
val hiveTable = hivecontext.sql("""
SELECT ab AS text,
pmid AS id
FROM scientific.medline
LIMIT 15000000
""")
Run Code Online (Sandbox Code Playgroud)
我DataFrame根据实木复合地板的数量将分区重新划分为spark检测到的确切分区数。(我可以选择其他任何数字,但该数字似乎还可以,因为我有超过500个可用的核心-每个核心2个任务)
val repartitionedDocDF = docDF.repartition(1152)
Run Code Online (Sandbox Code Playgroud)
现在通过UDF调用我的UDF hiveTable:
val postagsDF = hiveTable.withColumn("words", StanfordNLP.posPhrases(col("text")))
Run Code Online (Sandbox Code Playgroud)
但是,这次的任何动作/转换都将分为四个阶段。其中两个阶段(假设计数)是1152个任务,其中两个是单个任务。我可以看到我的UDF正在其中一个阶段执行,其中1152个任务被所有执行者正确使用了整个集群。
场景1的结果: 观察我的集群,在长时间运行的单任务阶段没有太多事情发生。没有CPU使用率,内存,网络和IO活动。只有一名执行者执行一项任务,即将我的UDF应用于每个文档/列。
基准:第一种情况需要3-4个小时才能完成100万行。(我等不及要看一千五百万行需要多少钱)
方案2的结果: 查看我的集群,我可以清楚地看到我所有的资源都在被利用。我所有的节点几乎都满负荷使用。
基准测试:方案2耗时30分钟,记录了1500万行。
真实的问题
刚刚发生了什么?我认为默认情况下,Dataframe上的UDF将并行运行。如果分区/任务的数量大于或小于内核总数,但至少在默认的200个分区/任务上并行,则可以重新分区。我只想了解为什么我的情况下的UDf是单个任务,却忽略了默认的200和实际分区大小。(这不仅仅是性能,它是单任务作业还是多任务作业)
是否有其他方法可以使UDF在所有执行程序上并行执行而无需调用分区。我不反对重新分区,但是这是非常昂贵的操作,我认为这不是使UDF并行运行的唯一方法。即使当我重新分区到完全相同数量的分区/文件时,我仍然必须监视超过20GB的随机读取和写入。
我已经阅读了有关重新分区和UDF的所有内容,但是我找不到类似的问题,除非它进行了重新分区,否则默认情况下无法并行运行UDF。(当您将类型从int转换为bigint的简单UDF可能不可见,但是当您执行NLP时,它确实是可见的)
我的集群大小:30个节点(16core / 32G)-Spark 1.6 Cloudera CDH 5.11.1 Spark: --driver-cores 5 --driver-memory 8g --executor-cores 5 --executor-memory 5g --num-executors 116
非常感谢,
更新:
我运行了没有LIMIT子句的相同代码,它在18分钟内完成了!因此,LIMIT是原因(答案中的更多内容):
这里的问题与LIMIT您在查询中使用的子句特别相关,与无关UDF。LIMIT子句将所有结果数据重新划分为一个分区,因此不适用于大型样本。
如果要避免此问题并以某种方式减少记录数,则最好先对数据进行采样:
val p: Double = ???
spark.sql(s"SELECT * FROM df TABLESAMPLE($p percent)")
Run Code Online (Sandbox Code Playgroud)
要么:
spark.table("df").sample(false, p)
Run Code Online (Sandbox Code Playgroud)
p所需的记录分数在哪里。
请记住,使用准确数量的值进行抽样会遇到与LIMIT条款相同的问题。