对于我的演示应用程序,我必须创建一个 rest 控制器来返回 kafka 队列中的消息。我已经阅读了 spring-kafka 参考指南并实现了消费者配置并创建了如下的 bean
@Configuration
@EnableKafka
public class ConsumerConfiguration {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// allows a pool of processes to divide the work of consuming and processing records
props.put(ConsumerConfig.GROUP_ID_CONFIG, "trx");
return props;
}
@Bean
public ConsumerFactory<String, Transaction> transactionConsumerFactory() {
return new DefaultKafkaConsumerFactory<>( …Run Code Online (Sandbox Code Playgroud) apache-kafka spring-boot kafka-consumer-api spring-web spring-kafka
我正在尝试在 Spring 中为 Kafka 消费者实现 ConsumerSeekAware 接口,但是当我尝试覆盖void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback)和void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback)在我的应用程序代码中时,我收到“两种方法具有相同的擦除但都不覆盖另一个”警告。
public static class MessageListener implements ConsumerSeekAware{
ConsumerSeekCallback callback;
@Override
public void registerSeekCallback(ConsumerSeekCallback callback){
this.callback = callback;
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
@Override
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
}
Run Code Online (Sandbox Code Playgroud)
我确实知道此警告与需要覆盖的两种方法的 Map 参数中的泛型有关,但我不确定究竟是什么导致了此警告?
在实现 ConsumerSeekAware 接口时,如何使“两种方法具有相同的擦除但都不覆盖另一个”警告静音?
我不知道发生了什么事,我用 @KafkaListener 注释的 java 客户端消费者没有收到任何消息。当我通过命令行创建消费者时,它可以工作。Producer 也按预期工作(也在 java 中)。有人可以帮助我理解这种行为吗?
应用程序.yml
kafka:
bootstrap-servers: localhost:9092
topic: my-topic
Run Code Online (Sandbox Code Playgroud)
生产者配置:
@Configuration
public class KafkaProducerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, String> producerFactory(){
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}
Run Code Online (Sandbox Code Playgroud)
消费者配置:
@EnableKafka
@Configuration
class KafkaConsumerConfig {
@Value("${kafka.bootstrap-servers}")
String bootstrapServers;
@Bean
public ConsumerFactory<String, String> consumerFactory(){
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, …Run Code Online (Sandbox Code Playgroud) 我正在编写一个基于Java的Kafka Consumer应用程序.我正在为我的应用程序使用kafka-clients,Spring Kafka和Spring启动.虽然Spring启动让我轻松编写Kafka使用者(没有真正编写ConcurrentKafkaListenerContainerFactory,ConsumerFactory等),但我希望能够为这些使用者定义/定制一些属性.但是,我找不到使用Spring启动的简单方法.例如:我有兴趣设置的一些属性是 -
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
我接过一看春天开机前定义的属性在这里.
此外,基于前面一个问题在这里,我想设置对消费者的并发性,但无法找到一个配置,为application.properties驱动的方式做到这一点使用Spring启动.
一种显而易见的方法是ConcurrentKafkaListenerContainerFactory, ConsumerFactory在我的Spring Context中再次定义类并从那里开始工作.我想了解是否有更简洁的方法,特别是因为我使用的是Spring Boot.
版本 -
java apache-kafka spring-boot kafka-consumer-api spring-kafka
在我的 Spring Boot/Kafka 项目中,我有以下消费者配置:
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(kafkaProperties));
factory.setConcurrency(10);
return factory;
}
@Bean
public ConsumerFactory<String, Post> postConsumerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
return factory;
}
}
Run Code Online (Sandbox Code Playgroud)
这是我的PostConsumer:
@Component
public …Run Code Online (Sandbox Code Playgroud) 在Spring Boot应用程序中,我正在尝试配置Kafka Streams.使用简单的Kafka主题,一切正常,但我无法使用Spring Kafka Streams.
这是我的配置:
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public StreamsConfig kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return new StreamsConfig(props);
}
@Bean
public KStream<String, String> kStream(StreamsBuilder kStreamBuilder) {
KStream<String, String> stream = kStreamBuilder.stream("post.sent");
stream.mapValues(post -> post.toString()).to("streamingTopic2");
stream.print();
return stream;
}
@Bean
public NewTopic kafkaTopicTest() {
return new NewTopic("streamingTopic2", 1, (short) 1);
}
@KafkaListener(topics = "streamingTopic2", containerFactory = "kafkaListenerContainerFactory") …Run Code Online (Sandbox Code Playgroud) java apache-kafka spring-boot apache-kafka-streams spring-kafka
这个问题适用于Spring Kafka,与Apache Kafka和High Level Consumer相关:跳过损坏的消息
有没有办法配置Spring Kafka使用者跳过无法读取/处理(损坏)的记录?
我看到如果无法反序列化,消费者会陷入同一记录的情况.这是消费者抛出的错误.
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of java.time.LocalDate: no long/Long-argument constructor/factory method to deserialize from Number value
Run Code Online (Sandbox Code Playgroud)
消费者轮询主题并在循环中继续打印相同的错误,直到程序被杀死.
在具有以下使用者工厂配置的@KafkaListener中,
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
Run Code Online (Sandbox Code Playgroud) 有没有办法进行类似分支的操作,但将记录放在每个谓词评估为真的输出流中?Brach 将记录放置到第一个匹配项(文档:在第一个匹配项中将记录放置到一个且仅一个输出流中)。
在远程 kafka 云集群中,kafka 代理将更新到新版本 (5.1),因此适用新的 kafka 协议。
现在我应该更新我的 kafka 客户端以能够连接。现在我在 spring-boot 应用程序中使用以下 kafka 相关依赖项:
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-parent</artifactId>
<version>1.3.8.RELEASE</version>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>2.0.1.RELEASE</version>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>1.0.3.RELEASE</version>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.0.3.RELEASE</version>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.1</version>
Run Code Online (Sandbox Code Playgroud)
我已经为 kafka-clients 和 kafka-brokers 搜索了合适的兼容性矩阵。
我正在编写一个演示应用程序来创建一个 Kafka Producer。我创建了一个主题并在 Kafka 上运行了一个生产者和消费者,它似乎正在工作。我正在编写一个 spring 应用程序来创建一个生产者。我将名称作为调用的一部分传递。当我转到“ http://localhost:8080/kafka/publish/Peter ”时,出现白标错误“无法构建 kafka 生产者”。请帮忙。
主要应用是:SpringBootKafkaProducerApplication.java
package com.ranjana.demo.kafka.springbootkafkaproducer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringBootKafkaProducerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootKafkaProducerApplication.class, args);
}
}
Run Code Online (Sandbox Code Playgroud)
模型是:User.java
package com.ranjana.demo.kafka.springbootkafkaproducer.model;
public class User {
private String name;
private String Department;
private Long Salary;
public User(String name, String department, Long salary) {
this.name = name;
Department = department;
Salary = salary;
}
//Getters and setters
}
Run Code Online (Sandbox Code Playgroud)
控制器是 UserResourse.java
package com.ranjana.demo.kafka.springbootkafkaproducer.resource;
import com.ranjana.demo.kafka.springbootkafkaproducer.model.User;
import org.springframework.beans.factory.annotation.Autowired;
import …Run Code Online (Sandbox Code Playgroud)