使用Kafka客户端Java库,使用日志已经工作了一段时间但是出现以下错误它不再起作用:
2016-07-15 19:37:54.609 INFO 4342 --- [main] o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator 2147483647 dead.
2016-07-15 19:37:54.933 ERROR 4342 --- [main] o.a.k.c.c.internals.ConsumerCoordinator : Error UNKNOWN_MEMBER_ID occurred while committing offsets for group logstash
2016-07-15 19:37:54.933 WARN 4342 --- [main] o.a.k.c.c.internals.ConsumerCoordinator : Auto offset commit failed: Commit cannot be completed due to group rebalance
2016-07-15 19:37:54.941 ERROR 4342 --- [main] o.a.k.c.c.internals.ConsumerCoordinator : Error UNKNOWN_MEMBER_ID occurred while committing offsets for group logstash
2016-07-15 19:37:54.941 WARN 4342 --- [main] o.a.k.c.c.internals.ConsumerCoordinator : Auto offset commit failed: …Run Code Online (Sandbox Code Playgroud) 我正在读这个:
自动提交提交偏移的最简单方法是允许消费者为您执行此操作.如果配置enable.auto.commit = true,则消费者每五秒钟将提交客户端从poll()收到的最大偏移量.五秒间隔是默认值,可通过设置auto.commit.interval.ms来控制.就像消费者中的其他所有内容一样,自动提交由poll循环驱动.无论何时轮询,消费者都会检查是否需要提交,如果是,它将提交它在上次轮询中返回的偏移量.
也许问题是我的英语不好但我不完全理解这个描述.
假设我使用默认间隔的自动提交 - 5秒,并且每7秒轮询一次.在这种情况下,提交将每5秒或每7秒发生一次?
如果民意调查每3秒发生一次,你能澄清一下行为吗?是每5秒还是每6秒发生一次?
我读过这个:
自动提交:您可以将auto.commit设置为true,并将auto.commit.interval.ms属性设置为以毫秒为单位的值.启用此功能后,Kafka使用者将提交响应其poll()调用而收到的最后一条消息的偏移量.poll()调用在set auto.commit.interval.ms的后台发出.
这与答案相矛盾.
你能详细解释这些东西吗?
让我说我有这样的图:
0秒 - 轮询
4秒 - 轮询
8秒 - 轮询
什么时候会提交偏移以及何时提交?
我有一个带有自动缩放器的服务,每个实例都需要位于一个单独的消费者组中,我通过创建随机消费者组名称来实现它group-id: my-service-${random.uuid}。我想知道如果所有消费者都消失了,一个消费者组会发生什么。我注意到在 Confluence 平台中,我有很多消费者组,但没有任何消费者。
没有任何消费者的消费群体还能存在多久?
如何配置消费者组在服务被删除后 5 分钟后被删除(我希望删除该组,而不仅仅是删除偏移量)?
spring apache-kafka spring-boot kafka-consumer-api spring-kafka
我正在尝试使用以下代码Source通过AdminCommand创建一个kafka主题
ZkClient zkClient = new ZkClient(kafkaHost, 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, "pa_reliancepoc_telecom_usageevent", 10, 2, new Properties());
Run Code Online (Sandbox Code Playgroud)
但得到以下例外
Exception in thread "main" kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0
at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:155)
Run Code Online (Sandbox Code Playgroud)
但是,我可以使用shell命令创建主题.
我在3个不同的VM中有3个Kafka代理,其中一个还运行Zookeeper.我现在创建一个包含8个分区的主题.生产者在创建的"主题"上将消息推送到这些代理组.
我是kafka的新手并试图了解是否有办法从上次消耗的偏移中读取消息,但不是从头开始.
我正在写一个案例,所以我的意图不会偏离.
Eg:
1) I produced 5 messages at 7:00 PM and console consumer consumed those.
2) I stopped consumer at 7:10 PM
3) I produced 10 message at 7:20 PM. No consumer had read those messages.
4) Now, i have started console consumer at 7:30 PM, without from-beginning.
5) Now, it Will read the messages produced after it has started. Not the earlier ones, which were produced at 7.20 PM
Run Code Online (Sandbox Code Playgroud)
有没有办法从最后消耗的偏移量中获取消息.?
当我尝试使用kafka控制台工具(V 0.9.0.1,我认为这使用旧的消费者API)来消费来自ec2中托管的kafka服务器的消息时,我得到以下异常.我怎么能克服这个?
#./ kafka-console-consumer.sh --zookeeper zookeeper1.xx.com:2181 - 主题MY_TOPIC - 从头开始
[2016-04-06 14:34:58,219] WARN Fetching topic metadata with correlation id 0 for topics [Set(MY_TOPIC)] from broker [BrokerEndPoint(1014,kafka3.xx.com,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-04-06 14:34:58,222] WARN Fetching topic metadata with correlation id 0 for topics [Set(MY_TOPIC)] from broker [BrokerEndPoint(1013,kafka22.xx.com,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) …Run Code Online (Sandbox Code Playgroud) 我正在使用Kafka,我们有一个用例来构建一个容错系统,甚至连一条消息都不会错过.所以这就是问题所在:如果由于任何原因(ZooKeeper down,Kafka broker等)向Kafka发布失败,我们如何能够有效地处理这些消息并在事情再次恢复后重播它们.正如我所说的那样,即使单个消息失败也无法承受.另一个用例是我们还需要在任何给定的时间点知道有多少消息由于任何原因而无法发布到Kafka,例如计数器功能,现在这些消息需要再次重新发布.
其中一个解决方案是将这些消息推送到某个数据库(如Cassandra,其中写入速度非常快,但我们还需要计数器功能,我猜Cassandra计数器功能并不是那么好,我们不想使用它.)可以处理这种负载也为我们提供了非常准确的计数器设施.
这个问题更多来自架构方面,然后是使用哪种技术来实现这一目标.
PS:我们处理像3000TPS这样的地方.因此,当系统启动失败时,这些失败的消息可以在非常短的时间内快速增长.我们正在使用基于java的框架.
谢谢你的帮助!
我有一个消费者工作者应用程序,内部正在启动X线程数,每个线程产生它的KafkaCosnumer.Cosnumers拥有groupId相同的主题并订阅相同的主题.因此,每个消费者都可以获得公平的分区份额.
处理的本质是我不能丢失消息,也不能允许重复.我正在运行的kafka版本是0.10.2.1.
这是我面临的问题:消费者线程1开始消费消息,然后poll()获取一批消息.我也实现了ConsumerRebalanceListener,所以每次成功处理消息时,它都会被添加到offsets地图中.(请参阅下面的代码.)因此,一旦重新平衡发生,我可以在将分区重新分配给其他使用者之前提交我的偏移量.有时,为了处理该批处理,需要更长的时间max.poll.interval.ms,这是重新平衡发生的地方,分区从消费者1中提取并分配给消费者2.消费者1不知道分区被撤销并继续处理消息,在同时,消费者2从最后一个偏移量(由RebalanceListener提交)中获取并处理相同的消息.
有没有办法通知消费者他已撤销分区,以便他可以停止处理已经分配给其他消费者的循环中的消息?
public class RebalanceListener<K, V> implements ConsumerRebalanceListener {
private final KafkaConsumer<K, V> consumer;
private static final ConcurrentMap<TopicPartition, OffsetAndMetadata> CURRENT_OFFSETS =
Maps.newConcurrentMap();
private static final Logger LOGGER = LoggerFactory.getLogger(RebalanceListener.class);
public RebalanceListener(KafkaConsumer<K, V> consumer) {
this.consumer = consumer;
}
public void addOffset(String topic, int partition, long offset) {
LOGGER.debug("message=Adding offset to offsets map, topic={}, partition={}, offset={}",
topic, partition, offset);
CURRENT_OFFSETS.put(new TopicPartition(topic, partition),
new OffsetAndMetadata(offset, "commit"));
}
public …Run Code Online (Sandbox Code Playgroud) 我想按时间戳重置kafka用户组的偏移量。但是当我使用以下命令时:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --to-datetime 2017-11-1907:52:43:00:000 --group <group_name> --topic <topic_name> --execute
Run Code Online (Sandbox Code Playgroud)
我收到以下错误消息:
注意:这只会显示有关使用Java使用者API的使用者(非基于ZooKeeper的使用者)的信息。
如何根据时间重置偏移量
apache-kafka ×10
java ×5
autocommit ×1
cassandra ×1
offset ×1
redis ×1
scala ×1
spring ×1
spring-boot ×1
spring-kafka ×1