I have Kafka producer and consumer servers, and when I try to send a message, I get the following exception:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mail.sender.service.senders.GmailConfirmationSenderService.confirmationMessageListener(com.mail.sender.dto.request.AccountRequest)]
Bean [com.mail.sender.service.senders.GmailConfirmationSenderService@24787445]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.mail.sender.dto.request.AccountRequest] for GenericMessage [payload={"email":"ckopo.6ygy@gmail.com","username":"asdasd-mjeesh","confirmationTokenDetails":{"token":"3fd3c1ee-20ec-420b-8ee9-11d22cd7598e","createdAt":[2016,1,25,21,34,55],"expiredAt":[2023,1,8,18,19,6,661473300]}}, headers={kafka_offset=12, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4008ea0f, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=mail_confirmation_message, kafka_receivedTimestamp=1673193848466, __TypeId__=[B@68a9f1ab, kafka_groupId=account_confirmation_group_id}]
My producer server's configuration:
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServersUrl;

    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(JsonSerializer.TYPE_MAPPINGS, "accountRequest:com.confirmation_token.model.dto.request.outgoing.AccountRequest"); …
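For comparison, here is a rough sketch of what the matching consumer-side properties might look like. The consumer configuration is not shown in this post, so this is an assumption, not the actual setup: the exception says the payload arrived as a `java.lang.String`, which would be consistent with the consumer using a string deserializer instead of a JSON one. The class name `ConsumerPropsSketch` and the helper `consumerConfig` are hypothetical. The property keys are written as plain strings so the snippet compiles without Kafka on the classpath. The type token `accountRequest` is taken from the producer mapping above, and the target class is the one named in the stack trace.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the original project.
public class ConsumerPropsSketch {

    // Builds consumer properties that map the producer's "accountRequest"
    // type token onto the consumer's own AccountRequest class.
    public static Map<String, Object> consumerConfig(String bootstrapServersUrl) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServersUrl);
        // Group id as it appears in the exception headers.
        props.put("group.id", "account_confirmation_group_id");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumption: the value is deserialized as JSON rather than as a raw String.
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        // Same token as on the producer side, but pointing at the consumer's class.
        props.put("spring.json.type.mapping",
                "accountRequest:com.mail.sender.dto.request.AccountRequest");
        props.put("spring.json.trusted.packages", "com.mail.sender.dto.request");
        return props;
    }

    public static void main(String[] args) {
        consumerConfig("localhost:9092")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In a Spring application these entries would normally be passed to a `DefaultKafkaConsumerFactory`, using the `ConsumerConfig` and `JsonDeserializer` constants rather than raw strings. Whether this resolves the exception depends on the consumer configuration that is not shown here.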