我们启动一个Kafka消费者,聆听可能尚未创建的主题(尽管已启用主题自动创建).
此后不久,制作人就该主题发布消息.
但是,消费者需要一些时间才能注意到这一点:确切地说是5分钟.此时,消费者撤销其分区并重新加入消费者组.卡夫卡重新稳定了这个群体.查看消费者与kafka日志的时间戳,此过程在消费者端实例化.
我想这是预期的行为,但我想理解这一点.这实际上是一个重新平衡(从0到1分区)?如果我们提前创建主题,这不会发生吗?
2017-02-01 08:36:45.692 INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:36:45.692 INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[]
2017-02-01 08:36:45.693 INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group tps-kafka-partitioning
2017-02-01 08:36:45.738 INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group tps-kafka-partitioning with generation 1
2017-02-01 08:36:45.747 INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:36:45.749 INFO 7 --- [afka-consumer-1] …
Run Code Online (Sandbox Code Playgroud) 我一直在尝试为Spring Kafka做一些POC工作.具体来说,我想尝试在Kafka中消费消息时处理错误方面的最佳实践.
我想知道是否有人能够提供帮助:
2的代码示例如下:
鉴于AckMode设置为RECORD,根据文档:
处理记录后,侦听器返回时提交偏移量.
我认为如果监听器方法抛出异常,偏移量不会增加.但是,当我使用下面的代码/配置/命令组合测试它时,情况并非如此.偏移量仍会更新,并继续处理下一条消息.
我的配置:
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
return factory;
}
Run Code Online (Sandbox Code Playgroud)
我的代码:
@Component
public class KafkaMessageListener{
@KafkaListener(topicPartitions = {@TopicPartition( topic = "my-replicated-topic", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0", relativeToCurrent = "true"))}) …
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用SpringBoot(1.5.9)和Spring-kafka(2.1.0)来设置Kafka Consumer.但是,当我启动我的应用程序时,我在Kafka MessagingMessageListenerAdapter上获得了java.lang.NoSuchMethodError:org.springframework.util.Assert.state(ZLjava/util/function/Supplier;)V.我尝试使用Spring-Kafka(1.2.0)并且错误消失了.有没有其他人遇到这个版本不兼容?
这是我的配置类
@EnableKafka
@Configuration
public class ImporterConfigs{
static Logger logger = Logger.getLogger(ImporterConfigs.class);
@Value("${kafka.bootstrap-servers}")
private static String bootstrapServers;
@Bean
public Map<String, Object> consumerKafkaConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// allows a pool of processes to divide the work of consuming and processing records
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new …
Run Code Online (Sandbox Code Playgroud) 我正在尝试为使用Spring Boot 2.x开发的Kafka监听器编写单元测试.作为一个单元测试,我不想启动一个完整的Kafka服务器作为Zookeeper的一个实例.所以,我决定使用Spring Embedded Kafka.
我的听众的定义非常基本.
@Component
public class Listener {
private final CountDownLatch latch;
@Autowired
public Listener(CountDownLatch latch) {
this.latch = latch;
}
@KafkaListener(topics = "sample-topic")
public void listen(String message) {
latch.countDown();
}
}
Run Code Online (Sandbox Code Playgroud)
此外,latch
在接收消息后验证计数器等于零的测试非常容易.
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {
@Autowired
private KafkaEmbedded embeddedKafka;
@Autowired
private CountDownLatch latch;
private KafkaTemplate<Integer, String> producer;
@Before
public void setUp() {
this.producer = buildKafkaTemplate();
this.producer.setDefaultTopic("sample-topic");
}
private KafkaTemplate<Integer, String> …
Run Code Online (Sandbox Code Playgroud) 我有一个 Spring Boot 应用程序,它将多个主题发送到 kafka。
在我用来发送的服务中,我已经自动连接了这个。
private final KafkaTemplate<String, Object> kafkaTemplate;
Run Code Online (Sandbox Code Playgroud)
当我使用此 kafkaTemplate 发送到多个主题时,我会在日志中收到以下信息。
[Producer clientId= Producer-1] 由于关联的 topicId 从 null 更改为 J09iQeooQtOuAyEfntux_g,因此将分区 ***** 的最后一次看到的纪元重置为 87
发送。我创建了一个 GenericRecord
GenericRecord kafkaValue = new GenericData.Record(schema);
Run Code Online (Sandbox Code Playgroud)
然后我在 genericRecord 上设置值,准备好后我用这一行发送。
kafkaStringTemplate.send(new ProducerRecord<>(sendMessageConfiguration.getTopicName(), textKey, kafkaValue));
Run Code Online (Sandbox Code Playgroud)
这个警告重要吗?如果是的话我做错了什么?在我看来,所有消息都已发送。
我第一次使用Spring Kafka而且我无法在我的消费者代码中使用Acknowledgement.acknowledge()方法进行手动提交.如果我的消费者配置或监听器代码中缺少任何内容,请告诉我.或者是否有其他方法根据条件处理确认偏移.在这里,我正在寻找解决方案,如果没有手动提交/确认偏移,它应该由消费者选择相同的消息/偏移.
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
@EnableKafka
@Configuration
public class ConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Value(value = "${kafka.groupId}")
private String groupId;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
Map<String, Object> props = new HashMap<String, Object>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(
props)); …
Run Code Online (Sandbox Code Playgroud) 我正在使用这个docker-compose设置在本地设置Kafka:https://github.com/wurstmeister/kafka-docker/
docker-compose up
工作正常,通过shell创建主题工作正常.
现在我尝试通过连接到Kafka spring-kafka:2.1.0.RELEASE
启动Spring应用程序时,它会打印正确版本的Kafka:
o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser : Kafka commitId : aaa7af6d4a11b29d
Run Code Online (Sandbox Code Playgroud)
我尝试发送这样的消息
kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");
Run Code Online (Sandbox Code Playgroud)
在客户端发送失败
UnknownServerException: The server experienced an unexpected error when processing the request
Run Code Online (Sandbox Code Playgroud)
在服务器控制台中,我收到消息Magic v1不支持记录头
Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers
Run Code Online (Sandbox Code Playgroud)
谷歌搜索建议版本冲突,但版本似乎适合(org.apache.kafka:kafka-clients:1.0.0
在类路径中).
有线索吗?谢谢!
编辑:我缩小了问题的根源.发送纯字符串有效,但通过JsonSerializer发送Json会导致给定的问题.这是我的生产者配置的内容:
@Value("\${kafka.bootstrap-servers}")
lateinit var bootstrapServers: String
@Bean
fun producerConfigs(): Map<String, Any> =
HashMap<String, Any>().apply {
// list of host:port pairs used …
Run Code Online (Sandbox Code Playgroud) 我尝试在Spring Boot 应用程序中配置apache kafka。我阅读了此文档并按照以下步骤操作:
1)我将此行添加到aplication.yaml
:
spring:
kafka:
bootstrap-servers: kafka_host:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringDeserializer
value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
Run Code Online (Sandbox Code Playgroud)
2)我创建新主题:
@Bean
public NewTopic responseTopic() {
return new NewTopic("new-topic", 5, (short) 1);
}
Run Code Online (Sandbox Code Playgroud)
现在我想使用KafkaTemplate
:
private final KafkaTemplate<String, byte[]> kafkaTemplate;
public KafkaEventBus(KafkaTemplate<String, byte[]> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
Run Code Online (Sandbox Code Playgroud)
但 Intellij IDE 强调:
为了解决这个问题,我需要创建 bean:
@Bean
public KafkaTemplate<String, byte[]> myMessageKafkaTemplate() {
return new KafkaTemplate<>(greetingProducerFactory());
}
Run Code Online (Sandbox Code Playgroud)
并传递给构造函数属性greetingProducerFactory()
:
@Bean
public ProducerFactory<String, byte[]> greetingProducerFactory() {
Map<String, Object> configProps = …
Run Code Online (Sandbox Code Playgroud) 我有一个带有自动缩放器的服务,每个实例都需要位于一个单独的消费者组中,我通过创建随机消费者组名称来实现它group-id: my-service-${random.uuid}
。我想知道如果所有消费者都消失了,一个消费者组会发生什么。我注意到在 Confluence 平台中,我有很多消费者组,但没有任何消费者。
没有任何消费者的消费群体还能存在多久?
如何配置消费者组在服务被删除后 5 分钟后被删除(我希望删除该组,而不仅仅是删除偏移量)?
spring apache-kafka spring-boot kafka-consumer-api spring-kafka
我有一个春季启动kafka应用程序。我的经纪人每隔几天就会被回收一次。旧的经纪人已取消配置,新的经纪人已配置。
我有一个调度程序,每隔几个小时要检查一次代理。我想确保一旦有了新的经纪人,我们就应该重新加载所有与Spring Kafka相关的bean。与KafkaAutoConfiguration非常相似,除了我希望触发代理值更改并以编程方式加载自动配置。
每当将旧的代理替换为新的代理时,如何以编程方式调用自动配置?