我按照本教程安装了Apache Storm 1.0:http://opensourceeducation.net/how-to-install-and-configure-apache-strom/.但我注意到两个问题:
- >我无法从互联网访问风暴ui.
- >如果我尝试访问localhost:8080,我收到错误:
org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [localhost]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?
at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:90)
at org.apache.storm.ui.core$cluster_configuration.invoke(core.clj:343)
at org.apache.storm.ui.core$fn__12106.invoke(core.clj:929)
at org.apache.storm.shade.compojure.core$make_route$fn__2467.invoke(core.clj:93)
at org.apache.storm.shade.compojure.core$if_route$fn__2455.invoke(core.clj:39)
at org.apache.storm.shade.compojure.core$if_method$fn__2448.invoke(core.clj:24)
at org.apache.storm.shade.compojure.core$routing$fn__2473.invoke(core.clj:106)
at clojure.core$some.invoke(core.clj:2570)
at org.apache.storm.shade.compojure.core$routing.doInvoke(core.clj:106)
at clojure.lang.RestFn.applyTo(RestFn.java:139)
at clojure.core$apply.invoke(core.clj:632)
at org.apache.storm.shade.compojure.core$routes$fn__2477.invoke(core.clj:111)
at org.apache.storm.shade.ring.middleware.json$wrap_json_params$fn__11576.invoke(json.clj:56)
at org.apache.storm.shade.ring.middleware.multipart_params$wrap_multipart_params$fn__3543.invoke(multipart_params.clj:103)
at org.apache.storm.shade.ring.middleware.reload$wrap_reload$fn__4286.invoke(reload.clj:22)
at org.apache.storm.ui.helpers$requests_middleware$fn__3770.invoke(helpers.clj:46)
at org.apache.storm.ui.core$catch_errors$fn__12301.invoke(core.clj:1230)
at org.apache.storm.shade.ring.middleware.keyword_params$wrap_keyword_params$fn__3474.invoke(keyword_params.clj:27)
at org.apache.storm.shade.ring.middleware.nested_params$wrap_nested_params$fn__3514.invoke(nested_params.clj:65)
at org.apache.storm.shade.ring.middleware.params$wrap_params$fn__3445.invoke(params.clj:55)
at org.apache.storm.shade.ring.middleware.multipart_params$wrap_multipart_params$fn__3543.invoke(multipart_params.clj:103)
at org.apache.storm.shade.ring.middleware.flash$wrap_flash$fn__3729.invoke(flash.clj:14)
at org.apache.storm.shade.ring.middleware.session$wrap_session$fn__3717.invoke(session.clj:43)
at …Run Code Online (Sandbox Code Playgroud) 我正在研究Spark(伯克利)集群计算系统.在我的研究中,我了解了其他一些内存系统,如Redis,Memcachedb等.如果有人能给我SPARK和REDIS(以及MEMCACHEDB)之间的比较,那将会很棒.在什么情况下Spark比其他内存系统有优势?
我们需要在创建拓扑时传递一个对象,以便bolt可以访问该对象并根据该对象进行进一步处理.是否有可能通过对象TopplogyContext,如果是,如何?或者在提交拓扑之前是否还有其他方法可以传递对象,然后提交,以便bolt可以对其进行句柄/控制?
我们需要通过上下文传递对象,以便所有螺栓都可以访问它,并且不需要在该拓扑的所有螺栓中强制构造函数的实现.那么,想知道是否存在任何API来做同样的事情?
我试图在多个喷口之间分享任务.我有一种情况,我从外部源一次获得一个元组/消息,我想要有多个spout实例,主要目的是分担负载并提高性能效率.
我可以用一个Spout本身做同样的事情,但我想分享多个喷口的负载.我无法获得分散负载的逻辑.由于消息的偏移在特定喷口完成消耗部件之前将不会被知道(即,基于设置的缓冲器大小).
任何人都可以对如何计算逻辑/算法有一些启发吗?
预付谢谢你的时间.
5)builder.setSpout("spout", new KafkaSpout(cfg), 5);
通过泛滥800 MB每个分区上的数据进行测试,并~22 sec完成读取.
再次,使用parallelism_hint = 1
即代码builder.setSpout("spout", new KafkaSpout(cfg), 1);
现在需要更多~23 sec!为什么?
根据Storm Docs的 setSpout()声明如下:
public SpoutDeclarer setSpout(java.lang.String id,
IRichSpout spout,
java.lang.Number parallelism_hint)
Run Code Online (Sandbox Code Playgroud)
其中,
parallelism_hint - 是执行此喷口应分配的任务数.每个任务都将在群集周围某个进程中的某个线程上运行.
从这里开始:Storm从一开始就被设计为与多种语言兼容.Nimbus是一种Thrift服务,拓扑被定义为Thrift结构.Thrift的使用允许使用任何语言的Storm.
我看到在java中创建的拓扑通过将拓扑(spouts,bolt,ComponentCommon)序列化为Thrift数据类型然后部署在Nimbus上来部署.在Java中,使用其方法和数据很容易地序列化对象.所以在另一方面,Nimbus只需要创建对象并调用它们.(我可能在这里缺少细节,但我希望我能正确理解这一点)
但我想知道如何用C++编写拓扑并以相同的方式部署它.thrift是否有助于序列化基于c ++的拓扑,而Nimbus以与Java相同的方式部署/执行拓扑?
在这方面我看到链接link1 link2,唯一的解决方案似乎是使用Shelbolt.它调用进程并通过标准i/o与之通信.
为了使用Thrift方式,我们是否还需要在C++中重写storm核心?另外,为什么在仅支持JVM语言时使用Thrift?对于像python/c ++这样的语言,似乎根本没有使用Thrift.
我正在使用java类向Storm集群提交拓扑,我还计划使用java类来终止拓扑.但是根据风暴文档,以下命令用于终止拓扑,并且没有Java方法(这有正当理由)
storm kill {stormname}
Run Code Online (Sandbox Code Playgroud)
那么从java类调用shell脚本来终止拓扑是否可以呢?什么是杀死拓扑的其他方法?
另外,如何获得在Storm集群中运行拓扑的状态?
我已经开始使用storm,所以我使用本教程创建了简单的拓扑
当我运行我的拓扑LocalCluster并且一切看起来都很好时,我的问题是我没有得到元组的确认,这意味着我的鲸鱼喷水ack器从未被调用过.
我的代码在下面 - 你知道为什么ack不被调用?
所以我的拓扑看起来像这样
public StormTopology build() {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(HelloWorldSpout.class.getSimpleName(),
helloWorldSpout, spoutParallelism);
HelloWorldBolt bolt = new HelloWorldBolt();
builder.setBolt(HelloWorldBolt.class.getSimpleName(),
bolt, boltParallelism)
.shuffleGrouping(HelloWorldSpout.class.getSimpleName());
}
Run Code Online (Sandbox Code Playgroud)
我的喷口看起来像这样
public class HelloWorldSpout extends BaseRichSpout implements ISpout {
private SpoutOutputCollector collector;
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("int"));
}
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
}
private static Boolean flag = false;
public void nextTuple() {
Utils.sleep(5000); …Run Code Online (Sandbox Code Playgroud) 我正在为我的Trident拓扑实现一个IBackingMap,以便将元组存储到ElasticSearch(我知道GitHub上已存在多个Trident/ElasticSearch集成实现,但我决定实现一个更适合我的任务的自定义实现).
所以我的实现是一个经典的工厂:
public class ElasticSearchBackingMap implements IBackingMap<OpaqueValue<BatchAggregationResult>> {
// omitting here some other cool stuff...
private final Client client;
public static StateFactory getFactoryFor(final String host, final int port, final String clusterName) {
return new StateFactory() {
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
ElasticSearchBackingMap esbm = new ElasticSearchBackingMap(host, port, clusterName);
CachedMap cm = new CachedMap(esbm, LOCAL_CACHE_SIZE);
MapState ms = OpaqueMap.build(cm);
return new SnapshottableMap(ms, new Values(GLOBAL_KEY));
}
};
}
public ElasticSearchBackingMap(String host, int port, String clusterName) …Run Code Online (Sandbox Code Playgroud) 我最近熟悉了Apache Kafka,并有一个生产者 - 消费者的工作示例.
我的下一步是将Kafka与Spout和Bolt集成,我很难在本地工作的示例(他们大多是旧的).
我得到以下示例工作storm-book/examples-ch02-getting_started,它从本地文本文件中读取数据.
同样的回购有一个风暴书/例子-ch04-spouts kafka- spout的例子 但是我无法让它工作.
我尝试了以下示例以及cep.kafka,但得到以下错误 -
5034 [Thread-11-words] INFO org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
5047 [Thread-11-words] ERROR backtype.storm.util - Async loop died!
java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
at org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29) ~[curator-client-2.4.0.jar:na]
at org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:169) ~[curator-framework-2.4.0.jar:na]
at org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94) ~[curator-client-2.4.0.jar:na]
at org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55) ~[curator-client-2.4.0.jar:na]
at org.apache.curator.ConnectionState.reset(ConnectionState.java:219) ~[curator-client-2.4.0.jar:na]
at org.apache.curator.ConnectionState.start(ConnectionState.java:103) ~[curator-client-2.4.0.jar:na]
at org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:188) ~[curator-client-2.4.0.jar:na]
at org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:234) ~[curator-framework-2.4.0.jar:na]
at storm.kafka.ZkState.<init>(ZkState.java:62) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) ~[storm-core-0.9.4.jar:0.9.4]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_05]
5049 [Thread-11-words] ERROR backtype.storm.daemon.executor -
java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
at …Run Code Online (Sandbox Code Playgroud) 我曾经参与过Storm和Spark,但是Samza很新.
我不明白为什么当Storm已经在那里进行实时处理时Samza被引入了.Spark在内存中提供近实时处理,并具有其他非常有用的组件如graphx和mllib.
Samza带来了哪些改进以及可能的进一步改进?
apache-storm ×10
java ×4
apache-kafka ×2
apache-spark ×2
apache-samza ×1
c++ ×1
memcachedb ×1
nimbus ×1
redis ×1
thrift ×1
trident ×1