我正在寻找一个网站,下载包含KAFKA和STORM的示例项目(使用NoSQL应该更好).有人可以帮帮我吗?
最好的祝福
我正在使用kafka_2.9.2-0.8.1.1和zookeeper 3.4.6.
有没有办法动态更改代理配置设置?具体来说,我想改变controlled.shutdown.enable
bin/kafka-topics.sh --zookeeper zookeeper01.mysite.com --config controlled.shutdown.enable=true --alter
但我得到了错误
Missing required argument "[topic]"
我们正在使用0.8.2.1建立一个新的Kafka项目,并希望专门为Kafka编写消费者抵消.所以我们设置offsets.storage=kafka并dual.commit.enabled=false在我们的消费者配置中.但是,当我们创建消费者连接器时,它仍然希望连接到ZooKeeper:
kafka.consumer.Consumer.createJavaConsumerConnector(config);
// fails with:
// Caused by: java.lang.IllegalArgumentException: requirement failed:
// Missing required property 'zookeeper.connect'
Run Code Online (Sandbox Code Playgroud)
我想也许我们只需要指定zookeeper.connect它就会被忽略所以我指定了一个无效的主机名,但仍然失败,因为它确实尝试连接.我们真的不希望我们的消费者必须连接到ZooKeeper,如果我们可以避免它.什么给出了什么?
我想在我的项目中实现Storm实时消息处理.我观察到很多人使用'Apache Kafka'和'Storm'.
在我的项目中,客户端应用程序将向服务器端发送消息,服务器端应该对消息进行身份验证,处理它们并存储到HBase中.只有约束是不应该删除消息,每个消息都必须保存到HBase中,如果处理该消息需要几分钟就可以了.
我想知道
请你告诉我Kafka的用法.
我对Apache Kafka中的Topic分区有些困惑。因此,我正在制定一个简单的用例,我想知道在不同情况下会发生什么。所以这里是:
我有一个主题T,它具有4个分区TP1,TP2,TP4和TP4。
假设我有8条消息M1至M8。现在,当我的生产者将这些消息发送到主题T时,在以下情况下,Kafka经纪人将如何接收它们:
方案1:只有一个带有主题T和上述分区的kafka代理实例。
方案2:有两个kafka代理实例,每个节点具有相同的Topic T,并且具有上述分区。
现在假设kafka经纪人实例1出现故障,那么消费者将如何反应?我假设我的消费者正在从代理实例1中读取内容。
我在aws t2实例(一个有1GB内存)上安装kafka.
(1)我下载kafka_2.11-0.9.0.0(2)我运行zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties(3)我尝试运行bin/kafka-server-start.sh config/server .properties,我明白了
Java HotSpot(TM)64位服务器VM警告:INFO:os :: commit_memory(0x00000000c0000000,1073741824,0)失败; error ='无法分配内存'(错误= 12)
#
.#Java Runtime Environment没有足够的内存来继续.
.#Native memory allocation(mmap)无法映射1073741824字节以提交保留内存.
.#包含更多信息的错误报告文件保存为:
.#/ home/machine_name/kafka_2.11-0.9.0.0/hs_err_pid9161.log
我检查了server.properties配置文件和文档中的所有属性,这些属性可以尝试做这样的事情但是找不到任何东西.
有谁知道为什么kafka在开始时试图分配1 gb?
我在现有的CDH 5.5.2集群上安装Kafka-2.0,这是我遵循的程序
我得到的错误(日志文件)
Fatal error during KafkaServerStartable startup. Prepare to shutdown
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
at kafka.log.SkimpyOffsetMap.<init>(OffsetMap.scala:43)
at kafka.log.LogCleaner$CleanerThread.<init>(LogCleaner.scala:186)
at kafka.log.LogCleaner$$anonfun$1.apply(LogCleaner.scala:83)
at kafka.log.LogCleaner$$anonfun$1.apply(LogCleaner.scala:83)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.immutable.Range.foreach(Range.scala:166)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.log.LogCleaner.<init>(LogCleaner.scala:83)
at kafka.log.LogManager.<init>(LogManager.scala:64)
at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:601)
at kafka.server.KafkaServer.startup(KafkaServer.scala:180)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37)
at kafka.Kafka$.main(Kafka.scala:67)
at com.cloudera.kafka.wrap.Kafka$.main(Kafka.scala:76)
at com.cloudera.kafka.wrap.Kafka.main(Kafka.scala)
Run Code Online (Sandbox Code Playgroud)我使用此命令标记了3个要删除的主题
kafka-topics --zookeeper localhost:2181 --delete --topic topic_name
现在我不能使用它们我也不能重新创建它们如何解决这个问题?要么完全删除它们,要么重新创建它们或取消标记要删除的状态.
我创建了一个生产MSG到一个主题A的生产者,我需要的是我想在那个MSG中进行更改并希望将其发送到另一个主题B,我正在尝试通过Kafka流做到这一点,但不确定是否是正确与否.如果它需要Kafka流,那么请分享应该写的代码?
我创建了一个简单的卡夫卡消费者
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Run Code Online (Sandbox Code Playgroud)
这是卡夫卡消费者
@Component
public class KafkaConsumer {
@KafkaListener(topics = "NewTopic", groupId = "group_id")
public void consume(String message) {
System.out.println("message = " + message);
}
}
Run Code Online (Sandbox Code Playgroud)
当我运行应用程序时出现以下错误
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration': Unexpected exception …Run Code Online (Sandbox Code Playgroud) apache-kafka ×10
apache-storm ×2
cloudera-cdh ×1
hadoop ×1
hadoop2 ×1
hbase ×1
java ×1
rhel ×1
spring ×1
spring-boot ×1
spring-kafka ×1
spring-mvc ×1