我从Kafka下载页面下载了Apache Kafka.我把它提取到了/opt/apache/installed/kafka-0.7.0-incubating-src.
该快速入门页面说,你需要通过运行来启动饲养员,然后开始卡夫卡:
>bin/kafka-server-start.sh config/server.properties
我正在使用单独的Zookeeper服务器,因此我编辑config/server.properties指向该Zookeeper实例.
当我按照快速入门页面中的说明运行Kafka时,我收到以下错误:
Exception in thread "main" java.lang.NoClassDefFoundError: kafka/Kafka
Caused by: java.lang.ClassNotFoundException: kafka.Kafka
at java.net.URLClassLoader$1.run(URLClassLoader.java:200)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:188)
at java.lang.ClassLoader.loadClass(ClassLoader.java:307)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:252)
at java.lang.ClassLoader.loadClassInternal(ClassLoader.java:320)
Could not find the main class: kafka.Kafka. Program will exit.
Run Code Online (Sandbox Code Playgroud)
我使用telnet确保可以从Kafka运行的机器访问Zookeeper实例.一切都好.
为什么我会收到此错误?
我正在尝试设置Kafka参考apache kafka页面上的快速入门指南,并在启动zookeeper服务器时它会陷入这一步...如果有人可以帮助指导我如何启动zookeeper服务器,我将不胜感激
[2015-05-26 15:41:39,216] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2015-05-26 15:41:39,216] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2015-05-26 15:41:39,216] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2015-05-26 15:41:39,235] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
Run Code Online (Sandbox Code Playgroud) 我最近在单节点CDH 5设置上设置了Kafka,目的是在移动到真实集群之前在单个节点上播放它.最初,我刚用kafka服务器启动了zookeeper服务器,一切都很好.我可以看到zookeeper在2181运行,而Kafka在9092运行.然后我创建了一个主题,并启动了消费者和生产者,消费者可以看到生产者发送的消息.
幸福的是,我带着动物园管理员的日志搬到了标签,看到了这个:
[2015-05-27 16:46:07,016] INFO Got user-level KeeperException when processing sessionid:0x14d97bf0a020002 type:create cxid:0x2 zxid:0x1f txntype:-1 reqpath:n/a Error Path:/consumers/test-consumer-group/ids Error:KeeperErrorCode = NoNode for /consumers/test-consumer-group/ids (org.apache.zookeeper.server.PrepRequestProcessor)
[2015-05-27 16:46:07,021] INFO Got user-level KeeperException when processing sessionid:0x14d97bf0a020002 type:create cxid:0x3 zxid:0x20 txntype:-1 reqpath:n/a Error Path:/consumers/test-consumer-group Error:KeeperErrorCode = NoNode for /consumers/test-consumer-group (org.apache.zookeeper.server.PrepRequestProcessor)
[2015-05-27 16:46:07,306] INFO Got user-level KeeperException when processing sessionid:0x14d97bf0a020002 type:create cxid:0x19 zxid:0x24 txntype:-1 reqpath:n/a Error Path:/consumers/test-consumer-group/owners/test Error:KeeperErrorCode = NoNode for /consumers/test-consumer-group/owners/test (org.apache.zookeeper.server.PrepRequestProcessor)
[2015-05-27 16:46:07,307] INFO Got user-level KeeperException when processing sessionid:0x14d97bf0a020002 type:create cxid:0x1a …Run Code Online (Sandbox Code Playgroud) 如果你愿意,你可以跳过这个故事.还有一些人可能会感兴趣.
我有一个Java嵌入式ZooKeeper服务器.在单元测试中,我将端口分配给测试服务器.在分配端口之前,我通过打开a来检查它是否未使用ServerSocket,然后关闭它.
它不时发生,在BindException我启动我的服务器时得到的单元测试中(我不能将相同的端口分配给两个服务器,因为我使用文件锁也可以进行互斥).事实证明,原因是,对于端口检查我打开端口,然后我关闭它,它等待一段时间在操作系统级别,直到端口可以重新打开.
但是有一个选项(StandardSocketOptions.SO_REUSEADDR)可以告诉Java套接字,可以重用TIMED_WAIT状态的旧套接字.检查ZooKeeper代码后,它实际上设置为true(请参阅org.apache.zookeeper.server.NIOServerCnxnFactory.configure(InetSocketAddress,int)):
@Override
public void configure(InetSocketAddress addr, int maxcc) throws IOException {
configureSaslLogin();
thread = new Thread(this, "NIOServerCxn.Factory:" + addr);
thread.setDaemon(true);
maxClientCnxns = maxcc;
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
ss.socket().bind(addr);
ss.configureBlocking(false);
ss.register(selector, SelectionKey.OP_ACCEPT);
}
Run Code Online (Sandbox Code Playgroud)
然而,我的测试证明它不起作用.我得到BindException(在Linux JDK 1.7.0_60下).
在检查了ServerSocketChannel实现(JDK 1.7.0_60)后,我意识到,这绝不会在linux下工作.见sun.nio.ch.ServerSocketChannelImpl.setOption(SocketOption<T>, T):
public <T> ServerSocketChannel setOption(SocketOption<T> paramSocketOption, T paramT) throws IOException
{
if (paramSocketOption == null)
throw new NullPointerException();
if …Run Code Online (Sandbox Code Playgroud) 我正在学习Zookeeper,到目前为止,我还不了解将它用于数据库无法解决的分布式系统的目的.
我读过的用例是通过让Zookeeper客户端读/写Zookeeper服务器来为分布式系统实现锁,屏障等.通过读/写数据库不能实现同样的目标吗?
例如,我的书中描述了使用Zookeeper实现锁定的方法是让想要获取锁定的Zookeeper客户端创建一个ephemeral znode带有顺序标志的服务器lock-znode.然后锁定由其子znode具有最低序列号的客户端拥有.
本书中的所有其他Zookeeper示例再次仅使用它来存储/检索值.
似乎唯一不同的是Zookeeper与数据库/任何存储器的区别在于"观察者"概念.但这可以使用其他东西构建.
我知道我对Zookeeper的简化看法是一种误解.那么有人能告诉我Zookeeper真正提供的数据库/自定义观察者不能做什么吗?
当我启动kafka时,zookeeper发生错误.
INFO Got user-level KeeperException when processing sessionid:0x156028651c00001 type:delete cxid:0x1b zxid:0x59 txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election (org.apache.zookeeper.server.PrepRequestProcessor)
Run Code Online (Sandbox Code Playgroud) 在这些情况下:
我有以下恐慌:
panic: close of closed channel
goroutine 2849 [running]:
github.com/samuel/go-zookeeper/zk.(*Conn).Close(0xc420795180)
github.com/samuel/go-zookeeper/zk/conn.go:253 47
github.com/curator-go/curator.(*handleHolder).internalClose(0xc4203058f0, 0xc420302470, 0x0)
github.com/curator-go/curator/state.go:136 +0x8d
github.com/curator-go/curator.(*handleHolder).closeAndReset(0xc4203058f0, 0xc42587cd00, 0x1e)
github.com/curator-go/curator/state.go:122 +0x2f
github.com/curator-go/curator.(*connectionState).reset(0xc420302420, 0x1b71d87, 0xf)
github.com/curator-go/curator/state.go:234 +0x55
github.com/curator-go/curator.(*connectionState).handleExpiredSession(0xc420302420)
github.com/curator-go/curator/state.go:351 +0xd9
github.com/curator-go/curator.(*connectionState).checkState(0xc420302420, 0xffffff90, 0x0, 0x0, 0xc425ed2600, 0xed0e5250a)
github.com/curator-go/curator/state.go:318 +0x9c
github.com/curator-go/curator.(*connectionState).process(0xc420302420, 0xc425ed2680)
github.com/curator-go/curator/state.go:299 +0x16d
created by github.com/curator-go/curator.(*Watchers).Fire
github.com/curator-go/curator/watcher.go:64 +0x96
Run Code Online (Sandbox Code Playgroud)
这是详细的事件序列:
s.ReregisterAll()- > Conn()- > checkTimeout()- > reset(bc已过去1分钟) - > closeAndReset()- > conn.Close() 可以阻止一秒钟zk.StateExpired(zk集群发送此bc它认为这个客户端已经死了,因为它在2期间没有ping.) - …正如在ZooKeeper的文档中所说,有必要使用daemontools等工具来监督它.但是文档没有提供任何示例,我知道启动ZooKeeper的唯一方法是运行bin/zkServer.sh start.我打算用supervisord来完成这项工作,但是如何编写[program:zookeeper]部分已经超出了我的范围.任何例子都会很棒.谢谢.
到目前为止,有两种解决方案:
[program:zookeeper]
command = /usr/lib/jvm/java-1.7.0-openjdk.x86_64/bin/java -Dzookeeper.log.dir="." -Dzookeeper.root.logger="INFO,CONSOLE" -cp "/home/jizhang/Applications/zookeeper/bin/../build/classes:/home/jizhang/Applications/zookeeper/bin/../build/lib/*.jar:/home/jizhang/Applications/zookeeper/bin/../lib/slf4j-log4j12-1.6.1.jar:/home/jizhang/Applications/zookeeper/bin/../lib/slf4j-api-1.6.1.jar:/home/jizhang/Applications/zookeeper/bin/../lib/netty-3.2.2.Final.jar:/home/jizhang/Applications/zookeeper/bin/../lib/log4j-1.2.15.jar:/home/jizhang/Applications/zookeeper/bin/../lib/jline-0.9.94.jar:/home/jizhang/Applications/zookeeper/bin/../zookeeper-3.4.3.jar:/home/jizhang/Applications/zookeeper/bin/../src/java/lib/*.jar:/home/jizhang/Applications/zookeeper/bin/../conf:" -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false org.apache.zookeeper.server.quorum.QuorumPeerMain "/home/jizhang/Applications/zookeeper/bin/../conf/zoo.cfg"
stdout_logfile = /home/jizhang/Applications/zookeeper/zookeeper.out
stderr_logfile = /home/jizhang/Applications/zookeeper/zookeeper.err
autorestart = true
我正在使用Apache Curator库在Zookeeper上进行领导选举.我将我的应用程序代码部署在各种机器上,我只需要从一台机器执行我的代码,这就是为什么我在动物园管理员上进行领导选举,以便我可以检查我是否是领导者,然后执行此代码.
下面是我的LeaderElectionExecutor类,它确保我每个应用程序都有一个Curator实例
public class LeaderElectionExecutor {
private ZookeeperClient zookClient;
private static final String LEADER_NODE = "/testleader";
private static class Holder {
static final LeaderElectionExecutor INSTANCE = new LeaderElectionExecutor();
}
public static LeaderElectionExecutor getInstance() {
return Holder.INSTANCE;
}
private LeaderElectionExecutor() {
try {
String hostname = Utils.getHostName();
String nodes = "host1:2181,host2:2181;
zookClient = new ZookeeperClient(nodes, LEADER_NODE, hostname);
zookClient.start();
// added sleep specifically for the leader to get selected
// since I cannot call isLeader method immediately after starting …Run Code Online (Sandbox Code Playgroud) 我按照本教程安装了Apache Storm 1.0:http://opensourceeducation.net/how-to-install-and-configure-apache-strom/.但我注意到两个问题:
- >我无法从互联网访问风暴ui.
- >如果我尝试访问localhost:8080,我收到错误:
org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [localhost]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?
at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:90)
at org.apache.storm.ui.core$cluster_configuration.invoke(core.clj:343)
at org.apache.storm.ui.core$fn__12106.invoke(core.clj:929)
at org.apache.storm.shade.compojure.core$make_route$fn__2467.invoke(core.clj:93)
at org.apache.storm.shade.compojure.core$if_route$fn__2455.invoke(core.clj:39)
at org.apache.storm.shade.compojure.core$if_method$fn__2448.invoke(core.clj:24)
at org.apache.storm.shade.compojure.core$routing$fn__2473.invoke(core.clj:106)
at clojure.core$some.invoke(core.clj:2570)
at org.apache.storm.shade.compojure.core$routing.doInvoke(core.clj:106)
at clojure.lang.RestFn.applyTo(RestFn.java:139)
at clojure.core$apply.invoke(core.clj:632)
at org.apache.storm.shade.compojure.core$routes$fn__2477.invoke(core.clj:111)
at org.apache.storm.shade.ring.middleware.json$wrap_json_params$fn__11576.invoke(json.clj:56)
at org.apache.storm.shade.ring.middleware.multipart_params$wrap_multipart_params$fn__3543.invoke(multipart_params.clj:103)
at org.apache.storm.shade.ring.middleware.reload$wrap_reload$fn__4286.invoke(reload.clj:22)
at org.apache.storm.ui.helpers$requests_middleware$fn__3770.invoke(helpers.clj:46)
at org.apache.storm.ui.core$catch_errors$fn__12301.invoke(core.clj:1230)
at org.apache.storm.shade.ring.middleware.keyword_params$wrap_keyword_params$fn__3474.invoke(keyword_params.clj:27)
at org.apache.storm.shade.ring.middleware.nested_params$wrap_nested_params$fn__3514.invoke(nested_params.clj:65)
at org.apache.storm.shade.ring.middleware.params$wrap_params$fn__3445.invoke(params.clj:55)
at org.apache.storm.shade.ring.middleware.multipart_params$wrap_multipart_params$fn__3543.invoke(multipart_params.clj:103)
at org.apache.storm.shade.ring.middleware.flash$wrap_flash$fn__3729.invoke(flash.clj:14)
at org.apache.storm.shade.ring.middleware.session$wrap_session$fn__3717.invoke(session.clj:43)
at …Run Code Online (Sandbox Code Playgroud) apache-zookeeper ×10
apache-kafka ×4
java ×3
apache-storm ×1
distributed ×1
go ×1
linux ×1
nimbus ×1
sockets ×1
supervisord ×1
tcp ×1