Storm> Howto将Java回调集成到Spout中

Nut*_*tim 5 streaming clojure message-queue apache-storm

我正在尝试将Storm(请参阅此处)整合到我的项目中.我理解了拓扑,喷口和螺栓的概念.但是现在,我正在试图找出一些事情的实际实现.

A)我有一个带有Java和Clojure的多语言环境.我的Java代码是一个回调类,其中包含触发流数据的方法.推送到这些方法的事件数据是我想用作喷口的.

所以第一个问题是如何将进入这些方法的数据连接到喷口?我正在尝试i)传递backtype.storm.topology.IRichSpout,然后ii)backtype.storm.spout.SpoutOutputCollector(请参阅此处)传递给该spoutopen函数(请参阅此处).但我无法看到实际传递任何类型的地图或列表的方法.

B)我项目的其余部分都是Clojure.通过这些方法将会有大量数据.每个事件的ID都在1到100之间.在Clojure中,我想将来自spout的数据拆分为不同的执行线程.我认为那些将是螺栓.

如何设置Clojure螺栓从喷口中获取事件数据,然后根据传入事件的ID中断线程?

提前谢谢蒂姆

[编辑1]

我实际上已经解决了这个问题.我结束了1)实现我自己的IRichSpout.然后,我2)连接,使得嘴内部的元组将输入数据流在我的Java回调类.我不确定这是不是惯用的.但它编译并运行没有错误.但是,3)我没有看到传入的流数据(肯定存在),来自printstuff bolt.

为了确保事件数据得到传播,我是否需要在spout或bolt实现或拓扑定义中做些具体的事情?谢谢.


      ;; tie Java callbacks to a Spout that I created
      (.setSpout java-callback ibspout)

      (storm/defbolt printstuff ["word"] [tuple collector]
        (println (str "printstuff --> tuple["tuple"] > collector["collector"]"))
      )
      (storm/topology
       { "1" (storm/spout-spec ibspout)
       }
       { "3" (storm/bolt-spec  { "1" :shuffle }
                               printstuff
             )
       })

[编辑2]

根据SO成员Ankur的建议,我正在重新调整我的拓扑结构.在我创建了Java回调之后,我将它的元组传递给下面的IBSpout,使用(.setTuple ibspout (.getTuple java-callback)).我没有传递整个Java回调对象,因为我收到NotSerializable错误.一切都编译并运行没有错误.但同样,我的印刷品没有数据.嗯.


    public class IBSpout implements IRichSpout {

      /**
       * Storm spout stuff
       */
      private SpoutOutputCollector _collector;

      private List _tuple = new ArrayList();
      public void setTuple(List tuple) { _tuple = tuple; }
      public List getTuple() { return _tuple; }

      /**
       * Storm ISpout interface functions
       */
      public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
      }
      public void close() {}
      public void activate() {}
      public void deactivate() {}
      public void nextTuple() {
        _collector.emit(_tuple);
      }
      public void ack(Object msgId) {}
      public void fail(Object msgId) {}


      public void declareOutputFields(OutputFieldsDeclarer declarer) {}
      public java.util.Map  getComponentConfiguration() { return new HashMap(); }

    }

G G*_*III 0

对B部分的回答:

在我看来,简单的答案听起来就像您正在寻找一个字段分组,这样您就可以控制在执行期间按 ID 将哪些工作分组在一起。

也就是说,我不确定这是否真的是一个完整的答案,因为我不知道你为什么要这样做。如果你只是想要平衡工作量,随机分组是更好的选择。