What is the difference between an RDD's map and mapPartitions methods? And does flatMap behave like map or like mapPartitions? Thanks.

(Edit) i.e., what is the difference (semantically or in terms of execution) between:
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
preservesPartitioning = true)
}
and:
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.map(fn)
}
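For comparison, here is a sketch of my own (not taken from the Spark source) of how flatMap could be written in the same mapPartitions style as the snippet above; the Manifest implicits simply mirror the code already shown:

def flatMap[A, B](rdd: RDD[A], fn: (A => TraversableOnce[B]))
                 (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
  // Apply fn to every element of the partition's iterator and flatten the
  // results, keeping the parent partitioning, as in the map version above.
  rdd.mapPartitions({ iter: Iterator[A] => iter.flatMap(fn) },
    preservesPartitioning = true)
}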
The PySpark documentation describes two functions:
mapPartitions(f, preservesPartitioning=False)

    Return a new RDD by applying a function to each partition of this RDD.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
    >>> def f(iterator): yield sum(iterator)
    >>> rdd.mapPartitions(f).collect()
    [3, 7]
And ...
mapPartitionsWithIndex(f, preservesPartitioning=False)

    Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
    >>> def f(splitIndex, iterator): yield splitIndex
    >>> rdd.mapPartitionsWithIndex(f).sum()
    6
What use cases do these functions attempt to solve? I cannot see why they would be needed.
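To make the question concrete, here is a rough Scala sketch of the pattern I assume these functions are aimed at; the enrich/tagWithPartition helpers and the dictionary are made up purely for illustration:

import org.apache.spark.rdd.RDD

// Guessing at the use case: pay a setup cost once per partition instead of
// once per record (e.g. building a lookup structure or opening a connection).
def enrich(records: RDD[String]): RDD[String] =
  records.mapPartitions { iter =>
    val dictionary = Map("a" -> 1, "b" -> 2)  // stand-in for an expensive resource
    iter.map(line => s"$line -> ${dictionary.getOrElse(line, 0)}")
  }

// And a guess at mapPartitionsWithIndex: the extra argument tells the function
// which partition it is processing, e.g. to tag each record with it.
def tagWithPartition(records: RDD[String]): RDD[(Int, String)] =
  records.mapPartitionsWithIndex { (idx, iter) => iter.map(rec => (idx, rec)) }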
I want to load a 1 GB CSV file (10 million records) into HBase. I wrote a MapReduce program for it. My code works fine, but it takes an hour to complete, and the last reducer alone takes more than half an hour. Can someone please help me?
My code is as follows:
Driver.java
package com.cloudera.examples.hbase.bulkimport;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * HBase bulk import example
 * Data preparation MapReduce job driver
 */
public class Driver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        /*
         * NBA Final 2010 game …
         * - args[0]: HDFS input path
         * - args[1]: HDFS output path
         * - args[2]: HBase table name
         *
I am trying to fetch the rows from a lookup table (3 rows and 3 columns), iterate over them row by row, and pass the values from each row as parameters to Spark SQL.
DB | TBL | COL
----------------
db | txn | ID
db | sales | ID
db | fee | ID
I tried a single query in the spark shell and it worked, but I am finding it hard to iterate over the rows.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val db_name:String = "db"
val tbl_name:String = "transaction"
val unique_col:String = "transaction_number"
val dupDf = sqlContext.sql(s"select count(*), transaction_number from $db_name.$tbl_name group by $unique_col having count(*)>1")
Please let me know how I can iterate over the rows and pass the values as parameters.
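For reference, this is roughly the shape of the loop I am trying to write, as a sketch only: the lookup table name db.lookup is made up, and the DB/TBL/COL column positions are assumed to match the table above.

// Collect the small 3x3 lookup table to the driver and run one
// duplicate-check query per row.
val lookupDf = sqlContext.sql("select DB, TBL, COL from db.lookup")  // hypothetical lookup table

lookupDf.collect().foreach { row =>
  val dbName    = row.getString(0)  // DB
  val tblName   = row.getString(1)  // TBL
  val uniqueCol = row.getString(2)  // COL

  val dupDf = sqlContext.sql(
    s"select count(*), $uniqueCol from $dbName.$tblName " +
    s"group by $uniqueCol having count(*) > 1")
  dupDf.show()
}

Collecting to the driver seems acceptable here only because the lookup table is tiny (3 rows).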