相关疑难解决方法(0)

Apache Spark:map vs mapPartitions?

RDD mapmapPartitions方法有什么区别?并且flatMap表现得像map或喜欢mapPartitions?谢谢.

(编辑)即,两者之间的差异(在语义上或在执行方面)

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
      preservesPartitioning = true)
  }
Run Code Online (Sandbox Code Playgroud)

和:

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.map(fn)
  }
Run Code Online (Sandbox Code Playgroud)

performance scala apache-spark rdd

119
推荐指数
3
解决办法
10万
查看次数

何时使用mapParitions和mapPartitionsWithIndex?

PySpark文档描述了两个函数:

mapPartitions(f, preservesPartitioning=False)

   Return a new RDD by applying a function to each partition of this RDD.

   >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
   >>> def f(iterator): yield sum(iterator)
   >>> rdd.mapPartitions(f).collect()
   [3, 7]
Run Code Online (Sandbox Code Playgroud)

而......

mapPartitionsWithIndex(f, preservesPartitioning=False)

   Return a new RDD by applying a function to each partition of this RDD, 
   while tracking the index of the original partition.

   >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
   >>> def f(splitIndex, iterator): yield splitIndex
   >>> rdd.mapPartitionsWithIndex(f).sum()
   6
Run Code Online (Sandbox Code Playgroud)

这些功能试图解决哪些用例?我不明白他们为什么会被要求.

apache-spark pyspark

9
推荐指数
1
解决办法
2万
查看次数

将1GB数据加载到hbase中需要1小时

我想将1GB(1000万条记录)CSV文件加载到Hbase中.我为它写了Map-Reduce程序.我的代码工作正常但需要1小时才能完成.最后一个减速机需要超过半小时的时间.有人可以帮帮我吗?

我的守则如下:

Driver.Java


    package com.cloudera.examples.hbase.bulkimport;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /**
     * HBase bulk import example
* Data preparation MapReduce job driver *
    *
  1. args[0]: HDFS input path *
  2. args[1]: HDFS output path *
  3. args[2]: HBase table name *
*/ public class Driver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); /* * NBA Final 2010 game …

java hadoop hbase mapreduce hadoop2

5
推荐指数
2
解决办法
2184
查看次数

迭代数据帧中的每一行,将其存储在 val 中并作为参数传递给 Spark SQL 查询

我试图从查找表(3 行和 3 列)中获取行并逐行迭代并将每行中的值作为参数传递给 SPARK SQL。

DB | TBL   | COL
----------------
db | txn   | ID

db | sales | ID

db | fee   | ID
Run Code Online (Sandbox Code Playgroud)

我在 spark shell 中尝试了一行,它奏效了。但我发现很难遍历行。

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val db_name:String = "db"

val tbl_name:String = "transaction"

val unique_col:String = "transaction_number"

val dupDf = sqlContext.sql(s"select count(*), transaction_number from $db_name.$tbl_name group by $unique_col having count(*)>1") 
Run Code Online (Sandbox Code Playgroud)

请让我知道如何遍历行并作为参数传递?

scala dataframe apache-spark apache-spark-sql

3
推荐指数
1
解决办法
3236
查看次数