在Java客户端(http://kafka.apache.org/documentation.html#highlevelconsumerapi)中,是否在高级使用者块上进行commitOffsets,直到成功提交偏移量,或者它是否为"即发即忘"?
我正在尝试使用最低级别的Consumer Java API来手动管理偏移,使用最新的kafka_2.10-0.8.2.1.要验证我从Kafka提交/读取的偏移量是否正确,我使用kafka.tools.ConsumerOffsetChecker工具.
以下是我的主题/使用者组的输出示例:
./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group elastic_search_group --zookeeper localhost:2181 --topic my_log_topic
Group Topic Pid Offset logSize Lag Owner
elastic_search_group my_log_topic 0 5 29 24 none
Run Code Online (Sandbox Code Playgroud)
以下是我对结果的解释:
Offset = 5 - >这是我'elastic_search_group'消费者的当前偏移量
logSize = 29 - >这是最新偏移量 - 将来到此主题/分区的下一条消息的偏移量
滞后= 24 - > 29-5 - 我的'elastic_search_group'消费者尚未处理多少消息
Pid - 分区ID
Q1:这是对的吗?
现在,我想从我的Java消费者那里获得相同的信息.在这里,我发现我必须使用两种不同的API:
kafka.javaapi.OffsetRequest获得最早和最新的抵消,但kafka.javaapi.OffsetFetchRequest获取当前偏移量.
要获得最早(或最新)的偏移我做:
TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1));
// OR for Latest: requestInfo.put(topicAndPartition, …Run Code Online (Sandbox Code Playgroud) 我在3个不同的VM中有3个Kafka代理,其中一个还运行Zookeeper.我现在创建一个包含8个分区的主题.生产者在创建的"主题"上将消息推送到这些代理组.
我是kafka的新手并试图了解是否有办法从上次消耗的偏移中读取消息,但不是从头开始.
我正在写一个案例,所以我的意图不会偏离.
Eg:
1) I produced 5 messages at 7:00 PM and console consumer consumed those.
2) I stopped consumer at 7:10 PM
3) I produced 10 message at 7:20 PM. No consumer had read those messages.
4) Now, i have started console consumer at 7:30 PM, without from-beginning.
5) Now, it Will read the messages produced after it has started. Not the earlier ones, which were produced at 7.20 PM
Run Code Online (Sandbox Code Playgroud)
有没有办法从最后消耗的偏移量中获取消息.?
当我尝试使用Kafka生产者和消费者(0.9.0)脚本来推送/拉取主题中的消息时,我得到以下错误.
[2016-01-13 02:49:40,078] ERROR Error when sending message to topic test with key: null, value: 11 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
Run Code Online (Sandbox Code Playgroud)
> [2016-01-13 02:47:18,620] WARN
> [console-consumer-90116_f89a0b380f19-1452653212738-9f857257-leader-finder-thread],
> Failed to find leader for Set([test,0])
> (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
> kafka.common.KafkaException: fetching topic metadata for topics
> [Set(test)] from broker
> [ArrayBuffer(BrokerEndPoint(0,192.168.99.100,9092))] failed at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73) at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94) at
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> Caused by: java.io.EOFException at
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
> at
> …Run Code Online (Sandbox Code Playgroud) apache-kafka kafka-consumer-api kafka-producer-api apache-zookeeper
我使用Apache avro架构与Kafka 0.0.8V.我在生产者/消费者端使用相同的模式.目前暂无任何变化的模式.但是当我尝试使用消息时,我在消费者处得到了一些例外.为什么我会收到此错误?
制片人
public void sendFile(String topic, GenericRecord payload, Schema schema) throws CoreException, IOException {
BinaryEncoder encoder = null;
ByteArrayOutputStream out = null;
try {
DatumWriter<GenericRecord> writer = new SpecificDatumWriter<GenericRecord>(schema);
out = new ByteArrayOutputStream();
encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(payload, encoder);
encoder.flush();
byte[] serializedBytes = out.toByteArray();
KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>(topic, serializedBytes);
producer.send(message);
}
Run Code Online (Sandbox Code Playgroud)
消费者
public void run() {
try {
ConsumerIterator<byte[], byte[]> itr = stream.iterator();
while (itr.hasNext()) {
byte[] data = itr.next().message();
Schema schema = …Run Code Online (Sandbox Code Playgroud) 我已经开始学习卡夫卡了。尝试对其进行基本操作。我一直坚持关于“经纪人”的观点。
我的 kafka 正在运行,但是当我想创建一个分区时。
from kafka import TopicPartition
(ERROR THERE) consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
consumer.assign([TopicPartition('foobar', 2)])
msg = next(consumer)
Run Code Online (Sandbox Code Playgroud)
回溯(最近一次调用):文件“”,第 1 行,在文件“/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py”中,第 284 行,在init self._client = KafkaClient(metrics=self._metrics, **self.config) 文件 "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 202, in init self.config['api_version '] = self.check_version(timeout=check_timeout) 文件“/usr/local/lib/python2.7/dist-packages/kafka/client_async.py”,第 791 行,在 check_version 中引发 Errors.NoBrokersAvailable() kafka.errors。 NoBrokersAvailable:NoBrokersAvailable
python apache-kafka kafka-consumer-api kafka-python kafka-producer-api
我目前正在运行kafka 0.10.0.1,并且相关的两个值的相应文档如下:
heartbeat.interval.ms - 使用Kafka的组管理工具时,心跳与消费者协调员之间的预期时间.心跳用于确保消费者的会话保持活动状态,并在新消费者加入或离开群组时促进重新平衡.该值必须设置为低于session.timeout.ms,但通常应设置为不高于该值的1/3.它可以调整得更低,以控制正常重新平衡的预期时间.
session.timeout.ms - 使用Kafka的组管理工具时用于检测故障的超时.如果在会话超时期间未收到消费者的心跳,则代理会将消费者标记为失败并重新平衡该组.由于仅在调用poll()时发送心跳,因此较高的会话超时允许更多时间在消费者的轮询循环中进行消息处理,但代价是检测硬故障的时间较长.另请参阅max.poll.records以获取另一个控制轮询循环中处理时间的选项.
我不清楚为什么文档建议设置heartbeat.interval.ms为1/3 session.timeout.ms.这些值是否相同是没有意义的,因为心跳仅在poll()被调用时发送,因此当处理当前记录时?
我是Kafka的新手,我不太了解Kafka配置的含义,任何人都可以解释为什么更容易理解!
这是我的代码:
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "master:9092,slave1:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "GROUP_2017",
"auto.offset.reset" -> "latest", //earliest or latest
"enable.auto.commit" -> (true: java.lang.Boolean)
)
Run Code Online (Sandbox Code Playgroud)
这在我的代码中意味着什么?
或者甚至是一种延迟消费者收到消息的方法。我需要每 90 秒后在 nodejs 中进行一次函数调用,所以我想为每个 kafka 消息添加 90 秒的延迟