ℕʘʘ*_*ḆḽḘ 5 r apache-spark apache-spark-ml apache-spark-mllib sparklyr
考虑这个使用的简单示例sparklyr:
library(sparklyr)
library(janeaustenr) # to get some text data
library(stringr)
library(dplyr)
mytext <- austen_books() %>%
mutate(label = as.integer(str_detect(text, 'great'))) #create a fake label variable
mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)
# Source: table<mytext_spark> [?? x 3]
# Database: spark_connection
text book label
<chr> <chr> <int>
1 SENSE AND SENSIBILITY Sense & Sensibility 0
2 "" Sense & Sensibility 0
3 by Jane Austen Sense & Sensibility 0
4 "" Sense & Sensibility 0
5 (1811) Sense & Sensibility 0
6 "" Sense & Sensibility 0
7 "" Sense & Sensibility 0
8 "" Sense & Sensibility 0
9 "" Sense & Sensibility 0
10 CHAPTER 1 Sense & Sensibility 0
11 "" Sense & Sensibility 0
12 "" Sense & Sensibility 0
13 The family of Dashwood had long been settled in Sussex. Their estate Sense & Sensibility 0
14 was large, and their residence was at Norland Park, in the centre of Sense & Sensibility 0
15 their property, where, for many generations, they had lived in so Sense & Sensibility 0
16 respectable a manner as to engage the general good opinion of their Sense & Sensibility 0
Run Code Online (Sandbox Code Playgroud)
数据帧的大小相当小(大约70k行和14k唯一的单词).
现在,naive bayes在我的群集上训练模型只需几秒钟.首先,我定义了pipeline
pipeline <- ml_pipeline(sc) %>%
ft_regex_tokenizer(input.col='text',
output.col = 'mytoken',
pattern = "\\s+",
gaps =TRUE) %>%
ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
ml_naive_bayes( label_col = "label",
features_col = "finaltoken",
prediction_col = "pcol",
probability_col = "prcol",
raw_prediction_col = "rpcol",
model_type = "multinomial",
smoothing = 0,
thresholds = c(1, 1))
Run Code Online (Sandbox Code Playgroud)
然后训练naive bayes模型
> library(microbenchmark)
> microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)
Unit: seconds
expr min lq mean median uq max neval
model <- ml_fit(pipeline, mytext_spark) 6.718354 6.996424 7.647227 7.274494 8.111663 8.948832 3
Run Code Online (Sandbox Code Playgroud)
现在的问题是,试图运行任何tree基于模型(random forest,boosted trees在相同的(实际上是很小的!!)数据集等)将无法正常工作.
pipeline2 <- ml_pipeline(sc) %>%
ft_regex_tokenizer(input.col='text',
output.col = 'mytoken',
pattern = "\\s+",
gaps =TRUE) %>%
ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
ml_gbt_classifier( label_col = "label",
features_col = "finaltoken",
prediction_col = "pcol",
probability_col = "prcol",
raw_prediction_col = "rpcol",
max_memory_in_mb = 10240,
cache_node_ids = TRUE)
model2 <- ml_fit(pipeline2, mytext_spark)
# wont work :(
Run Code Online (Sandbox Code Playgroud)
错误:org.apache.spark.SparkException:作业因阶段失败而中止:阶段69.0中的任务0失败4次,最近失败:阶段69.0中丢失任务0.3(TID 1580,1.1.1.1.1,执行者5): java.lang.IllegalArgumentException:大小超过Integer.MAX_VALUE
我认为这是由于令牌的矩阵表示的稀疏性,但有什么可以在这里完成的吗?这是一个sparklyr问题吗?有spark问题吗?我的代码效率不高吗?
谢谢!
您收到此错误是因为您实际上达到了Spark中我们所拥有的著名2G限制https://issues.apache.org/jira/browse/SPARK-6235
解决方案是在将数据提供给算法之前对数据进行重新分区。
这实际上是这篇文章中的两个陷阱:
因此,让我们回顾一下看起来无害的代码;
library(janeaustenr) # to get some text data
library(stringr)
mytext <- austen_books() %>%
mutate(label = as.integer(str_detect(text, 'great'))) # create a fake label variable
mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)
Run Code Online (Sandbox Code Playgroud)
那么最后一行是做什么的呢?
copy_to (不适用于大数据集),实际上只是将本地R数据帧复制到1个分区Spark DataFrame
因此,您只需要重新分区数据,以确保一旦管道在将数据输入之前准备好数据gbt,分区大小就会小于2GB。
因此,您只需执行以下操作即可对数据进行分区:
# 20 is an arbitrary number I chose to test and it seems to work well in this case,
# you might want to reconsider that if you have a bigger dataset.
mytext_spark <-
copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>%
sdf_repartition(partitions = 20)
Run Code Online (Sandbox Code Playgroud)
PS1: max_memory_in_mb是您要用于gbt计算其统计信息的内存量。它与输入的数据量没有直接关系。
PS2:如果没有为执行程序设置足够的内存,则可能会遇到java.lang.OutOfMemoryError : GC overhead limit exceeded
编辑:重新分区数据是什么意思?
在谈论重新分区之前,我们总是可以参考分区的定义。我会尽量简短。
分区是大型分布式数据集的逻辑块。
Spark使用分区管理数据,该分区有助于以最少的网络流量并行化分布式数据处理,以便在执行程序之间发送数据。默认情况下,Spark尝试将数据从RDD附近的节点读取到RDD中。由于Spark通常访问分布式分区数据,因此为了优化转换操作,它会创建分区来保存数据块。
分区数量的增加将使每个分区的数据更少(或根本没有!)
来源:摘自@JacekLaskowski 精通Apache Spark书。
但是,在这种情况下,数据分区并不总是正确的。因此需要重新分区。(sdf_repartition对sparklyr)
sdf_repartition会分散和整理节点上的数据。也就是说,sdf_repartition(20)将创建20个数据分区,而不是原来的1个分区。
我希望这有帮助。
整个代码:
library(janeaustenr) # to get some text data
library(stringr)
mytext <- austen_books() %>%
mutate(label = as.integer(str_detect(text, 'great'))) # create a fake label variable
mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
468 次 |
| 最近记录: |