DNA*_*DNA 184
以下是作为spark-shell会话的差异示例:
首先,一些数据 - 两行文字:
val rdd = sc.parallelize(Seq("Roses are red", "Violets are blue")) // lines
rdd.collect
res0: Array[String] = Array("Roses are red", "Violets are blue")
Run Code Online (Sandbox Code Playgroud)
现在,map将长度为N的RDD转换为另一个长度为N的RDD.
例如,它将两行映射为两个行长度:
rdd.map(_.length).collect
res1: Array[Int] = Array(13, 16)
Run Code Online (Sandbox Code Playgroud)
但是flatMap(松散地说)将长度为N的RDD转换为N个集合的集合,然后将这些RDD展平为单个RDD结果.
rdd.flatMap(_.split(" ")).collect
res2: Array[String] = Array("Roses", "are", "red", "Violets", "are", "blue")
Run Code Online (Sandbox Code Playgroud)
我们每行有多个单词和多行,但最后我们得到一个单词输出数组
为了说明这一点,flatMapping从一组行到一组单词看起来像:
["aa bb cc", "", "dd"] => [["aa","bb","cc"],[],["dd"]] => ["aa","bb","cc","dd"]
Run Code Online (Sandbox Code Playgroud)
因此,输入和输出RDD通常具有不同的尺寸flatMap.
如果我们试图使用map我们的split函数,我们最终会得到嵌套结构(一个单词数组的RDD,带有类型RDD[Array[String]]),因为每个输入必须有一个结果:
rdd.map(_.split(" ")).collect
res3: Array[Array[String]] = Array(
Array(Roses, are, red),
Array(Violets, are, blue)
)
Run Code Online (Sandbox Code Playgroud)
最后,一个有用的特殊情况是使用可能不返回答案的函数进行映射,因此返回一个Option.我们可以flatMap用来过滤掉返回的元素,None并从返回的元素中提取值Some:
val rdd = sc.parallelize(Seq(1,2,3,4))
def myfn(x: Int): Option[Int] = if (x <= 2) Some(x * 10) else None
rdd.flatMap(myfn).collect
res3: Array[Int] = Array(10,20)
Run Code Online (Sandbox Code Playgroud)
(这里注意到Option的行为类似于具有一个元素或零元素的列表)
yog*_*oga 88
通常我们在hadoop中使用字数统计示例.我将采取相同的使用情况,并会使用map和flatMap,我们将看到的区别是如何处理数据.
下面是示例数据文件.
hadoop is fast
hive is sql on hdfs
spark is superfast
spark is awesome
Run Code Online (Sandbox Code Playgroud)
上面的文件将使用map和解析flatMap.
map>>> wc = data.map(lambda line:line.split(" "));
>>> wc.collect()
[u'hadoop is fast', u'hive is sql on hdfs', u'spark is superfast', u'spark is awesome']
Run Code Online (Sandbox Code Playgroud)
输入有4行,输出大小也是4,即N个元素==> N个元素.
flatMap>>> fm = data.flatMap(lambda line:line.split(" "));
>>> fm.collect()
[u'hadoop', u'is', u'fast', u'hive', u'is', u'sql', u'on', u'hdfs', u'spark', u'is', u'superfast', u'spark', u'is', u'awesome']
Run Code Online (Sandbox Code Playgroud)
输出与地图不同.
让我们为每个键指定1作为值以获得单词计数.
fm:使用创建的RDD flatMapwc:RDD使用创建 map>>> fm.map(lambda word : (word,1)).collect()
[(u'hadoop', 1), (u'is', 1), (u'fast', 1), (u'hive', 1), (u'is', 1), (u'sql', 1), (u'on', 1), (u'hdfs', 1), (u'spark', 1), (u'is', 1), (u'superfast', 1), (u'spark', 1), (u'is', 1), (u'awesome', 1)]
Run Code Online (Sandbox Code Playgroud)
而flatMap在RDD上wc会给出以下不需要的输出:
>>> wc.flatMap(lambda word : (word,1)).collect()
[[u'hadoop', u'is', u'fast'], 1, [u'hive', u'is', u'sql', u'on', u'hdfs'], 1, [u'spark', u'is', u'superfast'], 1, [u'spark', u'is', u'awesome'], 1]
Run Code Online (Sandbox Code Playgroud)
如果map使用而不是,则无法获得单词计数flatMap.
根据定义,map和之间的区别flatMap是:
map:它通过将给定函数应用于RDD的每个元素来返回新的RDD.函数map只返回一个项目.
flatMap:类似于map,它通过将函数应用于RDD的每个元素来返回新的RDD,但输出被展平.
小智 17
如果你在Spark中询问RDD.map和RDD.flatMap之间的区别,map会将大小为N的RDD转换为另一个大小为N的RDD.例如.
myRDD.map(x => x*2)
Run Code Online (Sandbox Code Playgroud)
例如,如果myRDD由双打组成.
虽然flatMap可以将RDD转换为另一个不同大小的另一个:例如:
myRDD.flatMap(x =>new Seq(2*x,3*x))
Run Code Online (Sandbox Code Playgroud)
这将返回大小为2*N或的RDD
myRDD.flatMap(x =>if x<10 new Seq(2*x,3*x) else new Seq(x) )
Run Code Online (Sandbox Code Playgroud)
Ram*_*ram 16
所有例子都很好......这是很好的视觉插图......来源:火花的DataFlair培训
Map :映射是 Apache Spark 中的一种转换操作。它适用于 RDD 的每个元素,并将结果作为新的 RDD 返回。在 Map 中,运营开发者可以定义自己的自定义业务逻辑。相同的逻辑将应用于 RDD 的所有元素。
Spark RDDmap函数以一个元素作为输入,根据自定义代码(由开发人员指定)进行处理,一次返回一个元素。Map 将一个长度为 N 的 RDD 转换为另一个长度为 N 的 RDD。输入和输出 RDD 通常具有相同数量的记录。
map使用 scala 的示例:
val x = spark.sparkContext.parallelize(List("spark", "map", "example", "sample", "example"), 3)
val y = x.map(x => (x, 1))
y.collect
// res0: Array[(String, Int)] =
// Array((spark,1), (map,1), (example,1), (sample,1), (example,1))
// rdd y can be re writen with shorter syntax in scala as
val y = x.map((_, 1))
y.collect
// res1: Array[(String, Int)] =
// Array((spark,1), (map,1), (example,1), (sample,1), (example,1))
// Another example of making tuple with string and it's length
val y = x.map(x => (x, x.length))
y.collect
// res3: Array[(String, Int)] =
// Array((spark,5), (map,3), (example,7), (sample,6), (example,7))
Run Code Online (Sandbox Code Playgroud)
平面图:
AflatMap是变换操作。它适用于 RDD 的每个元素,并将结果作为 new 返回RDD。它类似于 Map,但 FlatMap 允许从 map 函数返回 0、1 或多个元素。在 FlatMap 操作中,开发人员可以定义自己的自定义业务逻辑。相同的逻辑将应用于 RDD 的所有元素。
“扁平化结果”是什么意思?
FlatMap 函数将一个元素作为输入处理,根据自定义代码(由开发人员指定)并一次返回 0 个或多个元素。flatMap() 将一个长度为 N 的 RDD 转换为另一个长度为 M 的 RDD。
flatMap使用 scala 的示例:
val x = spark.sparkContext.parallelize(List("spark flatmap example", "sample example"), 2)
// map operation will return Array of Arrays in following case : check type of res0
val y = x.map(x => x.split(" ")) // split(" ") returns an array of words
y.collect
// res0: Array[Array[String]] =
// Array(Array(spark, flatmap, example), Array(sample, example))
// flatMap operation will return Array of words in following case : Check type of res1
val y = x.flatMap(x => x.split(" "))
y.collect
//res1: Array[String] =
// Array(spark, flatmap, example, sample, example)
// RDD y can be re written with shorter syntax in scala as
val y = x.flatMap(_.split(" "))
y.collect
//res2: Array[String] =
// Array(spark, flatmap, example, sample, example)
Run Code Online (Sandbox Code Playgroud)
pan*_*ang 15
使用test.md作为一个例子:
? spark-1.6.1 cat test.md
This is the first line;
This is the second line;
This is the last line.
scala> val textFile = sc.textFile("test.md")
scala> textFile.map(line => line.split(" ")).count()
res2: Long = 3
scala> textFile.flatMap(line => line.split(" ")).count()
res3: Long = 15
scala> textFile.map(line => line.split(" ")).collect()
res0: Array[Array[String]] = Array(Array(This, is, the, first, line;), Array(This, is, the, second, line;), Array(This, is, the, last, line.))
scala> textFile.flatMap(line => line.split(" ")).collect()
res1: Array[String] = Array(This, is, the, first, line;, This, is, the, second, line;, This, is, the, last, line.)
Run Code Online (Sandbox Code Playgroud)
如果您使用的map方法,你会得到的线test.md,对flatMap方法,你会得到的单词数.
该map方法类似于flatMap,它们都返回一个新的RDD.map方法经常使用返回一个新的RDD,flatMap方法经常使用拆分字.
ram*_*amu 14
它归结为你最初的问题:扁平化是什么意思?
当您使用flatMap时,"多维"集合变为"一维"集合.
val array1d = Array ("1,2,3", "4,5,6", "7,8,9")
//array1d is an array of strings
val array2d = array1d.map(x => x.split(","))
//array2d will be : Array( Array(1,2,3), Array(4,5,6), Array(7,8,9) )
val flatArray = array1d.flatMap(x => x.split(","))
//flatArray will be : Array (1,2,3,4,5,6,7,8,9)
Run Code Online (Sandbox Code Playgroud)
你想使用flatMap时,
map返回相同数量元素的RDD,而flatMap不是.
flatMap过滤掉丢失或不正确数据的示例用例.
用于map各种情况的示例用例,其中输入和输出的元素数量相同.
number.csv
1
2
3
-
4
-
5
Run Code Online (Sandbox Code Playgroud)
map.py添加add.csv中的所有数字.
from operator import *
def f(row):
try:
return float(row)
except Exception:
return 0
rdd = sc.textFile('a.csv').map(f)
print(rdd.count()) # 7
print(rdd.reduce(add)) # 15.0
Run Code Online (Sandbox Code Playgroud)
flatMap.py用于flatMap在添加之前过滤掉缺失的数据.与先前版本相比,添加的数字更少.
from operator import *
def f(row):
try:
return [float(row)]
except Exception:
return []
rdd = sc.textFile('a.csv').flatMap(f)
print(rdd.count()) # 5
print(rdd.reduce(add)) # 15.0
Run Code Online (Sandbox Code Playgroud)
小智 8
map和flatMap是相似的,从某种意义上说,它们从输入RDD中取一行并在其上应用一个函数.它们的区别在于map中的函数只返回一个元素,而flatMap中的函数可以返回一个元素列表(0或更多)作为迭代器.
此外,flatMap的输出是扁平的.虽然flatMap中的函数返回一个元素列表,但flatMap返回一个RDD,它以平面方式(不是列表)包含列表中的所有元素.
小智 5
可以从下面的示例pyspark代码中看到区别:
rdd = sc.parallelize([2, 3, 4])
rdd.flatMap(lambda x: range(1, x)).collect()
Output:
[1, 1, 2, 1, 2, 3]
rdd.map(lambda x: range(1, x)).collect()
Output:
[[1], [1, 2], [1, 2, 3]]
Run Code Online (Sandbox Code Playgroud)
map:它RDD通过将函数应用于 的每个元素来返回一个新值RDD。.map 中的函数只能返回一项。
flatMap:与map类似,它RDD通过对RDD的每个元素应用函数来返回一个新值,但输出是扁平化的。
此外,函数 inflatMap可以返回元素列表(0 个或多个)
例如:
sc.parallelize([3,4,5]).map(lambda x: range(1,x)).collect()
Run Code Online (Sandbox Code Playgroud)
输出:[[1, 2], [1, 2, 3], [1, 2, 3, 4]]
sc.parallelize([3,4,5]).flatMap(lambda x: range(1,x)).collect()
Run Code Online (Sandbox Code Playgroud)
输出:注意 o/p 被展平为单个列表 [1, 2, 1, 2, 3, 1, 2, 3, 4]
| 归档时间: |
|
| 查看次数: |
151799 次 |
| 最近记录: |