当我尝试使用Kafka生产者和消费者(0.9.0)脚本来推送/拉取主题中的消息时,我得到以下错误.
[2016-01-13 02:49:40,078] ERROR Error when sending message to topic test with key: null, value: 11 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
Run Code Online (Sandbox Code Playgroud)
> [2016-01-13 02:47:18,620] WARN
> [console-consumer-90116_f89a0b380f19-1452653212738-9f857257-leader-finder-thread],
> Failed to find leader for Set([test,0])
> (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
> kafka.common.KafkaException: fetching topic metadata for topics
> [Set(test)] from broker
> [ArrayBuffer(BrokerEndPoint(0,192.168.99.100,9092))] failed at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73) at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94) at
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> Caused by: java.io.EOFException at
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
> at
> …Run Code Online (Sandbox Code Playgroud) apache-kafka kafka-consumer-api kafka-producer-api apache-zookeeper
我目前正在运行kafka 0.10.0.1,并且相关的两个值的相应文档如下:
heartbeat.interval.ms - 使用Kafka的组管理工具时,心跳与消费者协调员之间的预期时间.心跳用于确保消费者的会话保持活动状态,并在新消费者加入或离开群组时促进重新平衡.该值必须设置为低于session.timeout.ms,但通常应设置为不高于该值的1/3.它可以调整得更低,以控制正常重新平衡的预期时间.
session.timeout.ms - 使用Kafka的组管理工具时用于检测故障的超时.如果在会话超时期间未收到消费者的心跳,则代理会将消费者标记为失败并重新平衡该组.由于仅在调用poll()时发送心跳,因此较高的会话超时允许更多时间在消费者的轮询循环中进行消息处理,但代价是检测硬故障的时间较长.另请参阅max.poll.records以获取另一个控制轮询循环中处理时间的选项.
我不清楚为什么文档建议设置heartbeat.interval.ms为1/3 session.timeout.ms.这些值是否相同是没有意义的,因为心跳仅在poll()被调用时发送,因此当处理当前记录时?
我们遇到了卡夫卡的问题.有时突然间,我们会在没有警告的情况下退出同步并在发出事件时开始获取异常.
我们得到的例外是:"java.io.IOException:打开的文件过多"
在许多情况下,这似乎是kafka抛出的一般异常.我们稍微调查一下,我们认为根本原因是当尝试向某个主题发出事件时,它会失败,因为kafka没有针对此主题的领导分区
有人可以帮忙吗?
我是Kafka 0.9的新手并测试了一些功能,我在Java实现的Consumer(KafkaConsumer)中发现了一个奇怪的行为.
Kafka经纪人位于Ambari外部机器中.
即使你我可以实现一个Producer并开始向外部代理发送消息,我也不知道为什么当消费者试图读取事件(民意调查)时,它会被卡住.
我知道生产者工作得很好,因为我可以通过控制台消费者(在ambari本地工作)消费消息.但是当我执行Java Consumer时,什么都没发生,只是卡住了.调试代码我可以看到它在该poll()行被阻止:
ConsumerRecords<String, String> records = consumer.poll(100);
Run Code Online (Sandbox Code Playgroud)
顺便说一句,超时没有任何作用.如果你输入0,100或1000毫秒无关紧要,消费者在这一行被阻止并且不会超时也不会抛出异常.
我尝试了所有类型的替代属性,例如advertised.host.name,advertised.listener,...等等,运气不好.
任何帮助将受到高度赞赏.提前致谢!
假设我在Kafka中为给定主题分配了10个分区.我可以选择在消费者之间自动对这10个分区进行负载均衡?
我已经阅读了这篇文章/sf/answers/2000625441/但我不确定它涵盖了我正在寻找的内容,或者我可能只是没有得到它.
如果我为每个分区启动一个具有一个使用者的工作者,那么该工作人员将使用所有工作.
但是,如果我在其他地方启动同一个工作人员的另一个实例会怎样?客户端库/ Kafka是否会以某种方式检测到这一点,并重新平衡两个worker之间的负载,以便worker1上的一些活动使用者现在处于空闲状态,而worker2上的相同使用者变为活动状态?
我希望能够按需添加和删除工作人员,并将负载分散到那些,这可能吗?
对于Kafka Streams,如果我们使用较低级别的处理器API,我们可以控制是否提交.因此,如果我们的代码中出现问题,并且我们不想提交此消息.在这种情况下,Kafka将多次重新发送此消息,直到问题得到解决.
但是如何控制在使用其更高级别的流DSL API时是否提交消息?
资源:
http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html
在我的spring启动服务中尝试启动kafka使用者时看到NoSuchBeanDefinitionException并且无法启动服务本身.
下面是我的bean类,它包含为Kafka配置创建的所有必需bean
Spring Boot版本:1.5.2.RELEASE
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import com.ns.kafka.gateway.dtos.GatewayCallBackMessage;
@Configuration
@EnableKafka
public class GatewayCallbackToPNConsumerConfig {
@Bean
public Map < String, Object > consumerProps() {
Map < String, Object > props = new HashMap < > ();
props.put(null, null);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "gatewaycallbacktopngroup");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
return props;
}
@Bean
public Deserializer < String > …Run Code Online (Sandbox Code Playgroud) 如果我有一个enable.auto.commit=false和我打电话consumer.poll()而没有打电话consumer.commitAsync(),为什么consumer.poll()下次打电话时会返回新记录?
由于我没有提交我的偏移量,我希望poll()能够返回最新的偏移量,这应该是相同的记录.
我问,因为我在处理过程中试图处理故障情况.我希望不提交偏移量,poll()将再次返回相同的记录,以便我可以再次重新处理这些失败的记录.
public class MyConsumer implements Runnable {
@Override
public void run() {
while (true) {
ConsumerRecords<String, LogLine> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord record : records) {
try {
//process record
consumer.commitAsync();
} catch (Exception e) {
}
/**
If exception happens above, I was expecting poll to return new records so I can re-process the record that caused the exception.
**/
}
}
}
}
Run Code Online (Sandbox Code Playgroud) 这个kafka错误是什么意思?
[2018-08-22 11:40:49,429] WARN [Consumer clientId = consumer-1,groupId = console-consumer-62114] 1个分区的领导者代理没有匹配的侦听器,包括[topicname-0](org.apache. kafka.clients.NetworkClient)
我在跑步时得到它:
./kafka-console-consumer.sh --topic topicname --bootstrap-server localhost:9094
Run Code Online (Sandbox Code Playgroud)
在尝试阅读本主题时,我在golang程序中遇到了一些错误:
2018年8月22日11点44分12秒ReadOffsetWithRetryOnError康恩错误:<拨TCP:0:连接:连接被拒绝> kafka0:9094主题:0
代码段:
conn, err := kafka.DialLeader(context.Background(), "tcp", ip, getTopic(topic), 0)
if err != nil {
log.Println("ReadOffsetWithRetryOnError conn error: <", err, "> ", ip, " topic:", topic)
}
Run Code Online (Sandbox Code Playgroud)
这很奇怪,因为在阅读不同的主题时,它同时工作正常.
更多错误日志:
/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topicname主题:indexBlock PartitionCount:1
ReplicationFactor:1配置:主题:topicname分区:0领导者:-1副本:1002 Isr:1002
我正在写一个 Kafka 消费者。我需要将环境变量主题名称传递给@KafkaListener(topics = ...). 这是我迄今为止尝试过的:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@Autowired
private EnvProperties envProperties;
private final String topic = envProperties.getTopic();
@KafkaListener(topics = "#{'${envProperties.getTopic()}'}", groupId = "group_id")
public void consume(String message) {
logger.info("Consuming messages " +envProperties.getTopic());
}
}
Run Code Online (Sandbox Code Playgroud)
我在线路上遇到错误topics = "#{'${envProperties.getTopic()}'}",应用程序无法启动。
如何从环境变量动态设置此主题名称?
apache-kafka ×9
spring-kafka ×2
go ×1
java ×1
polling ×1
spring ×1
spring-boot ×1
spring-el ×1