我想知道为什么分组比较器用于二级mapreduce.
根据二级分类的权威指南示例
我们希望键的排序顺序是年份(升序),然后是温度(降序):
1900 35°C
1900 34°C
1900 34°C
...
1901 36°C
1901 35°C
Run Code Online (Sandbox Code Playgroud)
通过将分区器设置为按键的年份部分进行分区,我们可以保证同一年的记录转到同一个reducer.然而,这仍然不足以实现我们的目标.分区程序仅确保一个reducer接收一年的所有记录; 它不会改变reducer在分区内按键分组的事实.
既然我们已经编写了自己的分区器来处理特定reducer的map输出键,那么我们为什么要对它进行分组呢.
提前致谢
众所周知,Spark中的分区器对任何"广泛"操作都会产生巨大的性能影响,因此通常会在操作中进行自定义.我正在尝试以下代码:
val rdd1 =
sc.parallelize(1 to 50).keyBy(_ % 10)
.partitionBy(new HashPartitioner(10))
val rdd2 =
sc.parallelize(200 to 230).keyBy(_ % 13)
val cogrouped = rdd1.cogroup(rdd2)
println("cogrouped: " + cogrouped.partitioner)
val unioned = rdd1.union(rdd2)
println("union: " + unioned.partitioner)
Run Code Online (Sandbox Code Playgroud)
我看到默认情况下cogroup()
总是会生成带有自定义分区程序的RDD,但union()
不会,它将始终恢复为默认值.这是违反直觉的,因为我们通常假设PairRDD应该使用其第一个元素作为分区键.有没有办法"强制"Spark合并2个PairRDD以使用相同的分区键?
谁能解释我在hadoop中如何进行二次排序?
为什么必须使用GroupingComparator
它以及它在hadoop中如何工作?
我正在浏览下面给出的链接,并怀疑groupcomapator如何工作.
任何人都可以解释一下分组比较器的工作原理吗?
http://www.bigdataspeak.com/2013/02/hadoop-how-to-do-secondary-sort-on_25.html
我正在研究一个hadoop项目,经过多次访问各种博客和阅读文档,我意识到我需要使用hadoop框架提供的二次排序功能.
我的输入格式是以下形式:
DESC(String) Price(Integer) and some other Text
我希望reducer中的值是Price的降序.同时在比较DESC时,我有一个方法,它取两个字符串和百分比,如果两个字符串之间的相似性等于或大于百分比,那么我应该认为它们是相等的.
问题是在Reduce Job完成之后我可以看到一些DESC与其他字符串相似但它们在不同的组中.
这是我的Composite键的compareTo方法
public int compareTo(VendorKey o) {
int result =-
result = compare(token, o.token, ":") >= percentage ? 0:1;
if (result == 0) {
return pid> o.pid ?-1: pid < o.pid ?1:0;
}
return result;
}
Run Code Online (Sandbox Code Playgroud)
比较分组比较器的方法
public int compare(WritableComparable a, WritableComparable b) {
VendorKey one = (VendorKey) a;
VendorKey two = (VendorKey) b;
int result = ClusterUtil.compare(one.getToken(), two.getToken(), ":") >= one.getPercentage() ? 0 : 1;
// if (result != …
Run Code Online (Sandbox Code Playgroud) Hadoop是否根据程序中设置的映射器数量拆分数据?也就是说,如果映射器的数量为200(假设Hadoop集群同时允许200个映射器),则拥有大小为500MB的数据集,每个映射器是否给出了2.5 MB的数据?
此外,所有的映射器是否同时运行,或者其中一些可能会串行运行?
我正在运行这个命令——
sudo -u hdfs hadoop fs -du -h /user | sort -nr
Run Code Online (Sandbox Code Playgroud)
并且输出未按 gigs、Terabytes、GB 排序
我发现这个命令 -
hdfs dfs -du -s /foo/bar/*tobedeleted | sort -r -k 1 -g | awk '{ suffix="KMGT"; for(i=0; $1>1024 && i < length(suffix); i++) $1/=1024; print int($1) substr(suffix, i, 1), $3; }'
Run Code Online (Sandbox Code Playgroud)
但似乎没有用。
有没有一种方法或命令行标志我可以用来让它排序并且输出应该看起来像--
123T /xyz
124T /xyd
126T /vat
127G /ayf
123G /atd
Run Code Online (Sandbox Code Playgroud)
请帮忙
问候马尤尔
我想在Hive中创建一个表
CREATE TABLE BUCKET_TABLE AS
SELECT a.* FROM TABLE1 a LEFT JOIN TABLE2 b ON (a.key=b.key) WHERE b.key IS NUll
CLUSTERED BY (key) INTO 1000 BUCKETS;
Run Code Online (Sandbox Code Playgroud)
这种语法失败了 - 但我不确定是否有可能做这个组合语句.有任何想法吗?
我正在使用 Spark 将数据写入分区。给定一个包含两列的数据集(foo, bar)
,如果我这样做df.write.mode("overwrite").format("csv").partitionBy("foo").save("/tmp/output")
,我会得到一个输出
/tmp/output/foo=1/X.csv
/tmp/output/foo=2/Y.csv
...
Run Code Online (Sandbox Code Playgroud)
但是,输出 CSV 文件仅包含 的值bar
,而不包含foo
。我知道 的值foo
已在目录名称中捕获foo=N
,但是否也可以foo
在 CSV 文件中包含 的值?
我想帮助理解算法.我首先粘贴了算法解释,然后是我的疑惑.
算法:(用于计算记录对之间的重叠)
给定用户定义的参数K,文件DR(*Format:record_id,data*)被分成K个近似等大小的块,使得文档Di的数据落入第i/K th块中.
我们覆盖了Hadoop的分区功能,该功能将映射器发出的密钥映射到reducer实例.每个键(i,j)都映射到j/Kth组中的reducer.
特殊键i,*及其相关值,即文档的数据最多被复制K次,以便可以在每个缩减器处传送文档的全部内容.因此,组中的每个缩减器仅需要在存储器中恢复并加载一个DR文件块,其大小可以通过改变K而任意设置为小.因此可以计算重叠.这是以复制通过MapReduce框架提供的文档为代价的.
释疑:
我做了一些假设:
声明:每个键(i,j)都映射到j/Kth组中的reducer.假设:存在K个减少节点,并且该密钥被映射到j/Kth reduce节点.
怀疑:是否有一些减少节点组合在一起?比如0,1,2节点被分组为0组?
声明:文档的数据最多可复制K次,因此可以在每个reducer上传送文档的全部内容.
所以这意味着K等于没有.减速机节点?如果没有,我们正在浪费计算节点,而不是正确使用它们?
主要疑问:K是否等于减速器节点的数量?
希望得到回应!
谢谢!
java hadoop mapreduce elastic-map-reduce hadoop-partitioning
我在我的hive表中在pyspark中创建了两个数据帧:
data1 = spark.sql("""
SELECT ID, MODEL_NUMBER, MODEL_YEAR ,COUNTRY_CODE
from MODEL_TABLE1 where COUNTRY_CODE in ('IND','CHN','USA','RUS','AUS')
""");
Run Code Online (Sandbox Code Playgroud)
每个国家/地区都有数百万字母数字格式的唯一ID.
data2 = spark.sql("""
SELECT ID,MODEL_NUMBER, MODEL_YEAR, COUNTRY_CODE
from MODEL_TABLE2 where COUNTRY_CODE in ('IND','CHN')
""");
Run Code Online (Sandbox Code Playgroud)
我想在ID列上使用pyspark加入这两个数据帧.
我们如何重新划分数据,使其在分区中均匀分布.
我可以使用下面的数据来修复我的数据吗?
newdf1 = data2.repartition(100, "ID")
newdf2 = data2.repartition(100, "ID")
Run Code Online (Sandbox Code Playgroud)
什么是分区的最佳方式,以便加快工作?