在风暴螺栓中缓存

big*_*big 5 java caching apache-storm

我试图在暴风雨中缓存一些数据,但不确定这是否是正确的方法.在下面的类中,员工ID和员工姓名被缓存到哈希映射中.为此,已对Employee表进行数据库调用以选择所有员工并在prepare方法中填充哈希映射(这是初始化映射的正确位置吗?).

经过一些日志记录后(运行风暴拓扑),拓扑结构正在进行多个数据库连接并多次初始化映射.当然,我想避免这种情况,这就是为什么我要缓存结果,以便它不会每次都进入数据库.请帮忙?

public class TestBolt extends BaseRichBolt {
    private static final long serialVersionUID = 2946379346389650348L;
    private OutputCollector collector;
    private Map<String, String> employeeIdToNameMap;
    private static final Logger LOG = Logger.getLogger(TestBolt.class);

    @Override
    public void execute(Tuple tuple) {    
        String employeeId = tuple.getStringByField("employeeId");
        String employeeName = employeeIdToNameMap.get(employeeId);

        collector.emit(tuple, new Values(employeeId, employeeName));
        collector.ack(tuple);
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // TODO Auto-generated method stub
        this.collector = collector;
        try {
            employeeIdToNameMap = createEmployeIdToNameMap();
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(/*some fields*/));

    }

    private Map<String, String> createEmployeIdToNameMap() throws SQLException {
        final Map<String, String> employeeIdToNameMap = new HashMap<>();
        final DatabaseManager dbm = new PostgresManager();
        final String query = "select id, name from employee;";
        final Connection conn = dbm.createDefaultConnection();
        final ResultSet result = dbm.executeSelectQuery(conn, query);
        while(result.next()) {
            String employeId = result.getString("id");
            String name = result.getString("name");
            employeeIdToNameMap.put(employeId, name);
        }
        conn.close();
        return employeeIdToNameMap;
    }       
}
Run Code Online (Sandbox Code Playgroud)

解决方案 我创建了同步地图,它对我来说很好

private static Map<String, String> employeeIdToNameMap = Collections
            .synchronizedMap(new HashMap<String, String>());
Run Code Online (Sandbox Code Playgroud)

hob*_*lin 1

由于您有多个bolt任务,因此您可以将employeeIdToNameMap标记为静态和易失性。像这样初始化地图 -

try {
 synchronized(TestBolt.class) {
    if (null == employeeIdToNameMap) {
    employeeIdToNameMap = createEmployeIdToNameMap();
    }
  }  
} catch (SQLException e) {
 ...
}
Run Code Online (Sandbox Code Playgroud)