标签: kafka-consumer-api

了解Kafka主题和分区

我开始学习Kafka用于企业解决方案.

在我的阅读中,我想到了一些问题:

  1. 当一个制作人正在制作一条消息时 - 它会指定它想要发送消息的主题,是吗?它关心分区吗?
  2. 当订阅者正在运行时 - 它是否指定了其组ID,以便它可以是同一主题的消费者群集的一部分,或者是该群体消费者感兴趣的几个主题?
  3. 每个消费者组在代理上是否有相应的分区,或者每个消费者都有一个分区?

  4. 作为经纪人创建的分区,因此不关心消费者?

  5. 由于这是一个每个分区都有一个偏移量的队列,因此消费者有责任指定它想要读取哪些消息吗?是否需要保存其状态?

  6. 从队列中删除邮件时会发生什么? - 例如:保留时间为3小时,然后时间过去了,两侧的偏移量如何处理?

apache-kafka kafka-consumer-api kafka-producer-api

136
推荐指数
3
解决办法
5万
查看次数

是否需要密钥作为向Kafka发送消息的一部分?

KeyedMessage<String, byte[]> keyedMessage = new KeyedMessage<String, byte[]>(request.getRequestTopicName(), SerializationUtils.serialize(message)); 
producer.send(keyedMessage);
Run Code Online (Sandbox Code Playgroud)

目前,我发送的消息没有任何密钥作为键控消息的一部分,它仍然可以使用delete.retention.ms吗?我是否需要发送密钥作为邮件的一部分?将密钥作为消息的一部分是否合适?

apache-kafka kafka-consumer-api

70
推荐指数
2
解决办法
5万
查看次数

Kafka:Consumer API vs Streams API

我最近开始学习Kafka并最终得到这些问题.

  1. Consumer和Stream有什么区别?对我来说,如果任何工具/应用程序消费来自Kafka的消息是Kafka世界中的消费者.

  2. 流是如何不同的,因为这也消耗或产生消息给卡夫卡?为什么需要它,因为我们可以使用Consumer API编写我们自己的消费者应用程序并根据需要处理它们或将它们从消费者应用程序发送到Spark?

我对此做了谷歌,但没有得到任何好的答案.对不起,如果这个问题太琐碎了.

apache-kafka kafka-consumer-api apache-kafka-streams

65
推荐指数
2
解决办法
2万
查看次数

卡夫卡消费者名单

我需要找到一种方法来向卡夫卡询问一系列主题.我知道我可以使用目录中kafka-topics.sh包含的脚本来做到这一点bin\.有了这个列表,我需要每个主题的所有消费者.我找不到该目录中的脚本,也没有在kafka-consumer-api库中找到允许我这样做的类.

这背后的原因是我需要弄清楚主题偏移量与消费者偏移量之间的差异.

有没有办法实现这个目标?或者我是否需要在每个消费者中实现此功能?

apache-kafka kafka-consumer-api

43
推荐指数
5
解决办法
8万
查看次数

Kafka 消息 VS REST 调用

如今在微服务世界中,我在我的工作场所看到很多使用 kafka 消息传递的设计,当您可以使用微服务之间的 rest api 调用获得类似的结果时。从技术上讲,您可以完全停止使用 rest api 调用,而是使用 kafka 消息传递。我真的很想知道最佳实践,优缺点,微服务之间何时使用 api 调用,何时使用 kafka 消息传递。

让我们举一个现实生活中的例子:

我有库存服务和供应商服务。日常供应商服务调用供应商 API 来获取新项目,这些需要转移到库存服务中。项目数最多可达 10,000 个对象。

对于这个用例,最好是:

  1. 从供应商 API 获取新数据后,调用库存服务的 REST API 来存储新项目。

  2. 从供应商 API 获取新数据后,将它们作为消息发送到 kafka 主题,供库存服务使用

您会选择哪种方式以及考虑什么

rest apache-kafka microservices kafka-consumer-api spring-kafka

41
推荐指数
2
解决办法
1万
查看次数

"重新平衡"在Apache Kafka上下文中意味着什么?

我是Kafka的新用户,现在已经试用了大约2-3周.我相信目前我已经很好地理解了Kafka在大多数情况下的工作原理,但是在尝试为我自己的Kafka消费者设计API之后(这是模糊不清的但是我遵循了新的KafkaConsumer应该遵循的准则可用于v 0.9,它出现在'trunk'repo atm上)如果我有多个具有相同groupID的消费者,我就会从主题中消耗延迟问题.

在此设置中,我的控制台始终记录有关"重新平衡触发"的问题.当我向消费者群体添加新的消费者时,是否会发生重新平衡,并且为了找出同一个群组ID中的哪个消费者实例将获得哪些分区或完全用于其他内容的重新平衡而触发它们?

我也从https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design中看到了这段话,我似乎无法理解它,所以如果有人能帮助我做感觉非常感激:

重新平衡是一组消费者实例(属于同一组)协调以拥有该组订阅的互斥主题分区集的过程.在成功完成消费者组的重新平衡操作结束时,所有订阅主题的每个分区都将由该组中的单个消费者实例拥有.重新平衡的工作方式如下.每个经纪人都被选为消费者群体子集的协调者.组的协调代理负责协调有关订阅主题的使用者组成员身份更改或分区更改的重新平衡操作.它还负责将生成的分区所有权配置传递给正在进行重新平衡操作的组的所有使用者.

apache-kafka kafka-consumer-api

39
推荐指数
3
解决办法
4万
查看次数

Kafka消费者获取主题的元数据失败

我正在尝试为第三方的Kafka和ZooKeeper服务器编写Java客户端.我能够列出和描述主题,但是当我尝试阅读任何主题时,ClosedChannelException会引发a.我使用命令行客户端在这里重现它们.

$ bin/kafka-console-consumer.sh --zookeeper 255.255.255.255:2181 --topic eventbustopic
[2015-06-02 16:23:04,375] WARN Fetching topic metadata with correlation id 0 for topics [Set(eventbustopic)] from broker [id:1,host:SOME_HOST,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException                                       
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)           
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)        
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)                
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)        
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)        
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)         
[2015-06-02 16:23:04,515] WARN Fetching topic metadata with correlation id 0 for topics [Set(eventbustopic)] from broker [id:0,host:SOME_HOST,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException                                       
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)           
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)        
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)                
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)        
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)        
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)         
Run Code Online (Sandbox Code Playgroud)

备用命令成功: …

apache-kafka kafka-consumer-api

37
推荐指数
2
解决办法
6万
查看次数

适用于Kafka 0.10.0.0及更高版本的session.timeout.ms和max.poll.interval.ms之间的差异

我不清楚为什么我们需要session.timeout.ms和max.poll.interval.ms以及何时使用一个或另一个或两者?似乎两者都表明时间协调器的上限将等待消费者获得心跳,然后再将其假死.

另外,对于基于KIP-62的 0.10.1.0+版本,它的表现如何?

apache-kafka kafka-consumer-api

36
推荐指数
1
解决办法
3万
查看次数

如何从一开始就使用Kafka Consumer API读取数据?

每次运行消费者jar时,请任何人都可以告诉我如何使用Kafka Consumer API从一开始就阅读消息.

apache-kafka kafka-consumer-api

35
推荐指数
6
解决办法
6万
查看次数

在KAFKA中消费后删除消息

我正在使用apache kafka来生成和使用5GB大小的文件.我想知道是否有一种方法可以在消费后自动删除主题中的消息.我有办法跟踪消费消息吗?我不想手动删除它.

apache-kafka kafka-consumer-api

33
推荐指数
3
解决办法
3万
查看次数