miu*_*ser · tags: hbase, apache-storm
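I have a Storm topology running on Hadoop, configured in pseudo-distributed mode. The topology contains a bolt that must write data to HBase. My first approach, for testing purposes, was to create (and close) a connection and write the data inside my bolt's execute method. But it looks like my local machine doesn't have enough resources to handle all the requests going to HBase. After about 30 successfully processed requests, I see the following in the Storm worker log: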
```
o.a.z.ClientCnxn [INFO] Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
o.a.z.ClientCnxn [INFO] Socket connection established to localhost/127.0.0.1:2181, initiating session
o.a.z.ClientCnxn [INFO] Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket connection and attempting reconnect
o.a.h.h.z.RecoverableZooKeeper [WARN] Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/hbaseid
```
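My idea was to reduce the number of HBase connections by creating a single connection per bolt instance: open the connection in the prepare method and close it in cleanup. But according to the documentation, cleanup is not guaranteed to be called in distributed mode.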
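A minimal, dependency-free sketch of that per-instance idea, for illustration only. `StoreConnection`, `WriterBolt` and `CountingConnection` are hypothetical names: the `prepare`/`execute`/`cleanup` methods mirror Storm's bolt lifecycle but drop Storm's real parameters, and the counting connection stands in for a real HBase connection just to show that one connection serves every tuple.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a real client connection (e.g. an HBase connection).
interface StoreConnection extends AutoCloseable {
    void write(String row);

    @Override
    void close();
}

// Simplified bolt: method names mirror Storm's lifecycle, signatures are hypothetical.
class WriterBolt {
    private final Supplier<StoreConnection> opener;
    private StoreConnection connection;

    WriterBolt(Supplier<StoreConnection> opener) {
        this.opener = opener;
    }

    void prepare() {
        connection = opener.get();  // opened once per bolt instance
    }

    void execute(String row) {
        connection.write(row);      // every tuple reuses the same connection
    }

    void cleanup() {
        if (connection != null) {   // in distributed mode this may never be called
            connection.close();
            connection = null;
        }
    }
}

// Fake connection that only counts how many times a connection was opened.
class CountingConnection implements StoreConnection {
    static int opened = 0;
    final List<String> rows = new ArrayList<>();

    CountingConnection() {
        opened++;
    }

    @Override
    public void write(String row) {
        rows.add(row);
    }

    @Override
    public void close() {
    }
}
```

Calling `prepare()`, then `execute` 100 times, then `cleanup()` opens exactly one connection instead of 100. The sketch does nothing about the case where the worker is killed before `cleanup()` runs.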
After that, I found Storm's HBase framework, storm-hbase. Unfortunately there is almost no information about it, only the README in its GitHub repo.
In addition, I need to be able to delete cells from an HBase table, but I found nothing about that in the storm-hbase docs.
Thanks in advance!
You could, for example, use a "publisher" thread.
That is: a separate class, running as a thread, that performs the requests to HBase/MySQL/Elasticsearch/HDFS/etc. for you. For performance reasons it should send them in batches.
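To put a number on the batching argument, here is a small sketch (the class and method names are hypothetical) that empties a `BlockingQueue` with `drainTo(collection, maxElements)`, which removes at most `maxElements` items per call. With 250 queued items and batches of at most 100, the store receives 3 bulk requests (100, 100 and 50 items) instead of 250 single writes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

class BatchDrainDemo {
    // Splits everything currently in the queue into batches of at most maxBatch items.
    // Each returned batch corresponds to one bulk request to the store.
    static <T> List<List<T>> drainInBatches(BlockingQueue<T> queue, int maxBatch) {
        List<List<T>> batches = new ArrayList<>();
        while (true) {
            List<T> batch = new ArrayList<>(maxBatch);
            // drainTo removes at most maxBatch items and never blocks
            if (queue.drainTo(batch, maxBatch) == 0) {
                break;  // queue is empty
            }
            batches.add(batch);
        }
        return batches;
    }
}
```

Because `drainTo` never blocks, this only takes what is already queued; deciding when to wait for more items is the publisher thread's job.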
Keep a shared queue for the concurrent operations, plus an executor service:
```java
private transient BlockingQueue<Tuple> insertQueue;
private transient ExecutorService theExecutor;
private transient Future<?> publisherFuture;
```
Add a thread class that inserts the documents for you:
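```java
private class Publisher implements Runnable {
    private static final int BATCH_SIZE = 100;     // tuples per batch
    private static final long MAX_WAIT_MS = 5000;  // flush at least this often

    @Override
    public void run() {
        long sendBatchTs = System.currentTimeMillis();
        // Stop when cleanup() cancels the future (cancel(true) interrupts this thread).
        while (!Thread.currentThread().isInterrupted()) {
            if (insertQueue.size() >= BATCH_SIZE) {
                List<Tuple> batchQueue = new ArrayList<>(BATCH_SIZE);
                insertQueue.drainTo(batchQueue, BATCH_SIZE);
                // write code to insert the 100 documents, then ack (or fail) their tuples
                sendBatchTs = System.currentTimeMillis();
            } else if (System.currentTimeMillis() - sendBatchTs > MAX_WAIT_MS) {
                // flush a partial batch so queued tuples do not hit the tuple timeout
                List<Tuple> batchQueue = new ArrayList<>(BATCH_SIZE);
                insertQueue.drainTo(batchQueue, BATCH_SIZE);
                // write code to insert the documents in batchQueue, then ack (or fail) them
                sendBatchTs = System.currentTimeMillis();
            } else {
                try {
                    Thread.sleep(10);  // avoid spinning at 100% CPU while the queue fills
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();  // keep the flag so the loop exits
                }
            }
        }
    }
}
```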
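The loop above decides when to flush by checking `size()` over and over. A sketch of an alternative, assuming only the JDK: block on `BlockingQueue.poll(timeout, unit)` until an item arrives or the flush deadline passes, then top the batch up with `drainTo`. `PollingPublisher` and `BatchSink` are hypothetical names; `BatchSink.write` stands in for the HBase write plus the acks, and the answer's 100-item / 5 s limits become constructor parameters. When interrupted, it writes whatever is still queued, so `cancel(true)` flushes the buffer instead of discarding it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sink: in the bolt, this is where one bulk write (and the acks) would go.
interface BatchSink<T> {
    void write(List<T> batch);
}

// Publisher that blocks on poll() instead of repeatedly checking size().
class PollingPublisher<T> implements Runnable {
    private final BlockingQueue<T> queue;
    private final BatchSink<T> sink;
    private final int maxBatch;    // 100 in the answer
    private final long maxWaitMs;  // 5000 in the answer

    PollingPublisher(BlockingQueue<T> queue, BatchSink<T> sink, int maxBatch, long maxWaitMs) {
        this.queue = queue;
        this.sink = sink;
        this.maxBatch = maxBatch;
        this.maxWaitMs = maxWaitMs;
    }

    @Override
    public void run() {
        List<T> batch = new ArrayList<>(maxBatch);
        long deadline = System.currentTimeMillis() + maxWaitMs;
        try {
            // Thread.interrupted() clears the flag, so the final flush below is not interrupted.
            while (!Thread.interrupted()) {
                long remaining = deadline - System.currentTimeMillis();
                // Wait for the next item, but never past the flush deadline.
                T item = remaining > 0 ? queue.poll(remaining, TimeUnit.MILLISECONDS) : null;
                if (item != null) {
                    batch.add(item);
                    queue.drainTo(batch, maxBatch - batch.size());  // top up without blocking
                }
                if (batch.size() >= maxBatch || System.currentTimeMillis() >= deadline) {
                    flush(batch);
                    deadline = System.currentTimeMillis() + maxWaitMs;
                }
            }
        } catch (InterruptedException e) {
            // poll() was interrupted (e.g. by Future.cancel(true)); the flag is already cleared.
        }
        do {  // send whatever is still queued, at most maxBatch items per write
            queue.drainTo(batch, maxBatch - batch.size());
            flush(batch);
        } while (!queue.isEmpty());
        Thread.currentThread().interrupt();  // restore the interrupt status before returning
    }

    private void flush(List<T> batch) {
        if (!batch.isEmpty()) {
            sink.write(new ArrayList<>(batch));  // give the sink its own copy
            batch.clear();
        }
    }
}
```

Tuples still buffered when a worker is killed without `cleanup` are lost from this queue either way; if they were never acked, a reliable spout can replay them after the tuple timeout.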
In the prepare method, create the queue and start the publisher thread:
```java
@Override
public void prepare(final Map _conf, final TopologyContext _context, final OutputCollector _collector) {
    // open your connection
    insertQueue = new LinkedBlockingQueue<>();
    theExecutor = Executors.newSingleThreadExecutor();
    publisherFuture = theExecutor.submit(new Publisher());
}
```
Close the connection in cleanup:
```java
@Override
public void cleanup() {
    super.cleanup();
    theExecutor.shutdown();
    publisherFuture.cancel(true);  // interrupts the publisher thread
    // close your connection
}
```
In the execute method, just collect the tuples:
```java
@Override
public void execute(final Tuple _tuple) {
    insertQueue.add(_tuple);
}
```