标签: apache-storm

Apache Storm:无法从种子主机中找到领导者的灵气

我按照本教程安装了Apache Storm 1.0:http://opensourceeducation.net/how-to-install-and-configure-apache-strom/.但我注意到两个问题:

- >我无法从互联网访问风暴ui.

- >如果我尝试访问localhost:8080,我收到错误:

org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [localhost]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?
at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:90)
at org.apache.storm.ui.core$cluster_configuration.invoke(core.clj:343)
at org.apache.storm.ui.core$fn__12106.invoke(core.clj:929)
at org.apache.storm.shade.compojure.core$make_route$fn__2467.invoke(core.clj:93)
at org.apache.storm.shade.compojure.core$if_route$fn__2455.invoke(core.clj:39)
at org.apache.storm.shade.compojure.core$if_method$fn__2448.invoke(core.clj:24)
at org.apache.storm.shade.compojure.core$routing$fn__2473.invoke(core.clj:106)
at clojure.core$some.invoke(core.clj:2570)
at org.apache.storm.shade.compojure.core$routing.doInvoke(core.clj:106)
at clojure.lang.RestFn.applyTo(RestFn.java:139)
at clojure.core$apply.invoke(core.clj:632)
at org.apache.storm.shade.compojure.core$routes$fn__2477.invoke(core.clj:111)
at org.apache.storm.shade.ring.middleware.json$wrap_json_params$fn__11576.invoke(json.clj:56)
at org.apache.storm.shade.ring.middleware.multipart_params$wrap_multipart_params$fn__3543.invoke(multipart_params.clj:103)
at org.apache.storm.shade.ring.middleware.reload$wrap_reload$fn__4286.invoke(reload.clj:22)
at org.apache.storm.ui.helpers$requests_middleware$fn__3770.invoke(helpers.clj:46)
at org.apache.storm.ui.core$catch_errors$fn__12301.invoke(core.clj:1230)
at org.apache.storm.shade.ring.middleware.keyword_params$wrap_keyword_params$fn__3474.invoke(keyword_params.clj:27)
at org.apache.storm.shade.ring.middleware.nested_params$wrap_nested_params$fn__3514.invoke(nested_params.clj:65)
at org.apache.storm.shade.ring.middleware.params$wrap_params$fn__3445.invoke(params.clj:55)
at org.apache.storm.shade.ring.middleware.multipart_params$wrap_multipart_params$fn__3543.invoke(multipart_params.clj:103)
at org.apache.storm.shade.ring.middleware.flash$wrap_flash$fn__3729.invoke(flash.clj:14)
at org.apache.storm.shade.ring.middleware.session$wrap_session$fn__3717.invoke(session.clj:43)
at …
Run Code Online (Sandbox Code Playgroud)

nimbus apache-storm apache-zookeeper

11
推荐指数
1
解决办法
1万
查看次数

比较内存中的集群计算系统

我正在研究Spark(伯克利)集群计算系统.在我的研究中,我了解了其他一些内存系统,如Redis,Memcachedb等.如果有人能给我SPARK和REDIS(以及MEMCACHEDB)之间的比较,那将会很棒.在什么情况下Spark比其他内存系统有优势?

memcachedb redis apache-spark apache-storm

10
推荐指数
1
解决办法
5367
查看次数

如何在使用storm时将拓扑上下文中的对象访问到bolt中?

我们需要在创建拓扑时传递一个对象,以便bolt可以访问该对象并根据该对象进行进一步处理.是否有可能通过对象TopplogyContext,如果是,如何?或者在提交拓扑之前是否还有其他方法可以传递对象,然后提交,以便bolt可以对其进行句柄/控制?

我们需要通过上下文传递对象,以便所有螺栓都可以访问它,并且不需要在该拓扑的所有螺栓中强制构造函数的实现.那么,想知道是否存在任何API来做同样的事情?

apache-storm

10
推荐指数
1
解决办法
4942
查看次数

Storm-Kafka多个鲸鱼喷水,如何分担负荷?

我试图在多个喷口之间分享任务.我有一种情况,我从外部源一次获得一个元组/消息,我想要有多个spout实例,主要目的是分担负载并提高性能效率.

我可以用一个Spout本身做同样的事情,但我想分享多个喷口的负载.我无法获得分散负载的逻辑.由于消息的偏移在特定喷口完成消耗部件之前将不会被知道(即,基于设置的缓冲器大小).

任何人都可以对如何计算逻辑/算法有一些启发吗?

预付谢谢你的时间.


响应答案更新:
现在在Kafka上使用多分区(即5)
以下是使用的代码:
builder.setSpout("spout", new KafkaSpout(cfg), 5);

通过泛滥800 MB每个分区上的数据进行测试,并~22 sec完成读取.

再次,使用parallelism_hint = 1
即代码builder.setSpout("spout", new KafkaSpout(cfg), 1);

现在需要更多~23 sec!为什么?

根据Storm Docs的 setSpout()声明如下:

public SpoutDeclarer setSpout(java.lang.String id,
                              IRichSpout spout,
                              java.lang.Number parallelism_hint)
Run Code Online (Sandbox Code Playgroud)

其中,
parallelism_hint - 是执行此喷口应分配的任务数.每个任务都将在群集周围某个进程中的某个线程上运行.

java load-balancing apache-kafka apache-storm

10
推荐指数
1
解决办法
1万
查看次数

如何在Storm中编写关于Storm和Thrift使用的c ++ spout/bolt

这里开始:Storm从一开始就被设计为与多种语言兼容.Nimbus是一种Thrift服务,拓扑被定义为Thrift结构.Thrift的使用允许使用任何语言的Storm.

我看到在java中创建的拓扑通过将拓扑(spouts,bolt,ComponentCommon)序列化为Thrift数据类型然后部署在Nimbus上来部署.在Java中,使用其方法和数据很容易地序列化对象.所以在另一方面,Nimbus只需要创建对象并调用它们.(我可能在这里缺少细节,但我希望我能正确理解这一点)

但我想知道如何用C++编写拓扑并以相同的方式部署它.thrift是否有助于序列化基于c ++的拓扑,而Nimbus以与Java相同的方式部署/执行拓扑?

在这方面我看到链接link1 link2,唯一的解决方案似乎是使用Shelbolt.它调用进程并通过标准i/o与之通信.

为了使用Thrift方式,我们是否还需要在C++中重写storm核心?另外,为什么在仅支持JVM语言时使用Thrift?对于像python/c ++这样的语言,似乎根本没有使用Thrift.

c++ thrift apache-storm

10
推荐指数
1
解决办法
3823
查看次数

如何以编程方式杀死风暴拓扑?

我正在使用java类向Storm集群提交拓扑,我还计划使用java类来终止拓扑.但是根据风暴文档,以下命令用于终止拓扑,并且没有Java方法(这有正当理由)

storm kill {stormname}
Run Code Online (Sandbox Code Playgroud)

那么从java类调用shell脚本来终止拓扑是否可以呢?什么是杀死拓扑的其他方法?

另外,如何获得在Storm集群中运行拓扑的状态?

java apache-storm

10
推荐指数
1
解决办法
6786
查看次数

Storm Spout没有得到Ack

我已经开始使用storm,所以我使用本教程创建了简单的拓扑

当我运行我的拓扑LocalCluster并且一切看起来都很好时,我的问题是我没有得到元组的确认,这意味着我的鲸鱼喷水ack器从未被调用过.

我的代码在下面 - 你知道为什么ack不被调用?

所以我的拓扑看起来像这样

public StormTopology build() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(HelloWorldSpout.class.getSimpleName(), 
             helloWorldSpout, spoutParallelism);

        HelloWorldBolt bolt = new  HelloWorldBolt();

        builder.setBolt(HelloWorldBolt.class.getSimpleName(), 
                   bolt, boltParallelism)
              .shuffleGrouping(HelloWorldSpout.class.getSimpleName());
}
Run Code Online (Sandbox Code Playgroud)

我的喷口看起来像这样

public class HelloWorldSpout  extends BaseRichSpout implements ISpout {
    private SpoutOutputCollector collector;

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("int"));
    }

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;
    }

    private static Boolean flag = false;
    public void nextTuple() {
        Utils.sleep(5000); …
Run Code Online (Sandbox Code Playgroud)

java apache-storm

10
推荐指数
1
解决办法
6449
查看次数

如何在Storm Trident拓扑中关闭由IBackingMap实现打开的数据库连接?

我正在为我的Trident拓扑实现一个IBackingMap,以便将元组存储到ElasticSearch(我知道GitHub上已存在多个Trident/ElasticSearch集成实现,但我决定实现一个更适合我的任务的自定义实现).

所以我的实现是一个经典的工厂:

public class ElasticSearchBackingMap implements IBackingMap<OpaqueValue<BatchAggregationResult>> {

    // omitting here some other cool stuff...
    private final Client client;

    public static StateFactory getFactoryFor(final String host, final int port, final String clusterName) {

        return new StateFactory() {

            @Override
            public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {

                ElasticSearchBackingMap esbm = new ElasticSearchBackingMap(host, port, clusterName);
                CachedMap cm = new CachedMap(esbm, LOCAL_CACHE_SIZE);
                MapState ms = OpaqueMap.build(cm);
                return new SnapshottableMap(ms, new Values(GLOBAL_KEY));
            }
        };
    }

    public ElasticSearchBackingMap(String host, int port, String clusterName) …
Run Code Online (Sandbox Code Playgroud)

trident apache-storm

10
推荐指数
1
解决办法
343
查看次数

KafkaSpout工作实例

我最近熟悉了Apache Kafka,并有一个生产者 - 消费者的工作示例.

我的下一步是将Kafka与Spout和Bolt集成,我很难在本地工作的示例(他们大多是旧的).

我得到以下示例工作storm-b​​ook/examples-ch02-getting_started,它从本地文本文件中读取数据.

同样的回购有一个风暴书/例子-ch04-spouts kafka- spout的例子 但是我无法让它工作.

我尝试了以下示例以及cep.kafka,但得到以下错误 -

5034 [Thread-11-words] INFO  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
5047 [Thread-11-words] ERROR backtype.storm.util - Async loop died!
java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
        at org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29) ~[curator-client-2.4.0.jar:na]
        at org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:169) ~[curator-framework-2.4.0.jar:na]
        at org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94) ~[curator-client-2.4.0.jar:na]
        at org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55) ~[curator-client-2.4.0.jar:na]
        at org.apache.curator.ConnectionState.reset(ConnectionState.java:219) ~[curator-client-2.4.0.jar:na]
        at org.apache.curator.ConnectionState.start(ConnectionState.java:103) ~[curator-client-2.4.0.jar:na]
        at org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:188) ~[curator-client-2.4.0.jar:na]
        at org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:234) ~[curator-framework-2.4.0.jar:na]
        at storm.kafka.ZkState.<init>(ZkState.java:62) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
        at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) ~[storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_05]
5049 [Thread-11-words] ERROR backtype.storm.daemon.executor -
java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
        at …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-storm

10
推荐指数
1
解决办法
1万
查看次数

Apache Storm vs Apache Samza vs Apache Spark

我曾经参与过Storm和Spark,但是Samza很新.

我不明白为什么当Storm已经在那里进行实时处理时Samza被引入了.Spark在内存中提供近实时处理,并具有其他非常有用的组件如graphx和mllib.

Samza带来了哪些改进以及可能的进一步改进?

apache-spark apache-storm apache-samza

9
推荐指数
1
解决办法
4996
查看次数