如何在Storm Trident拓扑中关闭由IBackingMap实现打开的数据库连接?

bop*_*cat 10 trident apache-storm

我正在为我的Trident拓扑实现一个IBackingMap,以便将元组存储到ElasticSearch(我知道GitHub上已存在多个Trident/ElasticSearch集成实现,但我决定实现一个更适合我的任务的自定义实现).

所以我的实现是一个经典的工厂:

public class ElasticSearchBackingMap implements IBackingMap<OpaqueValue<BatchAggregationResult>> {

    // omitting here some other cool stuff...
    private final Client client;

    public static StateFactory getFactoryFor(final String host, final int port, final String clusterName) {

        return new StateFactory() {

            @Override
            public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {

                ElasticSearchBackingMap esbm = new ElasticSearchBackingMap(host, port, clusterName);
                CachedMap cm = new CachedMap(esbm, LOCAL_CACHE_SIZE);
                MapState ms = OpaqueMap.build(cm);
                return new SnapshottableMap(ms, new Values(GLOBAL_KEY));
            }
        };
    }

    public ElasticSearchBackingMap(String host, int port, String clusterName) {

        Settings settings = ImmutableSettings.settingsBuilder()
                .put("cluster.name", clusterName).build();

        // TODO add a possibility to close the client
        client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(host, port));
    }

    // the actual implementation is left out
}
Run Code Online (Sandbox Code Playgroud)

您会看到它将主机/端口/集群名称作为输入参数,并创建一个ElasticSearch客户端作为类的成员,但从不关闭客户端.

然后以非常熟悉的方式从拓扑中使用它:

tridentTopology.newStream("spout", spout)
            // ...some processing steps here...
            .groupBy(aggregationFields)
            .persistentAggregate(
                    ElasticSearchBackingMap.getFactoryFor(
                            ElasticSearchConfig.ES_HOST,
                            ElasticSearchConfig.ES_PORT,
                            ElasticSearchConfig.ES_CLUSTER_NAME
                    ),
                    new Fields(FieldNames.OUTCOME),
                    new BatchAggregator(),
                    new Fields(FieldNames.AGGREGATED));
Run Code Online (Sandbox Code Playgroud)

此拓扑包含在一些公共静态void main中,打包在jar中并发送到Storm以执行.

问题是,我应该担心关闭ElasticSearch连接还是Storm自己的业务?如果它不是由Storm完成的,拓扑生命周期中的方式和时间我应该这样做吗?

提前致谢!

bop*_*cat 3

好吧,回答我自己的问题。

首先,再次感谢 @dedek 的建议并在 Storm 的 Jira 中恢复票证。

最后,由于没有官方方法可以做到这一点,我决定使用 Trident 过滤器的 cleanup() 方法。到目前为止,我已经验证了以下内容(针对 Storm v. 0.9.4):

使用本地集群

  • cleanup() 在集群关闭时被调用
  • cleanup() 在杀死拓扑时不会被调用,这不应该是一场悲剧,很可能人们不会使用 LocalCluster 进行真正的部署

拥有真实的集群

  • 当拓扑被终止以及工作线程使用pkill -TERM -ustorm -f 'backtype.storm.daemon.worker'停止时,它会被调用
  • 如果工作人员被kill -9杀死,或者当工作人员崩溃时,或者 - 可悲的是 - 当工作人员因异常而死亡时,它不会被调用

总的来说,这或多或少地保证了 cleanup() 的调用,前提是您要小心异常处理(无论如何,我倾向于将“thundercatches”添加到我的每个 Trident 原语中)。

我的代码:

public class CloseFilter implements Filter {

    private static final Logger LOG = LoggerFactory.getLogger(CloseFilter.class);

    private final Closeable[] closeables;

    public CloseFilter(Closeable... closeables) {
        this.closeables = closeables;
    }

    @Override
    public boolean isKeep(TridentTuple tuple) {
        return true;
    }

    @Override
    public void prepare(Map conf, TridentOperationContext context) {

    }

    @Override
    public void cleanup() {
        for (Closeable c : closeables) {
            try {
                c.close();
            } catch (Exception e) {
                LOG.warn("Failed to close an instance of {}", c.getClass(), e);
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

不过,如果有一天用于关闭连接的挂钩成为 API 的一部分,那就太好了。