我正在使用kafka_2.9.2-0.8.1.1和zookeeper 3.4.6.
是否有可以从zookeeper中自动删除使用者组的实用程序?或者我可以删除zookeeper中/ consumers/[group_id]下的所有内容吗?如果是后者,还有什么我错过了吗?这可以用现场系统完成吗?
我是Kafka的新手,我不太了解Kafka配置的含义,任何人都可以解释为什么更容易理解!
这是我的代码:
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "master:9092,slave1:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "GROUP_2017",
"auto.offset.reset" -> "latest", //earliest or latest
"enable.auto.commit" -> (true: java.lang.Boolean)
)
Run Code Online (Sandbox Code Playgroud)
这在我的代码中意味着什么?
我有以下代码
class Consumer(val consumer: KafkaConsumer<String, ConsumerRecord<String>>) {
fun run() {
consumer.seekToEnd(emptyList())
val pollDuration = 30 // seconds
while (true) {
val records = consumer.poll(Duration.ofSeconds(pollDuration))
// perform record analysis and commitSync()
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
消费者订阅的主题持续接收记录。有时,消费者会因处理步骤而崩溃。当消费者重新启动时,我希望它从主题的最新偏移量开始消费(即忽略消费者关闭时发布到主题的记录)。我认为该seekToEnd()方法可以确保这一点。然而,这个方法似乎根本没有效果。消费者从崩溃的偏移量开始消费。
正确的使用方法是什么seekToEnd()?
编辑:使用以下配置创建消费者
fun <T> buildConsumer(valueDeserializer: String): KafkaConsumer<String, T> {
val props = setupConfig(valueDeserializer)
Common.setupConsumerSecurityProtocol(props)
return createConsumer(props)
}
fun setupConfig(valueDeserializer: String): Properties {
// Configuration setup
val props = Properties()
props[ConsumerConfig.GROUP_ID_CONFIG] = config.applicationId
props[ConsumerConfig.CLIENT_ID_CONFIG] = config.kafka.clientId
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = config.kafka.bootstrapServers
props[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = config.kafka.schemaRegistryUrl …Run Code Online (Sandbox Code Playgroud) 我目前正在使用Spring Integration Kafka进行实时统计.但是,组名使Kafka搜索了侦听器未读取的所有先前值.
@Value("${kafka.consumer.group.id}")
private String consumerGroupId;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(getDefaultProperties());
}
public Map<String, Object> getDefaultProperties() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
return properties;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public KafkaMessageListener listener() {
return new KafkaMessageListener();
}
Run Code Online (Sandbox Code Playgroud)
我想开始最新的偏移,而不是被旧的价值所困扰.是否有可能重置组的偏移量?
我已经用KafkaHandler. 我的消费者应该消费事件,然后针对每个事件向其他服务发送 REST 请求。我只想在该 REST 服务关闭时重试。否则,我可以忽略失败的事件。
我的容器工厂配置如下:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setStatefulRetry(true);
factory.setRetryTemplate(retryTemplate());
factory.setConcurrency(3);
ContainerProperties containerProperties = factory.getContainerProperties();
containerProperties.setAckOnError(false);
containerProperties.setAckMode(AckMode.RECORD);
containerProperties.setErrorHandler(new SeekToCurrentErrorHandler());
return factory;
}
Run Code Online (Sandbox Code Playgroud)
我ExceptionClassifierRetryPolicy用于设置异常和相应的重试策略。
重试后一切看起来都很好。当我得到一个时它会重试,当我得到一个时ConnectException它会忽略IllegalArgumentException。
然而,在IllegalArgumentException场景中,SeekToCurrentErrorHandler返回到未处理的偏移量(因为它寻找未处理的消息,包括失败的消息),最终立即重试失败的消息。消费者不断地来回并重试百万次。
如果我有机会了解哪个记录失败了SeekToCurrentErrorHandler,那么我将创建一个自定义实现SeekToCurrentErrorHandler来检查失败的消息是否可重试(通过使用该thrownException字段)。如果它不可重试,那么我会将它从列表中删除records以寻找回来。
关于如何实现此功能的任何想法?
注:enable.auto.commit设为false,auto.offset.reset设为earliest。
谢谢!
我试图了解 ConsumerConfig.auto.offset.reset = latest 如何影响消息消耗。
例如,我有一个消费者,最初在 t1 时间发送 100 条消息,然后我的消费者在 t1+30 秒启动并运行,然后我的消费者会消费 t1+30 秒后发布的消息还是会消费 t1 之后发布的消息?
我希望有一个Kafka Consumer,它从一个主题中的最新消息开始.
这是java代码:
private static Properties properties = new Properties();
private static KafkaConsumer<String, String> consumer;
static
{
properties.setProperty("bootstrap.servers","localhost");
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("group.id", "test");
properties.setProperty("auto.offset.reset", "latest");
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("mytopic"));
}
@Override
public StreamHandler call() throws Exception
{
while (true)
{
ConsumerRecords<String, String> consumerRecords = consumer.poll(200);
Iterable<ConsumerRecord<String, String>> records = consumerRecords.records("mytopic");
for(ConsumerRecord<String, String> rec : records)
{
System.out.println(rec.value());
}
}
}
Run Code Online (Sandbox Code Playgroud)
尽管auto.offset.reset的值是最新的,但是消费者会在2天前启动表单消息,然后赶上最新的消息.
我错过了什么?
我已经使用 Kafka 几个月了,我意识到一些核心概念对我来说还不是很清楚。我的疑问与consumerId、groupId 和offsets 之间的关系有关。在我们的应用程序中,我们需要 Kafka 使用发布 - 订阅范式工作,因此我们为每个消费者使用不同的组 ID,这些 ID 是随机生成的。
我曾经认为设置auto.offset.reset = latest我的消费者总是会收到他们尚未收到的消息,但最近我了解到事实并非如此。这仅在消费者尚未提交偏移量时才有效。在任何其他情况下,消费者将继续接收偏移量大于其提交的最后一个偏移量的消息。
由于我总是使用随机组 ID 创建新消费者,我意识到我的消费者“没有记忆”,他们是新消费者,他们永远不会提交偏移量,因此该auto.offset.reset = latest政策将始终适用。这就是我怀疑的地方。假设以下场景:
my-topic。auto.offset.reset设置适用latest于两个消费者。my-topic。groupId是随机的,而且我没有设置任何消费者 ID,所以这意味着这是一个新的消费者(对吧?)。应用程序 B 没有收到任何消息。