标签: spring-kafka

Springboot Kafka - 消费者幂等性

我有一个监听 kafka 的 Spring-boot 应用程序。为了避免重复处理,我尝试进行手动提交。为此,我在阅读主题后立即引用了异步提交消息。但我陷入了如何实现消费者幂等性的困境,这样记录就不会被处理两次。

apache-kafka spring-boot spring-kafka

1
推荐指数
1
解决办法
2063
查看次数

Kafka 事务性读已提交消费者

我在应用程序中有事务性和普通的生产者,它们正在写入主题 kafka-topic ,如下所示。

事务性 Kafka Producer 的配置

@Bean
    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        /*The amount of time to wait before attempting to retry a failed request to a given topic partition. 
         * This avoids repeatedly sending requests in a tight loop under some failure scenarios.*/
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3);
        /*"The configuration controls the …
Run Code Online (Sandbox Code Playgroud)

transactions apache-kafka kafka-consumer-api spring-kafka kafka-transactions-api

1
推荐指数
1
解决办法
1433
查看次数

如何在我的 kafka 消费者中使用对象反序列化 json 列表?

我的 Kafka Producer 正在发送 Json 格式的对象列表。

我试图弄清楚如何让我的消费者反序列化列表。我能够接收单个对象并读取它,但是当我将代码更改为类型 List 时,我收到以下错误:

Error:(32, 47) java: incompatible types: cannot infer type arguments for org.springframework.kafka.core.DefaultKafkaConsumerFactory<>
    reason: inference variable V has incompatible equality constraints java.util.List<nl.domain.X>,nl.domain.X
Run Code Online (Sandbox Code Playgroud)

编辑

通过将 TypeReference 添加到 JsonDeserilizer 已解决此错误。

当前问题

消费消息时,它不是我定义的类型(即 List< X > ),而是返回LinkedHashMap

这是消费者配置:

@EnableKafka
@Configuration
public class KafkaConfiguration {

    @Bean
    public ConsumerFactory<String, List<X>> xConsumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
                new JsonDeserializer<>(new TypeReference<List<X>>() …
Run Code Online (Sandbox Code Playgroud)

java spring apache-kafka json-deserialization spring-kafka

1
推荐指数
1
解决办法
1万
查看次数

KafkaTemplate.send(key,value,topic) 与自定义分区器?

我看到并使用默认分区器类实现了KafkaTemplate.send(TOPIC,message)方法。

但在这里,我不传递钥匙。我有一个简单的自定义分区器类,我还想发送到KafkaTemplate(TOPIC,key,message)这样的 kafka 服务器,其中在 ProducerConfig 中我设置了用于分区的 customPartitioner 类。

我看到如果我提供自定义分区器,KafkaTemplate 的 Will send(Topic, Key, Message) 方法会调用 Partition 方法吗?但我没有完全明白。

  1. 我的简单的 customPartitioner 类:
public class CustomPartitionar implements Partitioner {
   private PartitionMapper newMapper;
   public CustomPartitionar(){
       newMapper = new PartitionMapper();
   }
   @Override
   public void configure(Map<String, ?> configs) {

   }
   @Override
   public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,Cluster cluster) {
       int partition = 0;
       String userName = (String) key;
       // Find the id …
Run Code Online (Sandbox Code Playgroud)

spring-kafka

1
推荐指数
1
解决办法
1万
查看次数

Spring Kafka Consumer Configs - 默认值和至少一次语义

我正在使用 spring-kafka 模板编写 kafka 消费者。当我实例化消费者时,Spring kafka 接受如下参数。

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
Run Code Online (Sandbox Code Playgroud)

我阅读了文档,看起来还有很多其他参数也可以作为消费者配置传递。有趣的是,每个参数都有一个默认值。我的问题是

  1. 这些是在什么基础上到达的?
  2. 是否真的需要改变这些值,如果是的话,这些值是什么
    (恕我直言,这是根据具体情况而定的。但仍然想听听专家的意见)
  3. 我们拥有的传递语义是至少一次。因此,对于这种(至少一次)传递语义,如果这些保持不变,它仍然会处理大量数据。

任何指示或答案都会对澄清我的疑问有很大帮助。

apache-kafka kafka-consumer-api spring-kafka

1
推荐指数
1
解决办法
3010
查看次数

如何从与不同代理关联的多个 Kafka 主题中进行消费?

如何从与不同代理关联的多个 Kafka 主题中进行消费?

\n

我有一个 Spring Boot 应用程序,需要使用 2 个主题,但这些主题与不同的代理相关联。

\n

我使用 Spring Kafka 和 @Listener 注释,我发现有一些方法可以使用与同一代理而不是不同代理关联的 2 个主题。不幸的是,我在 Spring Boot 或 Spring Kafka 文档中看不到任何关于如何执行此操作的有用信息。

\n

java spring apache-kafka spring-boot spring-kafka

1
推荐指数
1
解决办法
4189
查看次数

Spring Cloud Stream Kafka发送消息

如何使用新的 Spring Cloud Stream Kafka 功能模型发送消息?

已弃用的方式如下所示。

public interface OutputTopic {

    @Output("output")
    MessageChannel output();
}

@Autowired
OutputTopic outputTopic;

public someMethod() {
    
    outputTopic.output().send(MessageBuilder.build());
}
Run Code Online (Sandbox Code Playgroud)

但是我怎样才能以功能风格发送消息呢?

应用程序.yml

spring:
  cloud:
    function:
      definition: process
    stream:
      bindings:
        process-out-0:
          destination: output
          binder: kafka
Run Code Online (Sandbox Code Playgroud)
@Configuration
public class Configuration {

    @Bean
    Supplier<Message<String>> process() {
        return () -> {
            return MessageBuilder.withPayload("foo")
                    .setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes()).build();
        };
    }
Run Code Online (Sandbox Code Playgroud)

我会自动装配 MessageChannel,但没有用于 process、process-out-0、输出或类似内容的 MessageChannel-Bean。或者我可以使用供应商 Bean 发送消息吗?有人可以给我举个例子吗?多谢!

functional-programming spring-boot spring-kafka spring-cloud-stream-binder-kafka

1
推荐指数
1
解决办法
2750
查看次数

当控制转到catch块时如何停止发送到kafka主题功能性kafka spring

您能否建议一下,当控件到达 catch 块时,如何停止发送到我的第三个 kafka 主题,当前消息被发送到错误主题以及正常处理时应发送到的主题。一段代码如下:

@Component
public class Abc {
private final StreamBridge streamBridge;
public Abc (StreamBridge streamBridge)
this.streamBridge = streamBridge;
@Bean
public Function<KStream<String, KafkaClass>, KStream<String,KafkaClass>> hiProcess() {
return input -> input.map((key,value) -> {
try{
KafkaClass stream = processFunction();
}
catch(Exception e) {
Message<KakfaClass> mess = MessageBuilder.withPayload(value).build();
streamBridge.send("errProcess-out-0". mess);
}
return new KeyValue<>(key, stream);
})
}
}

    
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-cloud spring-cloud-stream apache-kafka-streams spring-kafka

1
推荐指数
1
解决办法
1926
查看次数

使用 SpringBoot 外部化 Spring Kafka 配置

我有这个卡夫卡配置类

@Configuration
public class KafkaConfiguration {
    @Value("${KAFKA_SERVERS}")
    private String kafkaServers;


    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(this.consumerFactory());
        factory.setMessageConverter(new StringJsonMessageConverter());
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(this.consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo-group-id");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ProducerConfig.RETRIES_CONFIG, …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-boot spring-kafka

1
推荐指数
1
解决办法
1083
查看次数

变更日志主题和重新分区主题 kafka 流

我想问一下,如果我不使用有状态流,我的 KafkaStreamsConfiguration 中是否需要复制因子。我不使用这个RockDB。据我所知,复制因子设置适用于更改日志和重新分区主题。我理解这个变更日志主题,但是这个重新分区主题让我有点困惑...有人可以用非常基本的语言向我解释这个重新分区主题是什么,以及如果我不在流应用程序中使用状态,我是否应该关心这个复制因子?

问候

apache-kafka apache-kafka-streams spring-kafka

1
推荐指数
1
解决办法
2149
查看次数