我有一个监听 kafka 的 Spring-boot 应用程序。为了避免重复处理,我尝试进行手动提交。为此,我在阅读主题后立即引用了异步提交消息。但我陷入了如何实现消费者幂等性的困境,这样记录就不会被处理两次。
我在应用程序中有事务性和普通的生产者,它们正在写入主题 kafka-topic ,如下所示。
事务性 Kafka Producer 的配置
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.RETRIES_CONFIG, 5);
/*The amount of time to wait before attempting to retry a failed request to a given topic partition.
* This avoids repeatedly sending requests in a tight loop under some failure scenarios.*/
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3);
/*"The configuration controls the …Run Code Online (Sandbox Code Playgroud) transactions apache-kafka kafka-consumer-api spring-kafka kafka-transactions-api
我的 Kafka Producer 正在发送 Json 格式的对象列表。
我试图弄清楚如何让我的消费者反序列化列表。我能够接收单个对象并读取它,但是当我将代码更改为类型 List 时,我收到以下错误:
Error:(32, 47) java: incompatible types: cannot infer type arguments for org.springframework.kafka.core.DefaultKafkaConsumerFactory<>
reason: inference variable V has incompatible equality constraints java.util.List<nl.domain.X>,nl.domain.X
Run Code Online (Sandbox Code Playgroud)
编辑
通过将 TypeReference 添加到 JsonDeserilizer 已解决此错误。
当前问题:
消费消息时,它不是我定义的类型(即 List< X > ),而是返回LinkedHashMap
这是消费者配置:
@EnableKafka
@Configuration
public class KafkaConfiguration {
@Bean
public ConsumerFactory<String, List<X>> xConsumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new JsonDeserializer<>(new TypeReference<List<X>>() …Run Code Online (Sandbox Code Playgroud) 我看到并使用默认分区器类实现了KafkaTemplate.send(TOPIC,message)方法。
但在这里,我不传递钥匙。我有一个简单的自定义分区器类,我还想发送到KafkaTemplate(TOPIC,key,message)这样的 kafka 服务器,其中在 ProducerConfig 中我设置了用于分区的 customPartitioner 类。
我看到如果我提供自定义分区器,KafkaTemplate 的 Will send(Topic, Key, Message) 方法会调用 Partition 方法吗?但我没有完全明白。
public class CustomPartitionar implements Partitioner {
private PartitionMapper newMapper;
public CustomPartitionar(){
newMapper = new PartitionMapper();
}
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,Cluster cluster) {
int partition = 0;
String userName = (String) key;
// Find the id …Run Code Online (Sandbox Code Playgroud) 我正在使用 spring-kafka 模板编写 kafka 消费者。当我实例化消费者时,Spring kafka 接受如下参数。
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
Run Code Online (Sandbox Code Playgroud)
我阅读了文档,看起来还有很多其他参数也可以作为消费者配置传递。有趣的是,每个参数都有一个默认值。我的问题是
任何指示或答案都会对澄清我的疑问有很大帮助。
如何从与不同代理关联的多个 Kafka 主题中进行消费?
\n我有一个 Spring Boot 应用程序,需要使用 2 个主题,但这些主题与不同的代理相关联。
\n我使用 Spring Kafka 和 @Listener 注释,我发现有一些方法可以使用与同一代理而不是不同代理关联的 2 个主题。不幸的是,我在 Spring Boot 或 Spring Kafka 文档中看不到任何关于如何执行此操作的有用信息。
\n如何使用新的 Spring Cloud Stream Kafka 功能模型发送消息?
已弃用的方式如下所示。
public interface OutputTopic {
@Output("output")
MessageChannel output();
}
@Autowired
OutputTopic outputTopic;
public someMethod() {
outputTopic.output().send(MessageBuilder.build());
}
Run Code Online (Sandbox Code Playgroud)
但是我怎样才能以功能风格发送消息呢?
应用程序.yml
spring:
cloud:
function:
definition: process
stream:
bindings:
process-out-0:
destination: output
binder: kafka
Run Code Online (Sandbox Code Playgroud)
@Configuration
public class Configuration {
@Bean
Supplier<Message<String>> process() {
return () -> {
return MessageBuilder.withPayload("foo")
.setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes()).build();
};
}
Run Code Online (Sandbox Code Playgroud)
我会自动装配 MessageChannel,但没有用于 process、process-out-0、输出或类似内容的 MessageChannel-Bean。或者我可以使用供应商 Bean 发送消息吗?有人可以给我举个例子吗?多谢!
functional-programming spring-boot spring-kafka spring-cloud-stream-binder-kafka
您能否建议一下,当控件到达 catch 块时,如何停止发送到我的第三个 kafka 主题,当前消息被发送到错误主题以及正常处理时应发送到的主题。一段代码如下:
@Component
public class Abc {
private final StreamBridge streamBridge;
public Abc (StreamBridge streamBridge)
this.streamBridge = streamBridge;
@Bean
public Function<KStream<String, KafkaClass>, KStream<String,KafkaClass>> hiProcess() {
return input -> input.map((key,value) -> {
try{
KafkaClass stream = processFunction();
}
catch(Exception e) {
Message<KakfaClass> mess = MessageBuilder.withPayload(value).build();
streamBridge.send("errProcess-out-0". mess);
}
return new KeyValue<>(key, stream);
})
}
}
Run Code Online (Sandbox Code Playgroud) apache-kafka spring-cloud spring-cloud-stream apache-kafka-streams spring-kafka
我有这个卡夫卡配置类
@Configuration
public class KafkaConfiguration {
@Value("${KAFKA_SERVERS}")
private String kafkaServers;
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(this.consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(this.consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo-group-id");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
props.put(ProducerConfig.RETRIES_CONFIG, …Run Code Online (Sandbox Code Playgroud) 我想问一下,如果我不使用有状态流,我的 KafkaStreamsConfiguration 中是否需要复制因子。我不使用这个RockDB。据我所知,复制因子设置适用于更改日志和重新分区主题。我理解这个变更日志主题,但是这个重新分区主题让我有点困惑...有人可以用非常基本的语言向我解释这个重新分区主题是什么,以及如果我不在流应用程序中使用状态,我是否应该关心这个复制因子?
问候
spring-kafka ×10
apache-kafka ×8
spring-boot ×4
java ×3
spring ×2
spring-cloud ×1
spring-cloud-stream-binder-kafka ×1
transactions ×1