标签: spring-kafka

如何以编程方式启动kafka服务器

我们正在尝试在我们的应用程序中以编程方式启动kafka服务器(zookeeper和broker)。有可用的 api/库吗?

apache-kafka spring-kafka

2
推荐指数
1
解决办法
1192
查看次数

将数据从 kafka 流式传输到 oracle db 的最佳方式是什么

我正在尝试找到一种解决方案,将数据从 Kafka 直接流式传输到 Oracle。最有效的解决方案是什么?

apache-kafka apache-kafka-streams spring-kafka apache-kafka-connect

2
推荐指数
1
解决办法
9170
查看次数

Spring + Kafka:事务处理缓慢

刚开始使用Spring Kafka(2.1.4.RELEASE)和Kafka(1.0.0)但是当我添加事务时,处理速度降低了很多。

代码:

spring.kafka.consumer.max-poll-records=10
spring.kafka.consumer.specific.avro.reader=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=${application.name}
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.key-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
Run Code Online (Sandbox Code Playgroud)

在 Java 中我添加了:

spring.kafka.consumer.max-poll-records=10
spring.kafka.consumer.specific.avro.reader=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=${application.name}
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.key-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
Run Code Online (Sandbox Code Playgroud)

当我删除该setTransactionManager(transactionManager)语句后,速度提高了很多。我做错了什么吗?

performance apache-kafka kafka-consumer-api spring-kafka

2
推荐指数
1
解决办法
8456
查看次数

spring Kafka模型不在可信包中

spring-Kafka-2.1.5我正在与和 一起研究微服务spring-boot-2.0.5

第一个服务将向卡夫卡产生一些消息,第二个服务将消耗它们,在消耗时我遇到了问题

Caused by: java.lang.IllegalArgumentException: The class 'com.service1.model.TopicMessage' is not in the trusted packages: [java.util, java.lang, com.service2.model.ConsumeMessage]. 
If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
Run Code Online (Sandbox Code Playgroud)

所以从错误消息来看,这是com.service1.model.TopicMessageservice1的序列化模型。但我正在尝试将消息反序列化为com.service2.model.ConsumeMessageservice2 中的模型并遇到此问题

我在这里发现了同样的问题,并尝试了以下格式以及文档文档

下面是我的配置

  @Bean(name = "kafkaConsumerConfig")
   public Map<String, Object> kafkaConsumerConfig() {

    Map<String, Object> props = new HashMap<>();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); …
Run Code Online (Sandbox Code Playgroud)

java spring apache-kafka spring-boot spring-kafka

2
推荐指数
1
解决办法
6935
查看次数

Spring Kafka 与嵌入式 Kafka 集成测试

我有一个 Spring Boot 应用程序,它有一个消费者从一个集群中的主题消费并生成到不同集群中的另一个主题。

现在我正在尝试使用 Spring 嵌入式 Kafka 编写集成测试用例,但遇到了问题KafkaTemplate could not be registered. A bean with that name has already been defined in class path resource

消费级

@Service
public class KafkaConsumerService {

@Autowired
private KafkaProducerService kafkaProducerService;

@KafkaListener(topics = "${kafka.producer.topic}")
public void professor(List<Professor> pro) {
    pro.forEach(kafkaProducerService::produce);
    
   }

}
Run Code Online (Sandbox Code Playgroud)

制作人班

@Service
public class KafkaProducerService {

@Value("${kafka.producer.topic}")
private String topic;

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

public void produce(Professor pro) {
    kafkaTemplate.send(topic,"professor",pro);
  }

 }
Run Code Online (Sandbox Code Playgroud)

在我的测试用例中,我想重写KafkaTemplate,以便当我调用kafkaConsumerService.professor其中的方法时Test,应该将数据生成到嵌入式 Kafka 中,并且我应该验证它。 …

java apache-kafka spring-boot spring-kafka spring-kafka-test

2
推荐指数
1
解决办法
2万
查看次数

在春季启动卡夫卡监听器中接收卡夫卡密钥

我是春季卡夫卡的新手。我有一个微服务,它使用 kafka 密钥发送消息,该密钥是用户定义的对象。

1) 第一个微服务使用 MyKey 对象实例的密钥向 Kafka 发送消息。

2)我需要做的是,聆听该主题并使用密钥获取此消息,并使用该密钥创建一个新密钥。

假设消息是由 myKey 的密钥发送的。我想在侦听器中做的是创建一个新的扩展键:

     @KafkaListener(groupId = Bindings.CONSUMER_GROUP_DATA_CLEANUP, topics = "users")
     public void process( @Payload MyMessage myMessage){

        MyExtended myExtendedKey= new MyExtendedKey(myKey.getX(), myKey.getY());
        ....
        ....
        kafkaTemplate.send(TOPIC,  myExtendedKey, message);
      }
Run Code Online (Sandbox Code Playgroud)

我不知道如何获取在侦听器中发送的消息的密钥。

apache-kafka kafka-consumer-api spring-kafka

2
推荐指数
2
解决办法
7079
查看次数

再次使用来自 kafka 日志压缩主题的消息

我有一个带有 Kafka 消费者的 Spring 应用程序,使用 @KafkaListerner 注释。正在使用的主题是日志压缩的,我们可能会遇到必须再次使用主题消息的场景。以编程方式实现这一目标的最佳方法是什么?我们不控制 Kafka 主题配置。

apache-kafka spring-kafka

2
推荐指数
1
解决办法
1458
查看次数

关于故障点,如何确保生产者和消费者应用程序之间的 kafka 事务同步?

总的来说,我对 Spring-Kafka/Kafka 还是有点陌生​​。我的问题相当简短。我有一个仅限消费者的应用程序,它不断地从 Kafka 读取、处理消息,并使用 Ack Listener 手动确认它们。我依赖于上游仅生产者的应用程序,其中它们负责将消息发送到 Kafka 主题以便我使用。我们最近实现了跨生产者和消费者的事务,但我想更多地了解故障点以及如何处理那些回滚的事务以免丢失?我读过,最好使用AfterRollbackProcessor而不是用于SeekToCurrentErrorHandlerkafka 容器工厂上的事务,并StatefulRetry设置为true。我使用事务的原因是为了在新版本中实现一次性 Kafka 语义,因为我们处理大量数据库持久性,并且由于数据库限制而无法承受重复事务。我想知道我是否@KafkaListener必须注释,@Transactional因为我在声明不应是这种情况之前读过一篇文章,但其他文章可能是这种情况,这就是我不确定的原因。我见过很多关于生产者和消费者应用程序的问题,但我还没有看到关于分别具有这些不同角色的单独应用程序的问题(即使最终可能是同一件事)。简而言之,我只是想知道将事务与 Kafka 合并时的最佳实践是什么,以及如何处理这种情况下的故障。

java spring apache-kafka spring-kafka

2
推荐指数
1
解决办法
401
查看次数

带有键值类型消息的 spring 云流 SOURCE

我试图使用 spring 云流发送带有键值对的消息。我无法为此找到任何 API。org.springframework.messaging.MessageChannel 仅将有效负载作为发送功能的一部分。尽管使用 Kafka 模板可以实现这一点。这是生成键值类型消息的唯一方法。由于 KafkaTemplate 是 apache kafka 的 spring 的一部分,我希望在 spring 云流中有一个可用的抽象。请建议。

谢谢,

spring kafka-producer-api spring-cloud-stream spring-kafka

2
推荐指数
1
解决办法
1242
查看次数

JAAS/SASL 的 spring-kafka application.properties 配置不起作用

用例:
我正在使用 Spring Boot2.2.5.RELEASE并且Kafka 2.4.1
JAAS/SASL 配置在 Kafka/ZooKeeper 上正确完成,因为创建的主题没有问题kafka-topics.bat

问题:
当我启动 Spring Boot 应用程序时,我立即收到以下错误:

kafka-server-start.bat 控制台:
INFO [SocketServer brokerId=1] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)

IDE控制台:
WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=yyy] Bootstrap broker localhost:9093 (id: -3 rack: null) disconnected

我的application.properties配置:

spring.kafka.jaas.enabled=true
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="spring_bO0t" password="i_am_a_spring_bO0t_user";
Run Code Online (Sandbox Code Playgroud)

kafka_server_jaas.conf

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="12345"
    user_admin="12345"
    user_spring_bO0t="i_am_a_spring_bO0t_user";
};
Run Code Online (Sandbox Code Playgroud)

我错过了什么吗?

提前致谢。

spring-kafka

2
推荐指数
2
解决办法
7138
查看次数