标签: apache-kafka

寻找Kafka-> Storm的示例项目

我正在寻找一个网站,下载包含KAFKA和STORM的示例项目(使用NoSQL应该更好).有人可以帮帮我吗?

最好的祝福

apache-kafka apache-storm

0
推荐指数
1
解决办法
3358
查看次数

kafka broker配置动态更改

我正在使用kafka_2.9.2-0.8.1.1和zookeeper 3.4.6.

有没有办法动态更改代理配置设置?具体来说,我想改变controlled.shutdown.enable

bin/kafka-topics.sh --zookeeper zookeeper01.mysite.com --config controlled.shutdown.enable=true --alter

但我得到了错误

Missing required argument "[topic]"

apache-kafka apache-zookeeper

0
推荐指数
1
解决办法
9150
查看次数

具有offsets.storage = kafka的Kafka 0.8.2.1如何仍然需要ZooKeeper?

我们正在使用0.8.2.1建立一个新的Kafka项目,并希望专门为Kafka编写消费者抵消.所以我们设置offsets.storage=kafkadual.commit.enabled=false在我们的消费者配置中.但是,当我们创建消费者连接器时,它仍然希望连接到ZooKeeper:

kafka.consumer.Consumer.createJavaConsumerConnector(config);
// fails with:
// Caused by: java.lang.IllegalArgumentException: requirement failed: 
//      Missing required property 'zookeeper.connect'
Run Code Online (Sandbox Code Playgroud)

我想也许我们只需要指定zookeeper.connect它就会被忽略所以我指定了一个无效的主机名,但仍然失败,因为它确实尝试连接.我们真的不希望我们的消费者必须连接到ZooKeeper,如果我们可以避免它.什么给出了什么?

apache-kafka

0
推荐指数
1
解决办法
2271
查看次数

在使用Apache风暴时是否必须使用Apache Kafka?

我想在我的项目中实现Storm实时消息处理.我观察到很多人使用'Apache Kafka'和'Storm'.

在我的项目中,客户端应用程序将向服务器端发送消息,服务器端应该对消息进行身份验证,处理它们并存储到HBase中.只有约束是不应该删除消息,每个消息都必须保存到HBase中,如果处理该消息需要几分钟就可以了.

我想知道

  1. 使用Kafka和Storm是强制性的吗?
  2. 使用Kafka和Storm有什么好处?
  3. 如果我不使用卡夫卡风暴会怎么样?

请你告诉我Kafka的用法.

hbase apache-kafka apache-storm

0
推荐指数
1
解决办法
156
查看次数

Apache Kafka主题分区消息处理

我对Apache Kafka中的Topic分区有些困惑。因此,我正在制定一个简单的用例,我想知道在不同情况下会发生什么。所以这里是:

我有一个主题T,它具有4个分区TP1,TP2,TP4和TP4。

假设我有8条消息M1至M8。现在,当我的生产者将这些消息发送到主题T时,在以下情况下,Kafka经纪人将如何接收它们:

方案1:只有一个带有主题T和上述分区的kafka代理实例。

方案2:有两个kafka代理实例,每个节点具有相同的Topic T,并且具有上述分区。

现在假设kafka经纪人实例1出现故障,那么消费者将如何反应?我假设我的消费者正在从代理实例1中读取内容。

apache-kafka

0
推荐指数
1
解决办法
1245
查看次数

Kafka未能映射1073741824字节用于提交保留内存

我在aws t2实例(一个有1GB内存)上安装kafka.

(1)我下载kafka_2.11-0.9.0.0(2)我运行zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties(3)我尝试运行bin/kafka-server-start.sh config/server .properties,我明白了

Java HotSpot(TM)64位服务器VM警告:INFO:os :: commit_memory(0x00000000c0000000,1073741824,0)失败; error ='无法分配内存'(错误= 12)

#

.#Java Runtime Environment没有足够的内存来继续.

.#Native memory allocation(mmap)无法映射1073741824字节以提交保留内存.

.#包含更多信息的错误报告文件保存为:

.#/ home/machine_name/kafka_2.11-0.9.0.0/hs_err_pid9161.log

我检查了server.properties配置文件和文档中的所有属性,这些属性可以尝试做这样的事情但是找不到任何东西.

有谁知道为什么kafka在开始时试图分配1 gb?

apache-kafka

0
推荐指数
2
解决办法
6530
查看次数

现有CDH 5.5.2集群上的Kafka配置

我在现有的CDH 5.5.2集群上安装Kafka-2.0,这是我遵循的程序

  1. 从CM添加服务
  2. 选定的Kafka(在此之前,我在所有节点上下载并分发并激活了kafka parcel)
  3. 为KafkaBroker选择1个节点,为Kafka MirrorMaker选择4个节点
  4. 然后我用一个Mirror Maker节点以及具有相同节点的Source Broker List(source.bootstrap.servers)更新了我的目标代理列表(bootstrap.servers)属性
  5. 我得到的错误(日志文件)

    Fatal error during KafkaServerStartable startup. Prepare to shutdown 
    java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
        at kafka.log.SkimpyOffsetMap.<init>(OffsetMap.scala:43)
        at kafka.log.LogCleaner$CleanerThread.<init>(LogCleaner.scala:186)
        at kafka.log.LogCleaner$$anonfun$1.apply(LogCleaner.scala:83)
        at kafka.log.LogCleaner$$anonfun$1.apply(LogCleaner.scala:83)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
        at scala.collection.immutable.Range.foreach(Range.scala:166)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at kafka.log.LogCleaner.<init>(LogCleaner.scala:83)
        at kafka.log.LogManager.<init>(LogManager.scala:64)
        at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:601)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:180)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37)
        at kafka.Kafka$.main(Kafka.scala:67)
        at com.cloudera.kafka.wrap.Kafka$.main(Kafka.scala:76)
        at com.cloudera.kafka.wrap.Kafka.main(Kafka.scala)
    
    Run Code Online (Sandbox Code Playgroud)

hadoop rhel apache-kafka hadoop2 cloudera-cdh

0
推荐指数
1
解决办法
2108
查看次数

取消标记为删除Kafka的主题?

我使用此命令标记了3个要删除的主题 kafka-topics --zookeeper localhost:2181 --delete --topic topic_name

现在我不能使用它们我也不能重新创建它们如何解决这个问题?要么完全删除它们,要么重新创建它们或取消标记要删除的状态.

apache-kafka

0
推荐指数
1
解决办法
4076
查看次数

如何修改一个kafka主题的消息并使用java发送到另一个kafka主题?

我创建了一个生产MSG到一个主题A的生产者,我需要的是我想在那个MSG中进行更改并希望将其发送到另一个主题B,我正在尝试通过Kafka流做到这一点,但不确定是否是正确与否.如果它需要Kafka流,那么请分享应该写的代码?

java apache-kafka apache-kafka-streams

0
推荐指数
1
解决办法
2763
查看次数

CommonErrorHandler 不存在于 kafka spring 中?

我创建了一个简单的卡夫卡消费者

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> 
  kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
}
Run Code Online (Sandbox Code Playgroud)

这是卡夫卡消费者

@Component
public class KafkaConsumer {

@KafkaListener(topics = "NewTopic", groupId = "group_id")
public void consume(String message) {
    System.out.println("message = " + message);
}
}
Run Code Online (Sandbox Code Playgroud)

当我运行应用程序时出现以下错误

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration': Unexpected exception …
Run Code Online (Sandbox Code Playgroud)

spring spring-mvc apache-kafka spring-boot spring-kafka

0
推荐指数
1
解决办法
9597
查看次数