标签: kafka-consumer-api

commitOffsets是否在高级消费者块上?

在Java客户端(http://kafka.apache.org/documentation.html#highlevelconsumerapi)中,是否在高级使用者块上进行commitOffsets,直到成功提交偏移量,或者它是否为"即发即忘"?

apache-kafka kafka-consumer-api

10
推荐指数
1
解决办法
1569
查看次数

Kafka Java API偏移操作澄清

我正在尝试使用最低级别的Consumer Java API来手动管理偏移,使用最新的kafka_2.10-0.8.2.1.要验证我从Kafka提交/读取的偏移量是否正确,我使用kafka.tools.ConsumerOffsetChecker工具.

以下是我的主题/使用者组的输出示例:

./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group   elastic_search_group --zookeeper localhost:2181 --topic my_log_topic
Group           Topic                          Pid Offset          logSize         Lag             Owner
elastic_search_group my_log_topic              0   5               29              24              none
Run Code Online (Sandbox Code Playgroud)

  以下是我对结果的解释:

Offset = 5 - >这是我'elastic_search_group'消费者的当前偏移量

logSize = 29 - >这是最新偏移量 - 将来到此主题/分区的下一条消息的偏移量

滞后= 24 - > 29-5 - 我的'elastic_search_group'消费者尚未处理多少消息

Pid - 分区ID

Q1:这是对的吗?

现在,我想从我的Java消费者那里获得相同的信息.在这里,我发现我必须使用两种不同的API:

kafka.javaapi.OffsetRequest获得最早和最新的抵消,但kafka.javaapi.OffsetFetchRequest获取当前偏移量.

要获得最早(或最新)的偏移我做:

TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1));
// OR for Latest: requestInfo.put(topicAndPartition, …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api

10
推荐指数
1
解决办法
5918
查看次数

Kafka如何在经纪人之间分配主题分区

我在3个不同的VM中有3个Kafka代理,其中一个还运行Zookeeper.我现在创建一个包含8个分区的主题.生产者在创建的"主题"上将消息推送到这些代理组.

  • Kafka如何在经纪人之间分配主题及其分区?
  • 当新的Kafka Broker加入群集时,Kafka是否会重新分发主题?
  • 创建主题后是否可以增加主题分区?

apache-kafka kafka-consumer-api

10
推荐指数
1
解决办法
3386
查看次数

如何让kafka消费者从上次消费的偏移中读取,而不是从一开始就读取

我是kafka的新手并试图了解是否有办法从上次消耗的偏移中读取消息,但不是从头开始.

我正在写一个案例,所以我的意图不会偏离.

Eg:
1) I produced 5 messages at 7:00 PM and console consumer consumed those.
2) I stopped consumer at 7:10 PM
3) I produced 10 message at 7:20 PM. No consumer had read those messages.
4) Now, i have started console consumer at 7:30 PM, without from-beginning.
5) Now, it Will read the messages produced after it has started. Not the earlier ones, which were produced at 7.20 PM
Run Code Online (Sandbox Code Playgroud)

有没有办法从最后消耗的偏移量中获取消息.?

apache-kafka kafka-consumer-api

10
推荐指数
3
解决办法
1万
查看次数

在获取主题元数据时,Kafka消费者"未能找到领导者"

当我尝试使用Kafka生产者和消费者(0.9.0)脚本来推送/拉取主题中的消息时,我得到以下错误.

制片人错误

[2016-01-13 02:49:40,078] ERROR Error when sending message to topic test with key: null, value: 11 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
Run Code Online (Sandbox Code Playgroud)

消费者错误

> [2016-01-13 02:47:18,620] WARN
> [console-consumer-90116_f89a0b380f19-1452653212738-9f857257-leader-finder-thread],
> Failed to find leader for Set([test,0])
> (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
> kafka.common.KafkaException: fetching topic metadata for topics
> [Set(test)] from broker
> [ArrayBuffer(BrokerEndPoint(0,192.168.99.100,9092))] failed   at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)    at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)    at
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> Caused by: java.io.EOFException   at
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
>   at
> …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api kafka-producer-api apache-zookeeper

10
推荐指数
1
解决办法
1万
查看次数

Avro解码提供了java.io.EOFException

我使用Apache avro架构与Kafka 0.0.8V.我在生产者/消费者端使用相同的模式.目前暂无任何变化的模式.但是当我尝试使用消息时,我在消费者处得到了一些例外.为什么我会收到此错误?

制片人

public void sendFile(String topic, GenericRecord payload, Schema schema) throws CoreException, IOException {
    BinaryEncoder encoder = null;
    ByteArrayOutputStream out = null;
    try {
        DatumWriter<GenericRecord> writer = new SpecificDatumWriter<GenericRecord>(schema);
        out = new ByteArrayOutputStream();
        encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(payload, encoder);
        encoder.flush();

        byte[] serializedBytes = out.toByteArray();

        KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>(topic, serializedBytes);

            producer.send(message);
        }
Run Code Online (Sandbox Code Playgroud)

消费者

public void run() {
        try {
            ConsumerIterator<byte[], byte[]> itr = stream.iterator();
            while (itr.hasNext()) {

                byte[] data = itr.next().message();

                Schema schema = …
Run Code Online (Sandbox Code Playgroud)

java avro apache-kafka kafka-consumer-api

10
推荐指数
1
解决办法
2121
查看次数

NoBrokersAvailable:NoBrokersAvailable-Kafka 错误

我已经开始学习卡夫卡了。尝试对其进行基本操作。我一直坚持关于“经纪人”的观点。

我的 kafka 正在运行,但是当我想创建一个分区时。

 from kafka import TopicPartition
(ERROR THERE) consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
 consumer.assign([TopicPartition('foobar', 2)])
 msg = next(consumer)
Run Code Online (Sandbox Code Playgroud)

回溯(最近一次调用):文件“”,第 1 行,在文件“/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py”中,第 284 行,在init self._client = KafkaClient(metrics=self._metrics, **self.config) 文件 "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 202, in init self.config['api_version '] = self.check_version(timeout=check_timeout) 文件“/usr/local/lib/python2.7/dist-packages/kafka/client_async.py”,第 791 行,在 check_version 中引发 Errors.NoBrokersAvailable() kafka.errors。 NoBrokersAvailable:NoBrokersAvailable

python apache-kafka kafka-consumer-api kafka-python kafka-producer-api

10
推荐指数
4
解决办法
3万
查看次数

Kafka使用者配置中heartbeat.interval.ms和session.timeout.ms之间的区别

我目前正在运行kafka 0.10.0.1,并且相关的两个值的相应文档如下:

heartbeat.interval.ms - 使用Kafka的组管理工具时,心跳与消费者协调员之间的预期时间.心跳用于确保消费者的会话保持活动状态,并在新消费者加入或离开群组时促进重新平衡.该值必须设置为低于session.timeout.ms,但通常应设置为不高于该值的1/3.它可以调整得更低,以控制正常重新平衡的预期时间.

session.timeout.ms - 使用Kafka的组管理工具时用于检测故障的超时.如果在会话超时期间未收到消费者的心跳,则代理会将消费者标记为失败并重新平衡该组.由于仅在调用poll()时发送心跳,因此较高的会话超时允许更多时间在消费者的轮询循环中进行消息处理,但代价是检测硬故障的时间较长.另请参阅max.poll.records以获取另一个控制轮询循环中处理时间的选项.

我不清楚为什么文档建议设置heartbeat.interval.ms为1/3 session.timeout.ms.这些值是否相同是没有意义的,因为心跳仅在poll()被调用时发送,因此当处理当前记录时?

apache-kafka kafka-consumer-api

10
推荐指数
1
解决办法
4754
查看次数

不清楚Kafka中auto.offset.reset和enable.auto.commit的含义

我是Kafka的新手,我不太了解Kafka配置的含义,任何人都可以解释为什么更容易理解!

这是我的代码:

 val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "master:9092,slave1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "GROUP_2017",
  "auto.offset.reset" -> "latest", //earliest or latest
  "enable.auto.commit" -> (true: java.lang.Boolean)
)
Run Code Online (Sandbox Code Playgroud)

这在我的代码中意味着什么?

apache-kafka kafka-consumer-api

10
推荐指数
3
解决办法
2万
查看次数

有没有办法为 kafka 生产者发送的消息设置延迟?

或者甚至是一种延迟消费者收到消息的方法。我需要每 90 秒后在 nodejs 中进行一次函数调用,所以我想为每个 kafka 消息添加 90 秒的延迟

apache-kafka kafka-consumer-api kafka-producer-api

10
推荐指数
1
解决办法
2777
查看次数