标签: spring-kafka

休息控制器通过spring kafka返回kafka中的记录

对于我的演示应用程序,我必须创建一个 rest 控制器来返回 kafka 队列中的消息。我已经阅读了 spring-kafka 参考指南并实现了消费者配置并创建了如下的 bean

@Configuration
@EnableKafka
public class ConsumerConfiguration {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // allows a pool of processes to divide the work of consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "trx");

        return props;
    }

    @Bean
    public ConsumerFactory<String, Transaction> transactionConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>( …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-boot kafka-consumer-api spring-web spring-kafka

2
推荐指数
1
解决办法
1234
查看次数

在实现 ConsumerSeekAware 接口时,如何使“两种方法具有相同的擦除但都不覆盖另一个”警告静音?

我正在尝试在 Spring 中为 Kafka 消费者实现 ConsumerSeekAware 接口,但是当我尝试覆盖void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback)void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback)在我的应用程序代码中时,我收到“两种方法具有相同的擦除但都不覆盖另一个”警告。

public static class MessageListener implements ConsumerSeekAware{
    ConsumerSeekCallback callback;

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback){
      this.callback = callback;
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {

    }

    @Override
    void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {

    }
}
Run Code Online (Sandbox Code Playgroud)

我确实知道此警告与需要覆盖的两种方法的 Map 参数中的泛型有关,但我不确定究竟是什么导致了此警告?

在实现 ConsumerSeekAware 接口时,如何使“两种方法具有相同的擦除但都不覆盖另一个”警告静音?

java spring-kafka

2
推荐指数
1
解决办法
293
查看次数

Spring-Kafka 消费者没有收到消息

我不知道发生了什么事,我用 @KafkaListener 注释的 java 客户端消费者没有收到任何消息。当我通过命令行创建消费者时,它可以工作。Producer 也按预期工作(也在 java 中)。有人可以帮助我理解这种行为吗?

应用程序.yml

kafka:
  bootstrap-servers: localhost:9092
  topic: my-topic
Run Code Online (Sandbox Code Playgroud)

生产者配置:

@Configuration
public class KafkaProducerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, String> producerFactory(){
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }
}
Run Code Online (Sandbox Code Playgroud)

消费者配置:

@EnableKafka
@Configuration
class KafkaConsumerConfig {

    @Value("${kafka.bootstrap-servers}")
    String bootstrapServers;

    @Bean
    public ConsumerFactory<String, String> consumerFactory(){
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-kafka

2
推荐指数
1
解决办法
9044
查看次数

如何使用spring boot设置kafka使用者并发性

我正在编写一个基于Java的Kafka Consumer应用程序.我正在为我的应用程序使用kafka-clients,Spring Kafka和Spring启动.虽然Spring启动让我轻松编写Kafka使用者(没有真正编写ConcurrentKafkaListenerContainerFactory,ConsumerFactory等),但我希望能够为这些使用者定义/定制一些属性.但是,我找不到使用Spring启动的简单方法.例如:我有兴趣设置的一些属性是 -

ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG

我接过一看春天开机前定义的属性在这里.

此外,基于前面一个问题在这里,我想设置对消费者的并发性,但无法找到一个配置,为application.properties驱动的方式做到这一点使用Spring启动.

一种显而易见的方法是ConcurrentKafkaListenerContainerFactory, ConsumerFactory在我的Spring Context中再次定义类并从那里开始工作.我想了解是否有更简洁的方法,特别是因为我使用的是Spring Boot.

版本 -

  • kafka-clients - 0.10.0.0-SASL
  • spring-kafka - 1.1.0.RELEASE
  • 春季靴子 - 1.5.10.RELEASE

java apache-kafka spring-boot kafka-consumer-api spring-kafka

2
推荐指数
1
解决办法
6728
查看次数

Spring Kafka 和主题消费者的数量

在我的 Spring Boot/Kafka 项目中,我有以下消费者配置:

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(kafkaProperties));
        factory.setConcurrency(10);    
        return factory;
    }

    @Bean
    public ConsumerFactory<String, Post> postConsumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

        ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(postConsumerFactory(kafkaProperties));

        return factory;
    }

}
Run Code Online (Sandbox Code Playgroud)

这是我的PostConsumer

@Component
public …
Run Code Online (Sandbox Code Playgroud)

messaging apache-kafka spring-boot spring-kafka

2
推荐指数
1
解决办法
7996
查看次数

春天卡夫卡和卡夫卡流

在Spring Boot应用程序中,我正在尝试配置Kafka Streams.使用简单的Kafka主题,一切正常,但我无法使用Spring Kafka Streams.

这是我的配置:

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new StreamsConfig(props);
    }

    @Bean
    public KStream<String, String> kStream(StreamsBuilder kStreamBuilder) {

        KStream<String, String> stream = kStreamBuilder.stream("post.sent");

        stream.mapValues(post -> post.toString()).to("streamingTopic2");

        stream.print();

        return stream;
    }

    @Bean
    public NewTopic kafkaTopicTest() {
        return new NewTopic("streamingTopic2", 1, (short) 1);
    }

    @KafkaListener(topics = "streamingTopic2", containerFactory = "kafkaListenerContainerFactory") …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-boot apache-kafka-streams spring-kafka

2
推荐指数
1
解决办法
1801
查看次数

如何在Spring Kafka Consumer中跳过损坏的(不可序列化的)消息?

这个问题适用于Spring Kafka,与Apache Kafka和High Level Consumer相关:跳过损坏的消息

有没有办法配置Spring Kafka使用者跳过无法读取/处理(损坏)的记录?

我看到如果无法反序列化,消费者会陷入同一记录的情况.这是消费者抛出的错误.

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of java.time.LocalDate: no long/Long-argument constructor/factory method to deserialize from Number value 
Run Code Online (Sandbox Code Playgroud)

消费者轮询主题并在循环中继续打印相同的错误,直到程序被杀死.

在具有以下使用者工厂配置的@KafkaListener中,

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-kafka

2
推荐指数
1
解决办法
2548
查看次数

KStream 将记录发送到多个流(不是分支)

有没有办法进行类似分支的操作​​,但将记录放在每个谓词评估为真的输出流中?Brach 将记录放置到第一个匹配项(文档:在第一个匹配项中将记录放置到一个且仅一个输出流中)。

apache-kafka apache-kafka-streams spring-kafka

2
推荐指数
1
解决办法
1242
查看次数

确定 Kafka-Client 与 kafka-broker 的兼容性

在远程 kafka 云集群中,kafka 代理将更新到新版本 (5.1),因此适用新的 kafka 协议。

现在我应该更新我的 kafka 客户端以能够连接。现在我在 spring-boot 应用程序中使用以下 kafka 相关依赖项:

<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-parent</artifactId>
<version>1.3.8.RELEASE</version>

<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>2.0.1.RELEASE</version>

<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>1.0.3.RELEASE</version>

<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.0.3.RELEASE</version>

<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>

<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.1</version>
Run Code Online (Sandbox Code Playgroud)

我已经为 kafka-clients 和 kafka-brokers 搜索了合适的兼容性矩阵。

  • 有谁知道如何成功升级?
  • 有没有人有这个依赖项的兼容性矩阵?

java apache-kafka spring-kafka

2
推荐指数
2
解决办法
8237
查看次数

创建 Kafka Producer 的演示应用程序为 java.lang.InstantiationException: null 抛出“无法构建 kafka 生产者”错误

我正在编写一个演示应用程序来创建一个 Kafka Producer。我创建了一个主题并在 Kafka 上运行了一个生产者和消费者,它似乎正在工作。我正在编写一个 spring 应用程序来创建一个生产者。我将名称作为调用的一部分传递。当我转到“ http://localhost:8080/kafka/publish/Peter ”时,出现白标错误“无法构建 kafka 生产者”。请帮忙。

主要应用是:SpringBootKafkaProducerApplication.java

package com.ranjana.demo.kafka.springbootkafkaproducer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootKafkaProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaProducerApplication.class, args);
    }
}
Run Code Online (Sandbox Code Playgroud)

模型是:User.java

package com.ranjana.demo.kafka.springbootkafkaproducer.model;

public class User {
    private String name;
    private String Department;
    private Long Salary;

    public User(String name, String department, Long salary) {
        this.name = name;
        Department = department;
        Salary = salary;
    }

    //Getters and setters

}

Run Code Online (Sandbox Code Playgroud)

控制器是 UserResourse.java

package com.ranjana.demo.kafka.springbootkafkaproducer.resource;

import com.ranjana.demo.kafka.springbootkafkaproducer.model.User;
import org.springframework.beans.factory.annotation.Autowired;
import …
Run Code Online (Sandbox Code Playgroud)

spring kafka-producer-api spring-kafka

2
推荐指数
1
解决办法
4006
查看次数