从Apache Storm bolt插入和删除HBase中的值的方法

miu*_*ser 7 hbase apache-storm

我在Hadoop上运行的Storm拓扑配置为伪分布式模式.拓扑包含一个必须将数据写入Hbase的螺栓.我的第一个用于测试目的的execute方法是在我的bolt 方法中创建(和关闭)连接和写入数据.但是看起来我的本地机器上没有那么多资源来处理所有进入HBase的请求.在大约30个成功处理请求后,我在Storm工作日志中看到以下内容:

o.a.z.ClientCnxn [INFO] Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
o.a.z.ClientCnxn [INFO] Socket connection established to localhost/127.0.0.1:2181, initiating session
o.a.z.ClientCnxn [INFO] Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket connection and attempting reconnect
o.a.h.h.z.RecoverableZooKeeper [WARN] Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/hbaseid
Run Code Online (Sandbox Code Playgroud)

我的想法是通过为每个螺栓实例创建单个连接来减少HBase的连接数 - 在prepare方法中打开连接并关闭它cleanup.但是根据文档cleanup不保证在分布式模式下调用.

在此之后,我发现了Storm的Hbase框架 - storm-hbase.不幸的是,几乎没有关于它的信息,只是README在它的github回购.

  1. 所以我的第一个问题是使用storm-hbase进行Storm-Hbase集成是一个很好的解决方案吗?什么是最好的方法呢?

此外,我需要能够从HBase表中删除单元格.但我在storm-hbase doc中没有找到任何关于它的内容.

  1. 有没有可能用storm-hbase做到这一点?或者回到上一个问题,还有另一种方法可以做到这一切吗?

提前致谢!

SQL*_*ion 1

例如,您可以使用“发布者”线程吗?

这是:有一个单独的类作为线程运行,它将为您执行对 hbase/mysql/elasticsearch/hdfs/etc 的请求。并且出于性能原因应该分批进行。

  1. 有一个处理并发操作的全局列表和一个执行器服务:

    private transient BlockingQueue<Tuple> insertQueue;
    private transient ExecutorService theExecutor;
    private transient Future<?> publisherFuture;
    
    Run Code Online (Sandbox Code Playgroud)
  2. 有一个线程类将为您插入文档

    private class Publisher implements Runnable {
    
    @Override
    public void run() {
               long sendBatchTs = System.currentTimeMillis();
    
              while (true){
    
                  if(insertQueue.size >100){ // 100 tuples per batch
                         List<Tuple> batchQueue = new ArrayList<>(100);
                         insertQueue.drainTo(batchQueue, 100);
                         // write code to insert the 100 documents
                        sendBatchTs = System.currentTimeMillis();
                  }
                  else if (System.currentTimeMillis() - sendBatchTs > 5000){
                  // to prevent tuple timeout
                         int listSize = batchQueue.size();
    
                          List<Tuple> batchQueue = new ArrayList<>(listSize);
                         insertQueue.drainTo(batchQueue, listSize);
                         // write code to insert the 100 documents
                        sendBatchTs = System.currentTimeMillis();
                  }
    
    
              }
    
    
     // your code
    }
    }
    
    Run Code Online (Sandbox Code Playgroud)
  3. 在prepare方法中初始化线程类和列表

      @Override
      public void prepare (final Map _conf, final TopologyContext _context , final OutputCollector _collector) {
    
    // open your connection
    
       insertQueue = new LinkedBlockingQueue<>();
       theExecutor = Executors.newSingleThreadExecutor();
       publisherFuture = theExecutor.submit(new Publisher());
    }
    
    Run Code Online (Sandbox Code Playgroud)
  4. 清理时关闭连接

    @Override
    public void cleanup() {
       super.cleanup();
    
       theExecutor.shutdown();
       publisherFuture.cancel(true);
       // close your connection
     }
    
    Run Code Online (Sandbox Code Playgroud)
  5. 在执行方法中收集元组

      @Override
      public void execute(final Tuple _tuple) {
               insertQueue.add(_tuple);
    
      }
    
    Run Code Online (Sandbox Code Playgroud)