我有一个纯文本文件,可能有数百万行需要自定义解析,我想尽快加载到HBase表中(使用Hadoop或HBase Java客户端).
我目前的解决方案是基于没有Reduce部分的MapReduce作业.我FileInputFormat用来读取文本文件,以便每行传递给map我的Mapper类的方法.此时,该行被解析以形成一个Put写入的对象context.然后,TableOutputFormat获取Put对象并将其插入表中.
该解决方案产生的平均插入速率为每秒1,000行,低于我的预期.我的HBase设置在单个服务器上处于伪分布式模式.
一个有趣的事情是,在插入1,000,000行时,会产生25个Mappers(任务),但它们会连续运行(一个接一个); 这是正常的吗?
这是我当前解决方案的代码:
public static class CustomMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
protected void map(LongWritable key, Text value, Context context) throws IOException {
Map<String, String> parsedLine = parseLine(value.toString());
Put row = new Put(Bytes.toBytes(parsedLine.get(keys[1])));
for (String currentKey : parsedLine.keySet()) {
row.add(Bytes.toBytes(currentKey),Bytes.toBytes(currentKey),Bytes.toBytes(parsedLine.get(currentKey)));
}
try {
context.write(new ImmutableBytesWritable(Bytes.toBytes(parsedLine.get(keys[1]))), row);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} …Run Code Online (Sandbox Code Playgroud) 想要确认以下内容.请验证这是否正确:1.根据我的理解,当我们将文件复制到HDFS时,文件(假设其大小> 64MB = HDFS块大小)被分成多个块并且每个块存储在不同的块上数据节点.
将文件复制到HDFS时文件内容已经拆分为块,并且在运行map作业时不会发生文件拆分.映射任务仅按照它们在每个最大块上工作的方式进行调度.大小为64 MB,具有数据位置(即映射任务在包含数据/块的节点上运行)
如果文件被压缩(gzip),也会发生文件分割,但MR确保每个文件只由一个映射器处理,即MR将收集位于其他数据节点的所有gzip文件块并将它们全部提供给单个映射器.
如果我们定义isSplitable()以返回false,则会发生与上述相同的事情,即文件的所有块将由在一台机器上运行的一个映射器处理.MR将从不同的数据节点读取文件的所有块,并将它们提供给单个映射器.
纱线的基础设施层与原始地图缩减架构的区别在于以下方式:
在YARN中,作业跟踪器被分为两个不同的守护进程,称为Resource Manager和Node Manager(特定于节点).除了包括仅处理调度作业的调度程序而不担心任何监视或状态更新之外,资源管理器仅管理对不同作业的资源分配.内存,CPU时间,网络带宽等不同资源被放入一个称为的单元Resource Container.AppMasters在不同节点上有不同的运行,它们与许多这些资源容器通信,因此使用监视/状态详细信息更新节点管理器.
我想知道使用这种方法如何从map-reduce角度提高性能?此外,如果纱线背后的动机及其对Map-reduce的现有实施的好处有任何明确的内容,请指出我的相同内容.
一,背景.我曾经有一个集合,logs并使用map/reduce来生成各种报告.这些报告中的大多数是基于一天内的数据,所以我总是有一个条件d: SOME_DATE.当logs集合变得非常大时,插入变得极其缓慢(比我们监视的应用程序生成日志的速度慢),即使在丢弃大量索引之后也是如此.所以我们决定将每一天的数据放在一个单独的集合中 - logs_YYYY-mm-dd这样索引就更小了,我们甚至不需要索引日期.这很酷,因为大多数报告(因此map/reduce)都是每日数据.但是,我们有一份报告,我们需要覆盖多天.
而现在的问题.有没有办法在多个集合上运行map/reduce(或更确切地说,地图),就好像它只是一个?
我正在使用带有mongoid的rails 3.我有一个股票的集合与嵌入的价格集合:
class Stock
include Mongoid::Document
field :name, :type => String
field :code, :type => Integer
embeds_many :prices
class Price
include Mongoid::Document
field :date, :type => DateTime
field :value, :type => Float
embedded_in :stock, :inverse_of => :prices
Run Code Online (Sandbox Code Playgroud)
我想得到自给定日期以来最低价格低于给定价格p的股票,然后能够对每种股票的价格进行排序.
但看起来Mongodb不允许这样做.因为这不起作用:
@stocks = Stock.Where(:prices.value.lt => p)
Run Code Online (Sandbox Code Playgroud)
此外,似乎mongoDB无法对嵌入对象进行排序.
那么,有没有替代方案来完成这项任务?
也许我应该将所有内容放在一个集合中,以便我可以轻松运行以下查询:
@stocks = Stock.Where(:prices.lt => p)
Run Code Online (Sandbox Code Playgroud)
但我真的希望在我的查询后以股票名称对结果进行分组(例如,具有一系列有序价格的不同股票).我听说过map/reduce with group function但我不确定如何正确使用Mongoid.
http://www.mongodb.org/display/DOCS/Aggregation
SQL中的等价物将是这样的:
SELECT name, code, min(price) from Stock WHERE price<p GROUP BY name, code
Run Code Online (Sandbox Code Playgroud)
谢谢你的帮助.
不推荐使用org.apache.hadoop.mapreduce.Job的所有三个构造函数,有没有办法以不推荐的方式构造Job类?
谢谢.
我正在使用新的Hadoop API并寻找一种方法将一些参数(少量字符串)传递给映射器.
我怎样才能做到这一点?
JobConf job = (JobConf)getConf();
job.set("NumberOfDocuments", args[0]);
Run Code Online (Sandbox Code Playgroud)
这里," NumberOfDocuments"是参数的名称,其值是args[0]从命令行参数" " 读取的.设置此参数后,可以在reducer或mapper中检索其值,如下所示:
private static Long N;
public void configure(JobConf job) {
N = Long.parseLong(job.get("NumberOfDocuments"));
}
Run Code Online (Sandbox Code Playgroud)
注意,棘手的部分是你不能设置这样的参数:
Configuration con = new Configuration();
con.set("NumberOfDocuments", args[0]);
Run Code Online (Sandbox Code Playgroud) 我认为他们指的是减速机,但在我的程序中我有
public static class MyMapper extends
Mapper< LongWritable, Text, Text, Text >
和
public static class MyReducer extends
Reducer< Text, Text, NullWritable, Text >
所以,如果我有
job.setOutputKeyClass( NullWritable.class );
job.setOutputValueClass( Text.class );
我得到以下例外
Type mismatch in key from map: expected org.apache.hadoop.io.NullWritable, recieved org.apache.hadoop.io.Text
但如果我有
job.setOutputKeyClass( Text.class );
没有问题.
我的代码是否有错误,或者这是因为NullWritable还是其他?
我也必须使用job.setInputFormatClass和job.setOutputFormatClass?因为没有它们我的程序运行正常.
我正在尝试调整现有问题以满足我的需求..
基本上输入是简单的文本我处理它并将键/值对传递给reducer我创建一个json ..所以有键但没有值所以mapper:
输入:文字/文字
输出:文本/文本
减速器:文本/文本
输出:文本/无
我的签名如下:
public class AdvanceCounter {
/**
* The map class of WordCount.
*/
public static class TokenCounterMapper
extends Mapper<Object, Text, Text, Text> { // <--- See this signature
public void map(Object key, Text value, Context context) // <--- See this signature
throws IOException, InterruptedException {
context.write(key,value); //both are of type text OUTPUT TO REDUCER
}
}
public static class TokenCounterReducer
extends Reducer<Text, Text, Text, **NullWritable**> { // <--- See this signature Nullwritable here …Run Code Online (Sandbox Code Playgroud) 我在我的机器上安装了Cloudera VM 5.8版.当我执行字数统计mapreduce工作时,它会抛出异常.
`16/09/06 06:55:49 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:862)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:600)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:789)
`
Run Code Online (Sandbox Code Playgroud)
但工作顺利完成.请有人帮我解决这个问题.
谢谢.
hadoop mapreduce cloudera hortonworks-data-platform hortonworks-sandbox
mapreduce ×10
hadoop ×8
java ×3
mongodb ×2
cloudera ×1
deprecated ×1
hadoop-yarn ×1
hbase ×1
mongoid ×1