我在使用 Spring(boot) Kafka 的微服务中有一个消费-转换-生产工作流程。我需要实现 Kafka 事务提供的一次性语义。下面是代码片段:
配置
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024);
DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(props);
defaultKafkaProducerFactory.setTransactionIdPrefix("kafka-trx-");
return defaultKafkaProducerFactory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory()); …Run Code Online (Sandbox Code Playgroud) 当批量消费 Kafka 消息时,可以使用 限制批量大小max.poll.records。
如果消费者非常快并且其提交偏移量没有明显滞后,这意味着大多数批次将会小得多。我只想接收“完整”批次,即只有在达到批次大小后才调用我的消费者函数。所以我正在寻找类似 的东西min.poll.records,它不以这种形式存在。
这是我正在做的一个最小的例子:
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.stereotype.Component
@SpringBootApplication
class Application
fun main(args: Array<String>) {
runApplication<Application>(*args)
}
@Component
class TestConsumer {
@Bean
fun kafkaBatchListenerContainerFactory(kafkaProperties: KafkaProperties): ConcurrentKafkaListenerContainerFactory<String, String> {
val configs = kafkaProperties.buildConsumerProperties()
configs[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 1000
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.consumerFactory = DefaultKafkaConsumerFactory(configs)
factory.isBatchListener = true
return factory
}
@KafkaListener(
topics = ["myTopic"],
containerFactory = "kafkaBatchListenerContainerFactory"
) …Run Code Online (Sandbox Code Playgroud) 以下代码在receiverOptions和模板之间产生意外的循环依赖:
令人惊讶的是,如果从 spring 上下文中删除 kafkaProps,它会起作用。
看起来某些自动配置正在添加从模板到接收器选项的不必要的依赖项。
请建议配置 ReactiveKafkaConsumerTemplate 的正确方法。
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public Map<String, Object> kafkaProps() {
return Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092",
ConsumerConfig.GROUP_ID_CONFIG, DemoApplication.class.getSimpleName()
);
}
@Bean
public ReceiverOptions<String, String> receiverOptions(Map<String, Object> kafkaProps) {
return ReceiverOptions.<String, String>create(kafkaProps)
.withKeyDeserializer(new StringDeserializer())
.withValueDeserializer(new StringDeserializer())
.subscription(List.of("test-topic"));
}
@Bean
public ReactiveKafkaConsumerTemplate<String, String> template(ReceiverOptions<String, String> receiverOptions) {
return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
}
@Bean
public ApplicationRunner runner(ReactiveKafkaConsumerTemplate<String, String> kafkaReceiver) {
return args -> kafkaReceiver.receive()
.log()
.subscribe();
} …Run Code Online (Sandbox Code Playgroud) 我正在spring-kafka使用 Spring Boot 1.5.16 来实现流应用程序。我们使用的版本spring-kafka是1.3.8.RELEASE。
我正在寻找一种方法来关闭启动应用程序,以防出现终止与 Kafka Streams 关联的所有线程的错误。我发现在内部KafkaStreams可以注册未捕获异常的句柄。方法是setGlobalStateRestoreListener。
我看到这个方法是spring-kafka在 type内部公开的KStreamBuilderFactoryBean。
我的问题如下。有没有一种简单的方法将 注册UncaughtExceptionHandler为 bean 并让 Spring 正确注入到工厂 bean 中?或者我应该创建KStreamBuilderFactoryBean自己的并手动设置处理程序?
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_KSTREAM_BUILDER_BEAN_NAME)
public KStreamBuilderFactoryBean kStreamBuilderFactoryBean(StreamsConfig streamsConfig) {
final KStreamBuilderFactoryBean streamBuilderFactoryBean = new KStreamBuilderFactoryBean(
streamsConfig);
streamBuilderFactoryBean.setUncaughtExceptionHandler((threadInError, exception) -> {
// Something happens here
});
return streamBuilderFactoryBean;
}
Run Code Online (Sandbox Code Playgroud)
多谢。
我需要将max.poll.interval.ms默认值 300000 增加到更大的值,因为。超时异常。
但是我无法在 application.properties 中找到属性(自动完成)来覆盖它。我错过了什么吗?或者我只是使用旧版本的 Spring Kafka (2.1.10)
max.poll.interval.ms = 300000
max.poll.records = 500
Run Code Online (Sandbox Code Playgroud) 我正在使用 spring-kafka 来消费来自两个 Kafka 主题的消息,该主题发送的消息格式如下所示。
@KafkaListener(topics = {"topic_country1", "topic_country2"}, groupId = KafkaUtils.MESSAGE_GROUP)
public void onCustomerMessage(String message, Acknowledgment ack) throws Exception {
log.info("Message : {} is received", message);
ack.acknowledge();
}
Run Code Online (Sandbox Code Playgroud)
autoscaling apache-kafka spring-boot kafka-consumer-api spring-kafka
我正在尝试将 Spring Kafka 与 Confluence 模式注册表和 Kafka Avro 反序列化器一起使用。使用 gradle 和 .avsc 我生成了 avro 类。使用生成的类,我发送通用记录并使用相同的记录。我在 kafka 监听器中收到以下错误:
Error while processing: ConsumerRecord(topic = topi_name, partition = 2, offset = 149, CreateTime = 1592288763784, serialized key size = 16, serialized value size = 38, headers = RecordHeaders(headers = [], isReadOnly = false), key = event_test, value = {"eventType": "test", "EventDataRequest": {"user": "54321", "panId": "1234", "empId": "5"}})
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void className(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, org.apache.avro.generic.GenericRecord>) throws com.test.kafka.exception.ConsumerException,org.apache.xmlbeans.XmlException,java.io.IOException,java.lang.ClassNotFoundException' threw exception; nested exception is java.lang.ClassCastException: com.test.MyPojo …Run Code Online (Sandbox Code Playgroud)spring apache-kafka kafka-consumer-api kafka-producer-api spring-kafka
我正在编写一个使用 Kafka 获取信息的应用程序(Spring + Kotlin)。如果我在声明@KafkaListener时设置autoStartup = "true"那么应用程序可以正常工作,但前提是代理可用。当代理不可用时,应用程序在启动时崩溃。这是不受欢迎的行为。应用程序必须工作并执行其他功能。
为了避免启动时应用程序崩溃,此站点上的另一个主题建议在声明@KafkaListener时设置autoStartup = "false"。它确实有助于防止启动时崩溃。但现在我无法成功手动启动KafkaListener。在其他示例中,我看到了KafkaListenerEndpointRegistry的自动连接,但是当我尝试这样做时:
@Service
class KafkaConsumer @Autowired constructor(
private val kafkaListenerEndpointRegistry: KafkaListenerEndpointRegistry
) {
Run Code Online (Sandbox Code Playgroud)
IntelliJ Idea 警告:
无法自动装配。未找到“KafkaListenerEndpointRegistry”类型的 bean。
当我尝试在不自动装配的情况下使用 KafkaListenerEndpointRegistry 并执行以下代码时:
@Service
class KafkaConsumer {
private val logger = LoggerFactory.getLogger(this::class.java)
private val kafkaListenerEndpointRegistry = KafkaListenerEndpointRegistry()
@Scheduled(fixedDelay = 10000)
fun startCpguListener(){
val container = kafkaListenerEndpointRegistry.getListenerContainer("consumer1")
if (!container.isRunning)
try {
logger.info("Kafka Consumer is not running. Trying to start...")
container.start()
} catch (e: Exception){ …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 @KafkaListener 实现消费者。我使用的是Spring2.3.7版本。
这是到目前为止我的代码,
public class SampleListener {
@KafkaListener(topics = "test-topic",
containerFactory = "sampleKafkaListenerContainerFactory",
groupId = "test-group")
public void onMessage(@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long receivedTimestamp,
@Header(KafkaHeaders.OFFSET) long offset,
@Headers MessageHeaders messageHeaders) {
LOGGER.info("Received Message for topic={} partition={} offset={} messageHeaders={}",
topic, partition, offset, messageHeaders);
LOGGER.debug("Received Message payload={}", message);
doSomething(message);
}
}
Run Code Online (Sandbox Code Playgroud)
我是卡夫卡和春天的新手。我阅读了有关如何寻求偏移量的 spring-kafka 文档,但无法完全理解。
据我了解,对于我的用例,当分区分配给容器或在任何其他场景中时,我不想再次读取事件(确保只读取一次)。
我看到大多数 Consumer 实现都实现了ConsumerSeekAware。我知道实施ConsumerSeekAware使我们能够寻求对诸如onIdleContainer或 之类的事件进行抵消onPartitionsAssigned。我无法理解这些正在处理什么场景?
ConsumerSeekAware实施来处理哪些场景?实现 Kafka Consumer 需要寻求抵消的最佳实践或一般场景是什么?
registerSeekCallback …
我遵循了 baeldung.com 的“ Apache Kafka with Spring 简介”教程。我用以下方法设置了一个KafkaConsumerConfig类kafkaConsumerFactory:
private ConsumerFactory<String, String> kafkaConsumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
...
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
...
return new DefaultKafkaConsumerFactory<>(props);
}
Run Code Online (Sandbox Code Playgroud)
和两个“定制”工厂:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("foo");
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("bar");
}
Run Code Online (Sandbox Code Playgroud)
在MessageListener课堂上,我使用@KafkaListener注释来注册消费者,以groupId监听某个主题:
@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
System.out.println("Received Message in group 'foo': " + …Run Code Online (Sandbox Code Playgroud) spring-kafka ×10
apache-kafka ×6
spring ×4
java ×3
spring-boot ×3
kotlin ×2
autoscaling ×1
spring-batch ×1