Producer properties
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
Consumer properties
spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.group-id=user-group
server.port=8085
Consumer service
@Service
public class UserConsumerService {
    @KafkaListener(topics = { "user-topic" })
    public void consumerUserData(User user) {
        System.out.println("Users Age Is: " + user.getAge() + " Fav Genre " + user.getFavGenre());
    }
}
Producer service
@Service
public class UserProducerService {
    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    public void sendUserData(User user) {
        kafkaTemplate.send("user-topic", user.getName(), user);
    }
}
Producer configuration for creating the topic
@Configuration
public class KafkaConfig {
    @Bean
    public NewTopic topicOrder() {
        return TopicBuilder.name("user-topic").partitions(2).replicas(1).build();
    }
}
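The `User` class that both services exchange is not shown in the question. Here is a minimal sketch of what it might look like, assuming only the three getters the code above calls (`getName`, `getAge`, `getFavGenre`). The field types are guesses. `JsonSerializer` and `JsonDeserializer` rely on Jackson, which by default needs a no-argument constructor and accessors to rebuild the object on the consumer side.

```java
// Hypothetical payload class: the question does not show it, so the field
// names and types are assumptions inferred from the getters it calls.
// In a real project this would be a public class in its own User.java.
class User {
    private String name;
    private int age;
    private String favGenre;

    // No-argument constructor, used by Jackson when deserializing.
    public User() {
    }

    public User(String name, int age, String favGenre) {
        this.name = name;
        this.age = age;
        this.favGenre = favGenre;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getFavGenre() {
        return favGenre;
    }

    public void setFavGenre(String favGenre) {
        this.favGenre = favGenre;
    }
}
```

If the producer and consumer run as separate applications, each needs its own copy of this class. By default `JsonSerializer` records the fully qualified class name in the record headers, so the package name matters on the consumer side as well.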
The producer works fine, but the consumer throws an error like this:
2021-12-06 …

@Parameters(index = "0")
private Double min_c_re;
@Parameters(index = "1")
private Double min_c_im;
@Parameters(index = "2")
private Double max_c_re;
@Parameters(index = "3")
private Double max_c_im;
@Parameters(index = "4")
private Integer max_n;
@Parameters(index = "5")
private Integer width;
@Parameters(index = "6")
private Integer height;
@Parameters(index = "7")
private Integer divisions;
@Parameters(index = "8", arity = "1..*")
private List<URL> hosts;
@Override
public void run() {
    BufferedImage image = new BufferedImage(width, height, BufferedImage.TYPE_BYTE_GRAY);
    List<SubDivider.SubDivision> subDividedImages = new SubDivider(divisions, divisions).divide(image);
    ExecutorService threadPool = Executors.newCachedThreadPool();
    double …