use*_*834 5 java hadoop mapreduce distributed-cache
我正在尝试实现reduce side join,并使用mapfile reader查找分布式缓存,但是在stderr中检查时它没有查找值,它显示以下错误,lookupfile文件已经存在于hdfs中,并且似乎正确加载进入缓存,如stdout中所示.
java.lang.IllegalArgumentException:Wrong FS:file:/ app/hadoop/tmp/mapred/local/taskTracker/distcache/-8118663285704962921_-1196516983_170706299/localhost/input/delivery_status/DeliveryStatusCodes/data,expected:hdfs:// localhost:9000 org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:390)位于org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus的org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:140) (DistributedFileSystem.java:554)atg.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:816)org.apache.hadoop.io.SequenceFile $ Reader.(SequenceFile.java:1479)org.apache .hadoop.io.SequenceFile $ Reader.(SequenceFile.java:1474)org.apache.hadoop.io.MapFile $ Reader.createDataFileReader(MapFile.java:302)at org.apache.hadoop.io.MapFile $ Reader.打开(MapFile.java:284)org.apache.hadoop.io.MapFile $ Reader.(MapFile.java:273)org.apache.hadoop.io.MapFile $ Reader.(MapFile.java:260)at org .apache.hadoop.io.MapFile $读卡器(MapFile.java:253)在mr_poc.reducerrsj.initializeDepartmentsMap(reducerrsj.java:59)mr_poc.reducerrsj.setup(reducerrsj.java:42)atg.apache.hadoop.mapreduce.Reducer.run(Reducer.java:174)at org.位于org.apache.hadoop.mapred.Child $ 4.run的org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)中的apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649) .java:255)at java.security.AccessController.doPrivileged(Native Method)at javax.security.auth.Subject.doAs(Subject.java:416)at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java) :1190)org.apache.hadoop.mapred.Child.main(Child.java:249)mr_poc.reducerrsj.reduce(reducerrsj.java)mr_poc.reducerrsj.buildOutputValue(reducerrsj.java:83)中的java.lang.NullPointerException :127)位于org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.)的org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)中的mr_poc.reducerrsj.reduce(reducerrsj.java:1). java:649)at org.apache.hadoop.mapred.ReduceTask.run(Reduc)eTask.java:418)atg.apache.hadoop.mapred.Child $ 4.run(Child.java:255)at java.security.AccessController.doPrivileged(Native Method)at javax.security.auth.Subject.doAs(Subject .java:416)在org.apache.hadoop.security.
这是我的驱动程序代码,
package mr_poc;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class driverrsj extends Configured implements Tool{
@Override
public int run(String[] arg) throws Exception {
if(arg.length != 3)
{
System.out.printf("3 parameters are required for DriverRSJ- <Input Dir1> <Input Dir2> <Output Dir> \n");
return -1;
}
Job job = new Job(getConf());
Configuration conf = job.getConfiguration();
DistributedCache.addCacheFile(new URI("/input/delivery_status"), conf);
System.out.println("Cache : " + job.getConfiguration().get("mapred.cache.files"));
job.setJarByClass(driverrsj.class);
conf.setInt("cust_info", 1);
conf.setInt("status", 2);
StringBuilder inputPaths = new StringBuilder();
inputPaths.append(arg[0].toString()).append(",").append(arg[1].toString());
FileInputFormat.setInputPaths(job, inputPaths.toString());
FileOutputFormat.setOutputPath(job, new Path(arg[2]));
job.setJarByClass(driverrsj.class);
job.setMapperClass(mappperRSJ.class);
job.setReducerClass(reducerrsj.class);
job.setMapOutputKeyClass(CompositeKeyWritableRSJ.class);
job.setMapOutputValueClass(Text.class);
//job.setPartitionerClass(partinonrsj.class);
job.setSortComparatorClass(secondarysortcomp.class);
job.setGroupingComparatorClass(GroupingComparatorRSJ.class);
job.setNumReduceTasks(1);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
boolean success =job.waitForCompletion(true);
return success? 0 : 1;
}
public static void main(String[] args) throws Exception{
int exitCode = ToolRunner.run(new Configuration(), new driverrsj(),args);
System.exit(exitCode);
}
}
Run Code Online (Sandbox Code Playgroud)
这是我的reducer代码
package mr_poc;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class reducerrsj extends Reducer<CompositeKeyWritableRSJ, Text, NullWritable, Text>{
StringBuilder reduceValueBuilder = new StringBuilder("");
NullWritable nullWritableKey = NullWritable.get();
Text reduceOutputValue = new Text("");
String strSeparator = ",";
private MapFile.Reader deptMapReader = null;
Text txtMapFileLookupKey = new Text();
Text txtMapFileLookupValue = new Text();
//Path[] cacheFilesLocal;
//Path[] eachPath;
@Override
protected void setup(Context context) throws IOException,InterruptedException {
Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
for ( Path eachPath : cacheFiles){
System.out.println(eachPath.toString());
System.out.println(eachPath.getName());
if(eachPath.getName().toString().contains("delivery_status"))
{
URI uriUncompressedFile = new File(eachPath.toString()+ "/DeliveryStatusCodes").toURI();
initializeDepartmentsMap(uriUncompressedFile, context);
}
}
}
//@SuppressWarnings("deprecation")
private void initializeDepartmentsMap(URI uriUncompressedFile, Context context)
throws IOException {
// {{
// Initialize the reader of the map file (side data)
Configuration conf = context.getConfiguration();
conf.addResource(new Path("/usr/local/hadoop-1.2.1/conf/core-site.xml"));
FileSystem dfs = FileSystem.get(conf);
try {
deptMapReader = new MapFile.Reader(dfs,uriUncompressedFile.toString(), context.getConfiguration());
} catch (Exception e) {
e.printStackTrace();
}
// }}
}
private StringBuilder buildOutputValue(CompositeKeyWritableRSJ key,
StringBuilder reduceValueBuilder, Text value) {
if (key.getsourceindex() == 2) {
String arrSalAttributes[] = value.toString().split(",");
txtMapFileLookupKey.set(arrSalAttributes[0].toString());
System.out.println("key=" + txtMapFileLookupKey);
try {
deptMapReader.get(txtMapFileLookupKey, txtMapFileLookupValue);
}
catch (Exception e) {
txtMapFileLookupValue.set("");
e.printStackTrace();
} finally {
txtMapFileLookupValue
.set((txtMapFileLookupValue.equals(null) || txtMapFileLookupValue
.equals("")) ? "NOT-FOUND"
: txtMapFileLookupValue.toString());
}
reduceValueBuilder.append(txtMapFileLookupValue.toString());
} else if(key.getsourceindex() == 1) {
String arrEmpAttributes[] = value.toString().split(",");
reduceValueBuilder.append(arrEmpAttributes[0].toString()).append(
strSeparator);
}
txtMapFileLookupKey.set("");
txtMapFileLookupValue.set("");
return reduceValueBuilder;
}
@Override
public void reduce(CompositeKeyWritableRSJ key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
for (Text value : values) {
buildOutputValue(key, reduceValueBuilder, value);
}
// Drop last comma, set value, and emit output
if (reduceValueBuilder.length() > 1) {
//reduceValueBuilder.setLength(reduceValueBuilder.length() - 1);
// Emit output
reduceOutputValue.set(reduceValueBuilder.toString());
context.write(nullWritableKey, reduceOutputValue);
} else {
System.out.println("Key=" + key.getjoinkey() + "src="
+ key.getsourceindex());
}
// Reset variables
reduceValueBuilder.setLength(0);
reduceOutputValue.set("");
}
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
if(deptMapReader != null)
{
deptMapReader.close();
}
}
}
Run Code Online (Sandbox Code Playgroud)
这是我的核心站点Xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>hadoop.tmp.dir</name>
<value>/app/hadoop/tmp</value>
<description>A base for other temporary directories.</description>
</property>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
<description>The name of the default file system. A URI whose
scheme and authority determine the FileSystem implementation. The
uri's scheme determines the config property (fs.SCHEME.impl) naming
the FileSystem implementation class. The uri's authority is used to
determine the host, port, etc. for a filesystem.</description>
</property>
</configuration>
Run Code Online (Sandbox Code Playgroud)
任何帮助将受到高度赞赏.提前致谢!!!
小智 8
我有同样的问题,我通过添加解决了这个问题
FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"),conf)
Run Code Online (Sandbox Code Playgroud)
在驱动程序类中.
你必须导入URI从java.net.URI
小智 4
conf您应该根据您的文件设置属性,core-site.xml如下所示:
conf.set("fs.defaultFS", "hdfs://host:port");
conf.set("mapreduce.jobtracker.address", "host:port");
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
13862 次 |
| 最近记录: |