标签: spring-kafka

发送到 kafka 主题时序列化消息时出错

我需要测试包含标题的消息,所以我需要使用 MessageBuilder,但我无法序列化。

我尝试在生产者道具上添加序列化设置,但没有奏效。

有人能帮我吗?

这个错误:

org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.springframework.messaging.support.GenericMessage to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Run Code Online (Sandbox Code Playgroud)

我的测试班:

public class TransactionMastercardAdapterTest extends AbstractTest{

@Autowired
private KafkaTemplate<String, Message<String>> template;

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);

@BeforeClass
public static void setUp() {
    System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
}

@Test
public void sendTransactionCommandTest(){

    String payload = "{\"o2oTransactionId\" : \"" + UUID.randomUUID().toString().toUpperCase() + "\","
            + "\"cardId\" : \"11\","
            + "\"transactionId\" : \"20110405123456\","
            + "\"amount\" : 200.59,"
            + "\"partnerId\" : \"11\"}";

    Map<String, Object> …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-producer-api spring-cloud-stream spring-kafka

10
推荐指数
1
解决办法
3万
查看次数

Spring Kafka JsonDesirialization MessageConversionException 未能解析类名 Class not found

我有两个服务应该通过Kafka. 让我们调用第一个服务WriteService和第二个服务QueryService

WriteService端,我对生产者有以下配置。

@Configuration
public class KafkaProducerConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JsonSerializer.class);

        return props;
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Run Code Online (Sandbox Code Playgroud)

我正在尝试发送类的对象 com.example.project.web.routes.dto.RouteDto

在 …

spring apache-kafka json-deserialization spring-boot spring-kafka

10
推荐指数
1
解决办法
5549
查看次数

何时使用 ConcurrentKafkaListenerContainerFactory?

我是 kafka 的新手,我浏览了文档,但我什么也不懂。有人可以解释一下什么时候使用这个ConcurrentKafkaListenerContainerFactory类吗?我已经使用过这个Kafkaconsumer类,但我看到ConcurrentKafkaListenerContainerFactory在我当前的项目中使用了它。请解释它的用途。

java apache-kafka kafka-consumer-api spring-kafka

10
推荐指数
2
解决办法
1万
查看次数

如何以编程方式自动配置?

我有一个春季启动kafka应用程序。我的经纪人每隔几天就会被回收一次。旧的经纪人已取消配置,新的经纪人已配置。

我有一个调度程序,每隔几个小时要检查一次代理。我想确保一旦有了新的经纪人,我们就应该重新加载所有与Spring Kafka相关的bean。与KafkaAutoConfiguration非常相似,除了我希望触发代理值更改并以编程方式加载自动配置。

每当将旧的代理替换为新的代理时,如何以编程方式调用自动配置?

java spring spring-boot spring-kafka

10
推荐指数
1
解决办法
424
查看次数

Spring Boot 2.3.2.RELEASE - Micrometer - KafkaMetrics - 无法绑定仪表日志

我正在使用/actuator/prometheus端点作为其kafka_consumer_*指标。从Spring Boot 2.3.1.RELEASEto升级,向2.3.2.RELEASE我展示了很多这些“额外”日志——每当绑定失败时:

信息 io.micrometer.core.instrument.binder.kafka.KafkaMetrics.lambda$checkAndBindMetrics$1[173] -- 无法绑定仪表:kafka.consumer.fetch.manager.[metric1]...但是,这可能发生并且可能下次刷新时恢复。
信息 io.micrometer.core.instrument.binder.kafka.KafkaMetrics.lambda$checkAndBindMetrics$1[173] -- 无法绑定仪表:kafka.consumer.fetch.manager.[metric2]...但是,这可能发生并且可能下次刷新时恢复。
信息 io.micrometer.core.instrument.binder.kafka.KafkaMetrics.lambda$checkAndBindMetrics$1[173] -- 无法绑定仪表:kafka.consumer.fetch.manager.[metric3]...但是,这可能发生并且可能下次刷新时恢复。
信息 io.micrometer.core.instrument.binder.kafka.KafkaMetrics.lambda$checkAndBindMetrics$1[173] -- 无法绑定仪表:kafka.consumer.fetch.manager.[metric4]...但是,这可能发生并且可能下次刷新时恢复。

它是一个INFO日志级别,带有一些令人放心的结尾词However, this could happen and might be restored in the next refresh.,因此,并不意味着令人担忧,但是在此升级中显示它的目的是什么?

与此同时,我用以下方法抑制了它们(额外的日志): logging.level.io.micrometer.core.instrument.binder.kafka.KafkaMetrics=WARN

spring-boot spring-kafka spring-micrometer

10
推荐指数
1
解决办法
2182
查看次数

Spring Kafka 错误:此错误处理程序无法直接处理“SerializationException”;请考虑配置“ErrorHandlingDeserializer”

生产者属性

spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
Run Code Online (Sandbox Code Playgroud)

消费属性

spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.group-id=user-group
server.port=8085
Run Code Online (Sandbox Code Playgroud)

消费者服务

@Service
public class UserConsumerService {

    @KafkaListener(topics = { "user-topic" })
    public void consumerUserData(User user) {
        System.out.println("Users Age Is: " + user.getAge() + " Fav Genre " + user.getFavGenre());
    }
}
Run Code Online (Sandbox Code Playgroud)

生产者服务

@Service
public class UserProducerService {

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    public void sendUserData(User user) {
        kafkaTemplate.send("user-topic", user.getName(), user);
    }
}
Run Code Online (Sandbox Code Playgroud)

用于创建主题的生产者配置

    @Configuration public class KafkaConfig {
    
        @Bean
        public NewTopic topicOrder() {
            return TopicBuilder.name("user-topic").partitions(2).replicas(1).build();
        } 
}
Run Code Online (Sandbox Code Playgroud)

生产者工作得很好,但消费者给出了类似的错误

2021-12-06 …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-boot spring-kafka

10
推荐指数
1
解决办法
4万
查看次数

Kafka Json消费者错误java.lang.NoSuchFieldError:READ_UNKNOWN_ENUM_VALUES_USING_DEFAULT_VALUE

我正在使用 Spring Boot 和 Json Deserializer 来运行 kafka Consumer。运行应用程序时我收到错误

Caused by: java.lang.NoSuchFieldError: READ_UNKNOWN_ENUM_VALUES_USING_DEFAULT_VALUE
                at com.fasterxml.jackson.databind.deser.std.EnumDeserializer.createContextual(EnumDeserializer.java:211)
                at com.fasterxml.jackson.databind.DeserializationContext.handlePrimaryContextualization(DeserializationContext.java:836)
                at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.resolve(BeanDeserializerBase.java:550)
                at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:294)
                at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:244)
                at com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:142)
                at com.fasterxml.jackson.databind.DeserializationContext.findNonContextualValueDeserializer(DeserializationContext.java:644)
                at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.resolve(BeanDeserializerBase.java:539)
                at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:294)
                at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:244)
                at com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:142)
                at com.fasterxml.jackson.databind.DeserializationContext.findNonContextualValueDeserializer(DeserializationContext.java:644)
                at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.resolve(BeanDeserializerBase.java:539)
                at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:294)
                at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:244)
                at com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:142)
                at com.fasterxml.jackson.databind.DeserializationContext.findRootValueDeserializer(DeserializationContext.java:654)
                at com.fasterxml.jackson.databind.ObjectReader._prefetchRootDeserializer(ObjectReader.java:2430)
                at com.fasterxml.jackson.databind.ObjectReader.<init>(ObjectReader.java:194)
                at com.fasterxml.jackson.databind.ObjectMapper._newReader(ObjectMapper.java:780)
                at com.fasterxml.jackson.databind.ObjectMapper.readerFor(ObjectMapper.java:4263)
                at org.springframework.kafka.support.serializer.JsonDeserializer.initialize(JsonDeserializer.java:502)
                at org.springframework.kafka.support.serializer.JsonDeserializer.setupTarget(JsonDeserializer.java:487)
                ... 56 common frames omitted
Run Code Online (Sandbox Code Playgroud)

我正在使用 spring-boot 3.0.5 和 jackson-databing 2.15.0。有人遇到过同样的错误吗?

我试图调试这个问题,它似乎与我的域对象包含枚举类型作为属性这一事实有关。Jackson 扫描对象的结构并急切地加载每种类型的反序列化器,当到达枚举类型时它会失败。有谁知道我如何跟踪和解决问题?

apache-kafka spring-kafka jackson-databind

10
推荐指数
2
解决办法
7328
查看次数

Spring Boot Kafka:由于NoSuchBeanDefinitionException,无法启动使用者

在我的spring启动服务中尝试启动kafka使用者时看到NoSuchBeanDefinitionException并且无法启动服务本身.

下面是我的bean类,它包含为Kafka配置创建的所有必需bean

Spring Boot版本:1.5.2.RELEASE

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import com.ns.kafka.gateway.dtos.GatewayCallBackMessage;

@Configuration
@EnableKafka
public class GatewayCallbackToPNConsumerConfig {
 @Bean
 public Map < String, Object > consumerProps() {
  Map < String, Object > props = new HashMap < > ();
  props.put(null, null);
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "gatewaycallbacktopngroup");
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
  props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
  return props;
 }

 @Bean
 public Deserializer < String > …
Run Code Online (Sandbox Code Playgroud)

spring apache-kafka kafka-consumer-api spring-kafka

9
推荐指数
1
解决办法
2万
查看次数

Kuberka在Kubernetes - 将协调员标记为团体死亡

我对Kubernetes很新,并想用它设置Kafka和zookeeper.我能够使用StatefulSets在Kubernetes中设置Apache Kafka和Zookeeper.我按照这个这个来构建我的清单文件.我制作了1张kafka和zookeeper的复制品,并且还使用了持久卷.所有pod都在运行并准备就绪.

我试图Service通过指定nodePort(30010)来公开kafka并将其用于此目的.看起来这会将kafka暴露给外面的世界,在那里他们可以向kafka经纪人发送消息并从中消费.

但是在我的Java应用程序中,我创建了一个使用者并添加了bootstrapServer <ip-address>:30010,显示了以下日志:

INFO o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 2147483647 rack: null) for group workerListener.
INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 2147483647 rack: null) dead for group workerListener
Run Code Online (Sandbox Code Playgroud)

有趣的是,当我使用kubectl命令测试集群时,我能够生成和使用消息:

kubectl run -ti --image=gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1 produce --restart=Never --rm \
 -- kafka-console-producer.sh --topic test --broker-list kafka-0.kafka-hs.default.svc.cluster.local:9093 done;

kubectl run -ti --image=gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1 consume --restart=Never --rm -- kafka-console-consumer.sh --topic test --bootstrap-server kafka-0.kafka-hs.default.svc.cluster.local:9093
Run Code Online (Sandbox Code Playgroud)

有人能指出我正确的方向为什么它将协调员标记为死亡?

kafka.yml

apiVersion: v1
kind: Service
metadata:
  name: kafka-hs …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kubernetes apache-zookeeper spring-kafka

9
推荐指数
1
解决办法
1643
查看次数

如何将动态主题名称从环境变量传递给@KafkaListener(topics)

我正在写一个 Kafka 消费者。我需要将环境变量主题名称传递给@KafkaListener(topics = ...). 这是我迄今为止尝试过的:

 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.kafka.annotation.KafkaListener; 
 import org.springframework.stereotype.Service;

 @Service
 public class KafkaConsumer {

     @Autowired
     private EnvProperties envProperties;

     private final String topic = envProperties.getTopic();

     @KafkaListener(topics = "#{'${envProperties.getTopic()}'}", groupId = "group_id")
     public void consume(String message) {
        logger.info("Consuming messages " +envProperties.getTopic());
     }
}
Run Code Online (Sandbox Code Playgroud)

我在线路上遇到错误topics = "#{'${envProperties.getTopic()}'}",应用程序无法启动。

如何从环境变量动态设置此主题名称?

java spring-el spring-boot kafka-consumer-api spring-kafka

9
推荐指数
1
解决办法
3357
查看次数