如今在微服务世界中,我在我的工作场所看到很多使用 kafka 消息传递的设计,当您可以使用微服务之间的 rest api 调用获得类似的结果时。从技术上讲,您可以完全停止使用 rest api 调用,而是使用 kafka 消息传递。我真的很想知道最佳实践,优缺点,微服务之间何时使用 api 调用,何时使用 kafka 消息传递。
让我们举一个现实生活中的例子:
我有库存服务和供应商服务。日常供应商服务调用供应商 API 来获取新项目,这些需要转移到库存服务中。项目数最多可达 10,000 个对象。
对于这个用例,最好是:
从供应商 API 获取新数据后,调用库存服务的 REST API 来存储新项目。
从供应商 API 获取新数据后,将它们作为消息发送到 kafka 主题,供库存服务使用
您会选择哪种方式以及考虑什么
rest apache-kafka microservices kafka-consumer-api spring-kafka
编辑FYI:工作gitHub示例
我在互联网上搜索,找不到嵌入式Kafka测试的工作简单示例.
我的设置是:
请帮我.主要是过度配置或过度设计的示例.我相信它可以做得很简单.多谢你们!
@Controller
public class KafkaController {
private static final Logger LOG = getLogger(KafkaController.class);
@KafkaListener(topics = "test.kafka.topic")
public void receiveDunningHead(final String payload) {
LOG.debug("Receiving event with payload [{}]", payload);
//I will do database stuff here which i could check in db for testing
}
}
Run Code Online (Sandbox Code Playgroud)
private static String SENDER_TOPIC ="test.kafka.topic";
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);
@Test
public void testSend() throws InterruptedException, ExecutionException { …
Run Code Online (Sandbox Code Playgroud) 我们有一个微服务架构,其中Kafka用作服务之间的通信机制。一些服务具有自己的数据库。假设用户调用服务A,这将导致在该服务的数据库中创建一条记录(或一组记录)。此外,此事件应作为Kafka主题的一项报告给其他服务。确保仅在成功更新Kafka主题(实质上是围绕数据库更新和Kafka更新创建分布式事务)时才写入数据库记录的最佳方法是什么?
我们正在考虑使用spring-kafka(在Spring Boot WebFlux服务中),我可以看到它具有KafkaTransactionManager,但是据我了解,这更多地是关于Kafka交易本身(确保Kafka生产者和消费者之间的一致性),而不是在两个系统之间同步事务(请参见此处:“ Kafka不支持XA,您必须处理在Kafka tx回滚时DB tx可能提交的可能性。”)。另外,我认为此类依赖于Spring的事务框架,至少就我目前所知,该框架是线程绑定的,如果使用反应性方法(例如WebFlux)在操作的不同部分执行该方法,则该类将无法工作。不同的线程。(我们正在使用react-pg-client,因此是手动处理事务,而不是使用Spring的框架。)
我能想到的一些选择:
是否有人对以上内容有任何想法或建议,或者能够纠正上述假设中的任何错误?
提前致谢!
distributed-transactions spring-transactions apache-kafka spring-kafka
使用Kafka作为微服务架构中的消息传递系统,使用spring-kafka与spring-cloud-stream + spring-cloud-starter-stream-kafka有什么好处?
Spring云流框架支持更多的消息传递系统,因此具有更多的模块化设计.但功能呢?spring-kafka和spring-cloud-stream + spring-cloud-starter-stream-kafka的功能之间是否存在差距?哪个API设计得更好?
期待阅读您的意见
我正在 Spring Boot 应用程序中使用 spring-kaka-2.2.0 编写集成测试,我几乎成功了,我的测试用例仍然返回 true,但在那之后我仍然看到多个错误。
2019-02-21 11:12:35.434 ERROR 5717 --- [ Thread-7] kafka.server.ReplicaManager : [ReplicaManager broker=0] Error while writing to highwatermark file in directory /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645
org.apache.kafka.common.errors.KafkaStorageException: Error while writing to checkpoint file /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint
Caused by: java.io.FileNotFoundException: /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint.tmp (No such file or directory)
Run Code Online (Sandbox Code Playgroud)
测试配置
@EnableKafka
@TestConfiguration
public class KafkaProducerConfigTest {
@Bean
public EmbeddedKafkaBroker embeddedKafkaBroker() {
return new EmbeddedKafkaBroker(1,false,2,"test-events");
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker().getBrokersAsString());
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); …
Run Code Online (Sandbox Code Playgroud) java apache-kafka spring-boot spring-kafka spring-kafka-test
我正在使用Kafka和Spring-boot:
卡夫卡制片人班:
@Service
public class MyKafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private static Logger LOGGER = LoggerFactory.getLogger(NotificationDispatcherSender.class);
// Send Message
public void sendMessage(String topicName, String message) throws Exception {
LOGGER.debug("========topic Name===== " + topicName + "=========message=======" + message);
ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topicName, message);
result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
LOGGER.debug("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());
}
@Override
public void onFailure(Throwable ex) {
LOGGER.error(Constants.PRODUCER_MESSAGE_EXCEPTION.getValue() + " : " + ex.getMessage());
}
}); …
Run Code Online (Sandbox Code Playgroud) apache-kafka kafka-consumer-api kafka-producer-api spring-kafka
我们启动一个Kafka消费者,聆听可能尚未创建的主题(尽管已启用主题自动创建).
此后不久,制作人就该主题发布消息.
但是,消费者需要一些时间才能注意到这一点:确切地说是5分钟.此时,消费者撤销其分区并重新加入消费者组.卡夫卡重新稳定了这个群体.查看消费者与kafka日志的时间戳,此过程在消费者端实例化.
我想这是预期的行为,但我想理解这一点.这实际上是一个重新平衡(从0到1分区)?如果我们提前创建主题,这不会发生吗?
2017-02-01 08:36:45.692 INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:36:45.692 INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[]
2017-02-01 08:36:45.693 INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group tps-kafka-partitioning
2017-02-01 08:36:45.738 INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group tps-kafka-partitioning with generation 1
2017-02-01 08:36:45.747 INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:36:45.749 INFO 7 --- [afka-consumer-1] …
Run Code Online (Sandbox Code Playgroud) 我一直在尝试为Spring Kafka做一些POC工作.具体来说,我想尝试在Kafka中消费消息时处理错误方面的最佳实践.
我想知道是否有人能够提供帮助:
2的代码示例如下:
鉴于AckMode设置为RECORD,根据文档:
处理记录后,侦听器返回时提交偏移量.
我认为如果监听器方法抛出异常,偏移量不会增加.但是,当我使用下面的代码/配置/命令组合测试它时,情况并非如此.偏移量仍会更新,并继续处理下一条消息.
我的配置:
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
return factory;
}
Run Code Online (Sandbox Code Playgroud)
我的代码:
@Component
public class KafkaMessageListener{
@KafkaListener(topicPartitions = {@TopicPartition( topic = "my-replicated-topic", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0", relativeToCurrent = "true"))}) …
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用SpringBoot(1.5.9)和Spring-kafka(2.1.0)来设置Kafka Consumer.但是,当我启动我的应用程序时,我在Kafka MessagingMessageListenerAdapter上获得了java.lang.NoSuchMethodError:org.springframework.util.Assert.state(ZLjava/util/function/Supplier;)V.我尝试使用Spring-Kafka(1.2.0)并且错误消失了.有没有其他人遇到这个版本不兼容?
这是我的配置类
@EnableKafka
@Configuration
public class ImporterConfigs{
static Logger logger = Logger.getLogger(ImporterConfigs.class);
@Value("${kafka.bootstrap-servers}")
private static String bootstrapServers;
@Bean
public Map<String, Object> consumerKafkaConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// allows a pool of processes to divide the work of consuming and processing records
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new …
Run Code Online (Sandbox Code Playgroud) 我正在使用 Spring-Kafka 编写我的第一个 Kafka Consumer。查看了框架提供的不同选项,并且对此几乎没有怀疑。有人可以在下面澄清,如果你已经工作过。
问题 - 1:根据 Spring-Kafka 文档,有两种方法可以实现 Kafka-Consumer;“您可以通过配置 MessageListenerContainer 并提供消息侦听器或使用 @KafkaListener 注释来接收消息”。有人能告诉我什么时候应该选择一个选项而不是另一个选项吗?
问题 - 2:我选择了 KafkaListener 方法来编写我的应用程序。为此,我需要初始化一个容器工厂实例,并且在容器工厂内部有控制并发的选项。只是想仔细检查我对并发的理解是否正确。
假设,我有一个主题名称 MyTopic,其中有 4 个分区。为了使用来自 MyTopic 的消息,我已经启动了我的应用程序的 2 个实例,这些实例是通过将并发设置为 2 来启动的。因此,理想情况下,根据 kafka 分配策略,2 个分区应分配给 consumer1,其他 2 个分区应分配给 consumer2 . 既然并发设置为2,那么每个consumer是否会启动2个线程,并行的从topic中消费数据?如果我们并行消费,我们还应该考虑什么。
问题 3 - 我选择了手动确认模式,而不是在外部管理偏移量(不将其持久化到任何数据库/文件系统)。那么我是否需要编写自定义代码来处理重新平衡,或者框架会自动管理它?我认为没有,因为我只有在处理完所有记录后才承认。
问题 - 4:另外,使用手动 ACK 模式,哪个监听器会提供更好的性能?BATCH 消息侦听器或普通消息侦听器。我想如果我使用普通消息侦听器,则在处理每条消息后将提交偏移量。
粘贴下面的代码供您参考。
批确认消费者:
public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {
for (ConsumerRecord<String, String> record : records) {
System.out.println("Record : " + record.value());
// Process …
Run Code Online (Sandbox Code Playgroud)