我们正在尝试在我们的应用程序中以编程方式启动kafka服务器(zookeeper和broker)。有可用的 api/库吗?
我正在尝试找到一种解决方案,将数据从 Kafka 直接流式传输到 Oracle。最有效的解决方案是什么?
apache-kafka apache-kafka-streams spring-kafka apache-kafka-connect
刚开始使用Spring Kafka(2.1.4.RELEASE)和Kafka(1.0.0)但是当我添加事务时,处理速度降低了很多。
代码:
spring.kafka.consumer.max-poll-records=10
spring.kafka.consumer.specific.avro.reader=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=${application.name}
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.key-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
Run Code Online (Sandbox Code Playgroud)
在 Java 中我添加了:
spring.kafka.consumer.max-poll-records=10
spring.kafka.consumer.specific.avro.reader=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=${application.name}
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.key-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
Run Code Online (Sandbox Code Playgroud)
当我删除该setTransactionManager(transactionManager)语句后,速度提高了很多。我做错了什么吗?
spring-Kafka-2.1.5我正在与和 一起研究微服务spring-boot-2.0.5
第一个服务将向卡夫卡产生一些消息,第二个服务将消耗它们,在消耗时我遇到了问题
Caused by: java.lang.IllegalArgumentException: The class 'com.service1.model.TopicMessage' is not in the trusted packages: [java.util, java.lang, com.service2.model.ConsumeMessage].
If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
Run Code Online (Sandbox Code Playgroud)
所以从错误消息来看,这是com.service1.model.TopicMessageservice1的序列化模型。但我正在尝试将消息反序列化为com.service2.model.ConsumeMessageservice2 中的模型并遇到此问题
下面是我的配置
@Bean(name = "kafkaConsumerConfig")
public Map<String, Object> kafkaConsumerConfig() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); …Run Code Online (Sandbox Code Playgroud) 我有一个 Spring Boot 应用程序,它有一个消费者从一个集群中的主题消费并生成到不同集群中的另一个主题。
现在我正在尝试使用 Spring 嵌入式 Kafka 编写集成测试用例,但遇到了问题KafkaTemplate could not be registered. A bean with that name has already been defined in class path resource
消费级
@Service
public class KafkaConsumerService {
@Autowired
private KafkaProducerService kafkaProducerService;
@KafkaListener(topics = "${kafka.producer.topic}")
public void professor(List<Professor> pro) {
pro.forEach(kafkaProducerService::produce);
}
}
Run Code Online (Sandbox Code Playgroud)
制作人班
@Service
public class KafkaProducerService {
@Value("${kafka.producer.topic}")
private String topic;
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public void produce(Professor pro) {
kafkaTemplate.send(topic,"professor",pro);
}
}
Run Code Online (Sandbox Code Playgroud)
在我的测试用例中,我想重写KafkaTemplate,以便当我调用kafkaConsumerService.professor其中的方法时Test,应该将数据生成到嵌入式 Kafka 中,并且我应该验证它。 …
java apache-kafka spring-boot spring-kafka spring-kafka-test
我是春季卡夫卡的新手。我有一个微服务,它使用 kafka 密钥发送消息,该密钥是用户定义的对象。
1) 第一个微服务使用 MyKey 对象实例的密钥向 Kafka 发送消息。
2)我需要做的是,聆听该主题并使用密钥获取此消息,并使用该密钥创建一个新密钥。
假设消息是由 myKey 的密钥发送的。我想在侦听器中做的是创建一个新的扩展键:
@KafkaListener(groupId = Bindings.CONSUMER_GROUP_DATA_CLEANUP, topics = "users")
public void process( @Payload MyMessage myMessage){
MyExtended myExtendedKey= new MyExtendedKey(myKey.getX(), myKey.getY());
....
....
kafkaTemplate.send(TOPIC, myExtendedKey, message);
}
Run Code Online (Sandbox Code Playgroud)
我不知道如何获取在侦听器中发送的消息的密钥。
我有一个带有 Kafka 消费者的 Spring 应用程序,使用 @KafkaListerner 注释。正在使用的主题是日志压缩的,我们可能会遇到必须再次使用主题消息的场景。以编程方式实现这一目标的最佳方法是什么?我们不控制 Kafka 主题配置。
总的来说,我对 Spring-Kafka/Kafka 还是有点陌生。我的问题相当简短。我有一个仅限消费者的应用程序,它不断地从 Kafka 读取、处理消息,并使用 Ack Listener 手动确认它们。我依赖于上游仅生产者的应用程序,其中它们负责将消息发送到 Kafka 主题以便我使用。我们最近实现了跨生产者和消费者的事务,但我想更多地了解故障点以及如何处理那些回滚的事务以免丢失?我读过,最好使用AfterRollbackProcessor而不是用于SeekToCurrentErrorHandlerkafka 容器工厂上的事务,并StatefulRetry设置为true。我使用事务的原因是为了在新版本中实现一次性 Kafka 语义,因为我们处理大量数据库持久性,并且由于数据库限制而无法承受重复事务。我想知道我是否@KafkaListener必须注释,@Transactional因为我在声明不应是这种情况之前读过一篇文章,但其他文章可能是这种情况,这就是我不确定的原因。我见过很多关于生产者和消费者应用程序的问题,但我还没有看到关于分别具有这些不同角色的单独应用程序的问题(即使最终可能是同一件事)。简而言之,我只是想知道将事务与 Kafka 合并时的最佳实践是什么,以及如何处理这种情况下的故障。
我试图使用 spring 云流发送带有键值对的消息。我无法为此找到任何 API。org.springframework.messaging.MessageChannel 仅将有效负载作为发送功能的一部分。尽管使用 Kafka 模板可以实现这一点。这是生成键值类型消息的唯一方法。由于 KafkaTemplate 是 apache kafka 的 spring 的一部分,我希望在 spring 云流中有一个可用的抽象。请建议。
谢谢,
用例:
我正在使用 Spring Boot2.2.5.RELEASE并且Kafka 2.4.1
JAAS/SASL 配置在 Kafka/ZooKeeper 上正确完成,因为创建的主题没有问题kafka-topics.bat
问题:
当我启动 Spring Boot 应用程序时,我立即收到以下错误:
kafka-server-start.bat 控制台:
INFO [SocketServer brokerId=1] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
IDE控制台:
WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=yyy] Bootstrap broker localhost:9093 (id: -3 rack: null) disconnected
我的application.properties配置:
spring.kafka.jaas.enabled=true
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="spring_bO0t" password="i_am_a_spring_bO0t_user";
Run Code Online (Sandbox Code Playgroud)
kafka_server_jaas.conf:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="12345"
user_admin="12345"
user_spring_bO0t="i_am_a_spring_bO0t_user";
};
Run Code Online (Sandbox Code Playgroud)
我错过了什么吗?
提前致谢。