我刚刚编写了一个简单的 hadoop 程序,我试图在其中使用 AES 算法加密文本文件。我在我的 map 方法中逐行读取,加密并写入上下文。很简单。我在我的 map 方法中进行加密并使用行偏移量作为密钥,所以我不需要 reducer 类。
这是我的代码:
public class Enc {
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text word = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String strDataToEncrypt = new String();
String strCipherText = new String();
KeyGenerator keyGen = KeyGenerator.getInstance("AES");
keyGen.init(128);
SecretKey secretKey = keyGen.generateKey();
Cipher aesCipher = Cipher.getInstance("AES");
aesCipher.init(Cipher.ENCRYPT_MODE,secretKey);
strDataToEncrypt = value.toString();
byte[] byteDataToEncrypt = strDataToEncrypt.getBytes();
byte[] byteCipherText = aesCipher.doFinal(byteDataToEncrypt);
strCipherText = new BASE64Encoder().encode(byteCipherText);
System.out.println("cipher text: " +strCipherText);
String cipherString = new String(strCipherText);
context.write(key, new Text(cipherString));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "Enc");
job.setJarByClass(Enc.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
The method write(Text, IntWritable) in the type TaskInputOutputContext<LongWritable,Text,Text,IntWritable> is not applicable for the arguments (LongWritable, Text)
Run Code Online (Sandbox Code Playgroud)
我在这里缺少什么?
编辑_1:
我的最终工作代码在这里:
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.Cipher;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.security.InvalidKeyException;
import java.security.InvalidAlgorithmParameterException;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.BadPaddingException;
import javax.crypto.IllegalBlockSizeException;
import sun.misc.BASE64Encoder;
public class Enc {
public static class Map extends Mapper<LongWritable, Text, LongWritable, Text> {
private Text word = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
try {
String strDataToEncrypt = new String();
String strCipherText = new String();
KeyGenerator keyGen = KeyGenerator.getInstance("AES");
keyGen.init(128);
SecretKey secretKey = keyGen.generateKey();
Cipher aesCipher = Cipher.getInstance("AES");
aesCipher.init(Cipher.ENCRYPT_MODE,secretKey);
strDataToEncrypt = value.toString();
byte[] byteDataToEncrypt = strDataToEncrypt.getBytes();
byte[] byteCipherText = aesCipher.doFinal(byteDataToEncrypt);
strCipherText = new BASE64Encoder().encode(byteCipherText);
System.out.println("cipher text: " +strCipherText);
String cipherString = new String(strCipherText);
context.write(key, new Text(cipherString));
}
catch (NoSuchAlgorithmException noSuchAlgo)
{
System.out.println(" No Such Algorithm exists " + noSuchAlgo);
}
catch (NoSuchPaddingException noSuchPad)
{
System.out.println(" No Such Padding exists " + noSuchPad);
}
catch (InvalidKeyException invalidKey)
{
System.out.println(" Invalid Key " + invalidKey);
}
catch (BadPaddingException badPadding)
{
System.out.println(" Bad Padding " + badPadding);
}
catch (IllegalBlockSizeException illegalBlockSize)
{
System.out.println(" Illegal Block Size " + illegalBlockSize);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "Enc");
job.setJarByClass(Enc.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map.class);
//job.setCombinerClass(Reduce.class);
//job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
Run Code Online (Sandbox Code Playgroud)
这里的第一个重要部分是
public class Map extends Mapper<LongWritable, Text, Text, IntWritable>
Run Code Online (Sandbox Code Playgroud)
其中指出您的 Map 类将 LongWritable 键和 Text 值作为输入,并给出一个 Text 键和一个 IntWritable 值作为输出。
第二个重要的一点是
context.write(key, new Text(cipherString));
Run Code Online (Sandbox Code Playgroud)
这是您从映射器提供输出的地方。Key 是 LongWritable 类型,第二个参数是 Text。
那么这里的问题是存在不匹配。您声称您的映射器在扩展 Mapper 时输出 Text 键和 IntWritable 值,但您实际输出的是 LongWritable 和 Text。如果您实际上打算输出 LongWritable 和文本,则应将类声明更改为
public class Map extends Mapper<LongWritable, Text, LongWritable, Text>
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4471 次 |
| 最近记录: |