我有使用 Apache Kafka 2.11-0.10.1.0 的 Java 8 应用程序。我需要使用该seek功能来处理poll来自分区的旧消息。但是,我No current assignment for partition每次尝试时都会遇到异常seekByOffset。这是我的班级,负责将seek主题添加到指定的时间戳:
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* The main purpose of this class is to move fetching point for each partition of the {@link KafkaConsumer}
* to some offset which is …Run Code Online (Sandbox Code Playgroud) 我在1.0.0-cp1版本的Kafka集群的工作台上工作.
在我的工作室的一部分,他们专注于订购保证和没有数据丢失的最大吞吐量(只有一个分区的主题),需要我将max.in.flight.requests.per.connection属性设置为1?
我已经读过这篇文章
了解我只需要将max.in.flight设置为1,如果我使用该retries属性启用我的制作人的重试功能.
要问我的问题的另一种方法:只有一个分区+重试= 0(生产者道具),足以保证在卡夫卡的顺序?
我需要知道,因为增加max.in.flight会大大增加吞吐量.
我想创建使用安全协议 SASL_SSL 和 sasl Merchanism PLAIN 的 kafka 消费者。有人可以帮我配置这些详细信息吗?
我已经阅读了许多有关如何配置 SASL 详细信息的文档,但仍然没有清楚地了解如何做到这一点。这里我附上我用来创建kafka消费者的代码
Properties props = new Properties();
props.put("bootstrap.servers", "servers");
String consumeGroup = "consumer_group";
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\"");
props.put("group.id", consumeGroup);
props.put("client.id", "client_id");
props.put("security.protocol", "SASL_SSL");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "101");
props.put("max.partition.fetch.bytes", "135");
// props.put("auto.offset.reset", "earliest");
props.put("heartbeat.interval.ms", "3000");
props.put("session.timeout.ms", "6001");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaConsumer<String, String>(props);
Run Code Online (Sandbox Code Playgroud)
堆栈跟踪
14:56:12.767 [main] DEBUG o.a.k.clients.consumer.KafkaConsumer - Starting the Kafka consumer
14:56:12.776 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(nodes = [Node(-2, kafka-events-nonprod-ds1.i, …Run Code Online (Sandbox Code Playgroud) 我正在使用 MirrorMaker2 进行灾难恢复。
Kafka 2.7 应支持 自动消费者偏移同步
这是我正在使用的 yaml 文件(我使用 strimzi 来创建它)
所有源集群主题都会复制到目标集群中。另外...checkpoint.internal主题是在目标集群中创建的,其中包含同步的所有源集群偏移量,但我没有看到这些偏移量被转换为目标集群_consumer_offsets主题,这意味着我何时将在目标中启动消费者(同一消费者组) cluster 它将从头开始读取消息。
我的期望是,在允许自动消费者偏移量后,同步来自翻译并存储在目标集群中的 _consumer_offsets 主题中的源集群的所有消费者偏移量。
有人可以澄清我的期望是否正确,如果不正确,它应该如何运作。
我已经安装kafka_2.11-1.1.0并将广告监听器设置为advertised.listeners=PLAINTEXT://<my-ip>:9092(in $KAFKA_HOME/config/server.properties)。
我可以使用 java 代码连接并写入我的 kafka,并通过kafka-tool另一台服务器查看我的集群,但我无法从本地机器(我在其上安装了 kafka 集群的机器)向我的主题写入消息。
我也尝试将 listeners 值设置为,listeners = PLAINTEXT://:9092但没有任何变化。我应该如何处理我的 kafka 以使其从本地主机的外部和内部都可以访问和写入?
我使用 node-rdkafka 通过以下选项连接到 IBM MessageHub:
var options = {
// 'debug': 'all',
'metadata.broker.list': brokers,
'security.protocol': 'sasl_ssl',
'ssl.ca.location': '/etc/ssl/certs',
'sasl.mechanisms': 'PLAIN',
'sasl.username': username,
'sasl.password': password,
'api.version.request': true,
'broker.version.fallback': '0.10.2.1',
'log.connection.close': false,
'dr_msg_cb': true
}
Run Code Online (Sandbox Code Playgroud)
客户端在 IBM Kubernetes Service pod (Ubuntu) 上运行。
请指教。谢谢。
我的问题是:如果我将配额设置为 1 MB/秒,多代理集群会得到多少?(例如,采用 2 个代理)每个代理的速度是 1MB/秒(还是)每个代理 0.5 MB/秒?
文档说:
例如,如果 (user="test-user", client-id="test-client") 的生产配额为 10MB/秒,则该配额将在用户“test-user”的所有生产者实例与客户端之间共享id“测试客户端。
我的生产者用键 0,1,2,3,4,5,6,7,8,9 写了 10 条消息
我的消费者(使用自动提交 = false)在第一次循环期间读取消息但是当读取消息“2”时我强制异常所以我不提交。(异常仅在第一个循环中抛出 - 有标志 forceError)
在第二个循环中,消费者应该读取所有消息 0,1,2,3,4,5,6,7,8,9 并提交,但事实并非如此,为什么?
日志是这样的:
write message with key: 0
write message with key: 1
write message with key: 2
write message with key: 3
write message with key: 4
write message with key: 5
write message with key: 6
write message with key: 7
write message with key: 8
write message with key: 9
1 loop, consumerRecords.count: 10
read message with key: 0
read message with key: 1
read …Run Code Online (Sandbox Code Playgroud) 我需要根据某些数据库驱动的属性打开/关闭 Kafka 消费者。怎样才能实现呢。
我想到的一种方法是:当消费者标志关闭时,从消费者抛出异常。容器工厂配置定义为
factory.setErrorHandler(new SeekToCurrentErrorHandler());
Run Code Online (Sandbox Code Playgroud)
但它积极寻求同样的信息。
有什么办法可以根据需要关闭心跳然后再打开。
重新启动代理后,我发现一个主题的领导者中的所有分区都在代理3中,并且我设置了
imbalance.check.interval.seconds300,auto.leader.rebalance.enableTrue,但300秒后没有任何反应。
所以我使用了bin/kafka-preferred-replica-election.sh,并得到了我所期望的结果,领导者被平衡为经纪人 1、2、3。
我想知道为什么自动重新平衡没有发生?kafka-preferred-replica-election.sh和 和有什么区别auto.leader.rebalance.enable?
控制器日志:
[2019-08-14 09:31:33,454] INFO [Controller id=3] 处理自动首选副本领导者选举(kafka.controller.KafkaController)[2019-08-14 09:31:33,454] TRACE [Controller id=3 ] 检查是否需要触发自动领导者平衡(kafka.controller.KafkaController)[2019-08-14 09:31:33,455] DEBUG [Controller id=3] Broker 首选副本 Map(2 -> Map(__consumer_offsets-22 ->矢量(2),__consumer_offsets-4->矢量(2),__consumer_offsets-7->矢量(2),__consumer_offsets-46->矢量(2),__consumer_offsets-25->矢量(2),__consumer_offsets-49->矢量(2),__consumer_offsets-16->矢量(2),__consumer_offsets-28->矢量(2),__consumer_offsets-31->矢量(2),__consumer_offsets-37->矢量(2),__consumer_offsets-19->矢量(2),__consumer_offsets-13 - >矢量(2),fourth_topic-1 - >矢量(2,3,1),__consumer_offsets-43 - >矢量(2),__consumer_offsets-1 - >矢量(2),__consumer_offsets -34 -> 矢量(2), __consumer_offsets-10 -> 矢量(2), __consumer_offsets-40 -> 矢量(2)), 1 -> 地图(__consumer_offsets-30 -> 矢量(1), __consumer_offsets-21 ->矢量(1),__consumer_offsets-27->矢量(1),__consumer_offsets-9->矢量(1),__consumer_offsets-33->矢量(1),__consumer_offsets-36->矢量(1),__consumer_offsets-42->矢量(1),__consumer_offsets-3 - >矢量(1),__consumer_offsets-18 - >矢量(1),__consumer_offsets-15 - >矢量(1),__consumer_offsets-24 - >矢量(1),__consumer_offsets-48 - >矢量(1),__consumer_offsets-6 - >矢量(1),fourth_topic-0 - >矢量(1,2,3),__consumer_offsets-0 - >矢量(1),__consumer_offsets-39 - >矢量(1),__consumer_offsets …
apache-kafka ×10
java ×2
jaas ×1
message-hub ×1
producer ×1
sasl ×1
spring-kafka ×1
strimzi ×1