HBase map-only row delete

ean*_*les 4 hadoop hbase mapreduce

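This is my first time writing an HBase MapReduce job, and I'm having trouble deleting rows in HBase (I'm trying to run it as a map-only job). The job succeeds and scans the HBase table, and the mapper reads the correct rowkeys from HBase (verified through sysout). However, the call to Delete del = new Delete(row.get()) doesn't seem to actually do anything.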

Below is the code I'm trying to run:

HBaseDelete.java

public class HBaseDelete { 
  public static void main(String[] args) throws Exception {

    Configuration config = HBaseConfiguration.create();
    Job job = new Job(config, "log_table");
    job.setJarByClass(HBaseDeleteMapper.class);     

    Scan scan = new Scan();
    scan.setCaching(500);        
    scan.setCacheBlocks(false);

    TableMapReduceUtil.initTableMapperJob("log_table", scan, HBaseDeleteMapper.class, null, null, job);

    job.setOutputFormatClass(NullOutputFormat.class);
    job.setNumReduceTasks(0);

    boolean b = job.waitForCompletion(true);
    if (!b) {
        throw new IOException("error with job!");
    }

  }
}

HBaseDeleteMapper.java

public class HBaseDeleteMapper extends TableMapper<ImmutableBytesWritable, Delete>{
  @Override
  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    Delete delete = new Delete(row.get());
    context.write(row, delete);
  }
}

Is there something I'm missing to "commit" the delete?

Rub*_*eda 6

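You're writing to the context, not to the table, and since the job uses NullOutputFormat, whatever is written to the context is discarded. Your mapper should look something like this: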

public class HBaseDeleteMapper extends TableMapper<ImmutableBytesWritable, NullWritable>{

    private HTable myTable;

    protected void setup(Context context) throws IOException, InterruptedException {
        /* HTable instance for deletes */
        myTable = new HTable(HBaseConfiguration.create(), "myTable".getBytes());
    }

    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
        myTable.delete(new Delete(row.get())); /* Delete the row from the table */
        //context.write(row, NullWritable.get()); /* Optional: emit the deleted row keys as output; skip this if you don't need them */
    }

    protected void cleanup(Context context) throws IOException, InterruptedException { 
        myTable.close(); /* Close table */
    }

}

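Note that deletes don't use the write buffer, so this code issues one RPC per delete, which is bad for this kind of job. To fix that, you can build your own List<Delete> and submit the deletes in batches: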

public class HBaseDeleteMapper extends TableMapper<NullWritable, NullWritable>{

    private HTable myTable;
    private List<Delete> deleteList = new ArrayList<Delete>();
    final private int buffer = 10000; /* Buffer size, tune it as desired */

    protected void setup(Context context) throws IOException, InterruptedException {
        /* HTable instance for deletes */
        myTable = new HTable(HBaseConfiguration.create(), "myTable".getBytes());
    }

    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
        deleteList.add(new Delete(row.get())); /* Add delete to the batch */
        if (deleteList.size()==buffer) {
            myTable.delete(deleteList); /* Submit batch */
            deleteList.clear(); /* Clear batch */
        }
    }

    protected void cleanup(Context context) throws IOException, InterruptedException {
        if (deleteList.size()>0) {
            myTable.delete(deleteList); /* Submit remaining batch */
        }
        myTable.close(); /* Close table */
    }

}
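If you want to check the batching logic without a cluster, here is a minimal sketch in plain Java with no HBase dependency. BatchBuffer is a hypothetical helper, not part of HBase, and the Consumer sink stands in for myTable.delete(deleteList). It only illustrates the pattern used above: flush when the buffer fills up, then flush whatever is left at the end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper (not an HBase class): hands items to a sink in fixed-size batches. */
public class BatchBuffer<T> {
    private final int capacity;
    private final Consumer<List<T>> sink; // stand-in for myTable.delete(List<Delete>)
    private final List<T> pending = new ArrayList<>();
    private int flushCount = 0;

    public BatchBuffer(int capacity, Consumer<List<T>> sink) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        this.sink = sink;
    }

    /** Mirrors map(): queue one item and flush once the buffer is full. */
    public void add(T item) {
        pending.add(item);
        if (pending.size() >= capacity) {
            flush();
        }
    }

    /** Mirrors cleanup(): submit whatever is still queued; does nothing when empty. */
    public void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(pending)); // pass a copy so the sink may keep it
        pending.clear();
        flushCount++;
    }

    /** Number of batches handed to the sink so far. */
    public int flushCount() {
        return flushCount;
    }

    public static void main(String[] args) {
        List<String> deleted = new ArrayList<>();
        BatchBuffer<String> buffer = new BatchBuffer<>(10, deleted::addAll);
        for (int i = 0; i < 25; i++) {
            buffer.add("row-" + i);
        }
        buffer.flush(); // final partial batch, as in cleanup()
        System.out.println(deleted.size() + " rows in " + buffer.flushCount() + " calls");
    }
}
```

With 25 rows and a batch size of 10, the sink is called three times (10, 10 and 5 rows) instead of 25 times, which is the saving the batched mapper is after.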