Storm和Spring 4整合

lia*_*iam 6 java spring apache-storm

我有一个原型风暴应用程序,它读取STOMP流并将输出存储在HBase上.它有效,但不是很灵活,我试图以与我们的其他应用程序更一致的方式设置它,但没有太多运气搞清楚当前使用Storm的方式.我们使用spring-jms类,但不是以标准spring方式使用它们,而是在运行时创建它们,并手动设置依赖项.

这个项目:https://github.com/granthenke/storm-spring看起来很有前景,但由于风暴罐被带入apache孵化器并重新包装,因此几年内没有被触及并且无法正常构建.

有没有我想念的东西,或者将这些东西整合在一起是不值得的?

Aje*_*esh 6

@zenbeni回答了这个问题,但我想告诉你我的实施情况,很难将喷口/螺栓作为弹簧豆.但是要在你的喷口/螺栓内使用其他弹簧豆,你可以声明一个全局变量,并在你的执行方法中检查变量是否为空.如果它为null,则必须从应用程序上下文中获取bean.创建一个类,其中包含一个初始化bean的方法(如果bean尚未初始化).查看ApplicationContextAware接口以获取更多信息(Spring bean重用).

示例代码:

螺栓类:

public class Class1 implements IRichBolt{
    Class2 class2Object;

    public void prepare() {
        if (class2Object== null) {          
            class2Object= (Class2) Util
                .initializeContext("class2");
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

如果尚未初始化Bean,则用于初始化Bean的Util类:

public class Util{
    public static Object initializeContext(String beanName) {
        Object bean = null;
        try {
            synchronized (Util.class) {
                if (ApplicationContextUtil.getAppContext() == null) {
                    ApplicationContext appContext = new ClassPathXmlApplicationContext("beans.xml");
                    bean = ApplicationContextUtil.getAppContext().getBean(
                        beanName);
                } else {
                    bean = ApplicationContextUtil.getAppContext().getBean(
                        beanName);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        return bean;
    }
}
Run Code Online (Sandbox Code Playgroud)

应用程序上下文更改的侦听器:

@Component
public class ApplicationContextUtil implements ApplicationContextAware {

    private static ApplicationContext appContext;

    public void setApplicationContext(ApplicationContext applicationContext)
        throws BeansException {
        appContext = applicationContext;
    }

    public static ApplicationContext getAppContext() {
        return appContext;
    }
}
Run Code Online (Sandbox Code Playgroud)

注意:每个Worker将初始化spring上下文,因为它在不同的JVM中运行.

UPDATE

如果你想使用一个你之前分配了一些值的spring bean类,试试这个,

注意:将当前类传递给Bolt的构造函数

类(拓扑创建类)已包含值:

public class StormTopologyClass implements ITopologyBuilder, Serializable {
    public Map<String, String> attributes = new HashMap<String, String>();

    TopologyBuilder builder=new TopologyBuilder();
    builder.setBolt("Class1",new Class1(this));
    builder.createTopology();
}
Run Code Online (Sandbox Code Playgroud)

使用单个参数构造函数进行螺栓连接:

public class Class1 implements IRichBolt{
    StormTopologyClass topology;

    public Class1 (StormTopologyClass topology) {
        this.topology = topology;
    }
}
Run Code Online (Sandbox Code Playgroud)

现在你可以在bolt类中使用属​​性变量&它的值.


zen*_*eni 5

事实上,风暴春天似乎是你正在寻找的,但它没有更新,并有限制(例如,不能在螺栓/喷口上定义任务等).也许你应该推动自己的整合?

不要忘记你的目标:一个拥有许多工人的集群.当您在另一个工作人员上使用storm api(例如,重新平衡)部署拓扑时,spring的行为如何?是否意味着在Storm部署目标螺栓/喷口并定义执行程序之前,必须在启动时在工作者JVM上实现新的Spring上下文?

恕我直言,如果你只在Spring配置中定义Storm组件它应该工作(拓扑的启动配置然后风暴只管理对象)但是如果你依靠Spring来管理其他组件(对于spring-jms来说似乎如此),那么它可以例如,在拓扑重新平衡上变得混乱(每个worker/jvm单身?还是整个拓扑?).

由你来决定是否值得这么麻烦,我对Spring配置的关注是你很容易忘记风暴拓扑(它似乎是一个JVM,但可以更多).我个人每个类加载器定义自己的单例(例如静态final或者如果我需要延迟instanciation,则使用双重检查锁定),因为它不会隐藏(中高)复杂性.