如何将两个不同Spout的输出发送到同一个Bolt?

Rit*_*nha 4 java apache-kafka apache-storm

我有两个Kafka Spout,其值我想发送到同一个螺栓.

可能吗 ?

Mat*_*Sax 11

对的,这是可能的:

TopologyBuilder b = new TopologyBuilder();
b.setSpout("topic_1", new KafkaSpout(...));
b.setSpout("topic_2", new KafkaSpout(...));
b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1").shuffleGrouping("topic_2");
Run Code Online (Sandbox Code Playgroud)

您也可以使用任何其他分组.

更新:

为了区分消费者螺栓中的元组(即topic_1或topic_2),有两种可能性:

1)您可以使用运营商ID(由@ user-4870385建议):

if(input.getSourceComponent().equalsIgnoreCase("topic_1")) {
    //do something
} else {
    //do something
}
Run Code Online (Sandbox Code Playgroud)

2)您可以使用流名称(由@zenbeni建议).对于这种情况,两个spouts都需要声明命名流,并且bolt需要通过流名称连接到spouts:

public class MyKafkaSpout extends KafkaSpout {
  final String streamName;

  public MyKafkaSpout(String stream) {
    this.streamName = stream;
  }

  // other stuff omitted

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // compare KafkaSpout.declareOutputFields(...)
    declarer.declare(streamName, _spoutConfig.scheme.getOutputFields());
  }
}
Run Code Online (Sandbox Code Playgroud)

构建拓扑,现在需要使用流名称:

TopologyBuilder b = new TopologyBuilder();
b.setSpout("topic_1", new MyKafkaSpout("stream_t1"));
b.setSpout("topic_2", new MyKafkaSpout("stream_t2"));
b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1", "stream_t1").shuffleGrouping("topic_2", "stream_t2");
Run Code Online (Sandbox Code Playgroud)

MyBolt流名称现在可以用来区分输入的元组:

// in my MyBolt.execute():
if(input.getSourceStreamId().equals("Topic1")) {
  // do something
} else {
  // do something
}
Run Code Online (Sandbox Code Playgroud)

讨论:

虽然使用流名称的第二种方法更自然(根据@zenbeni),第一种方法更灵活(IHMO).流名称直接由spout/bolt声明(即,在写入spout/bolt代码时); 与此相反,当拓扑放在一起(即,在喷口/螺栓时操作者ID分配使用).

让我们假设我们得到三个螺栓作为类文件(没有源代码).前两个应该用作生成器,并且都声明具有相同名称的输出流.如果第三个消费者通过流来区分输入元组,则这将不起作用.即使两个给定的生成器螺栓都声明了不同的输出流名称,预期的输入流名称也可能在消费者螺栓中进行硬编码,并且可能不匹配.因此,它也不起作用.但是,如果使用者bolt使用组件名称(即使它们是硬编码的)来区分传入的元组,则可以正确分配预期的组件ID.

当然,可以从给定的类继承(如果没有声明final和覆盖declareOutputFields(...),以便分配自己的流名称.但是,这是更多的额外工作要做.

  • `if(input.getSourceComponent().equalsIgnoreCase(topic_1)){// do something} else {// do something}`在bolt中添加它以从元组中分离源组件 (2认同)