使用Kafka Spout进行卡夫卡风暴整合

Yav*_*var 5 java apache-kafka apache-storm

我正在使用KafkaSpout.请在下面找到测试程序.

我正在使用Storm 0.8.1.Storm 0.8.2中有Multischeme类.我将使用它.我只想通过实例化StringScheme()类来了解早期版本是如何工作的?我在哪里可以下载早期版本的Kafka Spout?但我怀疑这是一个正确的选择,而不是在Storm 0.8.2上工作.??? (困惑)

当我在风暴集群上运行代码(如下所示)时(即当我推动我的拓扑结构时),我得到以下错误(当Scheme部分被注释时会发生这种情况,否则当然我会得到编译器错误,因为类不存在于0.8中0.1):

java.lang.NoClassDefFoundError: backtype/storm/spout/MultiScheme
        at storm.kafka.TestTopology.main(TestTopology.java:37)
Caused by: java.lang.ClassNotFoundException: backtype.storm.spout.MultiScheme
Run Code Online (Sandbox Code Playgroud)

在下面给出的代码中,您可以找到spoutConfig.scheme = new StringScheme(); 部分评论.如果我不评论那条很自然的行,因为那里没有构造函数,我收到了编译错误.此外,当我实例化MultiScheme时,我得到错误,因为我在0.8.1中没有该类.

public class TestTopology {
    public static class PrinterBolt extends BaseBasicBolt {
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }

        public void execute(Tuple tuple, BasicOutputCollector collector) {
            System.out.println(tuple.toString());
        }
    }

    public static void main(String [] args) throws Exception {
        List<HostPort> hosts = new ArrayList<HostPort>();
        hosts.add(new HostPort("127.0.0.1",9092));
        LocalCluster cluster = new LocalCluster();
        TopologyBuilder builder = new TopologyBuilder();
        SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.StaticHosts(hosts, 1), "test", "/zkRootStorm", "STORM-ID");
        spoutConfig.zkServers=ImmutableList.of("localhost");
        spoutConfig.zkPort=2181;
        //spoutConfig.scheme=new StringScheme();
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        builder.setSpout("spout",new KafkaSpout(spoutConfig));
        builder.setBolt("printer", new PrinterBolt())
                .shuffleGrouping("spout");
        Config config = new Config();

        cluster.submitTopology("kafka-test", config, builder.createTopology());

        Thread.sleep(600000);
    }
Run Code Online (Sandbox Code Playgroud)

Chr*_*ord 9

我有同样的问题.最后解决了它,我把完整的运行示例放在github上.

欢迎您在这里查看> https://github.com/buildlackey/cep

(单击storm + kafka目录以获取应该启动并运行的示例程序).

  • 考虑在您的答案中添加一两句话来描述您所做的事情,以便您的答案相关而不依赖于Git存储库处于活动状态. (8认同)
  • @neontapir意味着添加一些信息作为该代码如何解决问题,即,所以你对解决方案有一些了解,而无需转到示例代码. (2认同)

小智 5

我们遇到了类似的问题.

我们的方案:

  1. 打开pom.xml

  2. 将范围从提供更改为 <scope>compile</scope>

如果您想了解有关依赖范围的更多信息,请检查maven docu: Maven docu - 依赖范围