标签: spring-kafka

Kafka 消息 VS REST 调用

如今在微服务世界中,我在我的工作场所看到很多使用 kafka 消息传递的设计,当您可以使用微服务之间的 rest api 调用获得类似的结果时。从技术上讲,您可以完全停止使用 rest api 调用,而是使用 kafka 消息传递。我真的很想知道最佳实践,优缺点,微服务之间何时使用 api 调用,何时使用 kafka 消息传递。

让我们举一个现实生活中的例子:

我有库存服务和供应商服务。日常供应商服务调用供应商 API 来获取新项目,这些需要转移到库存服务中。项目数最多可达 10,000 个对象。

对于这个用例,最好是:

  1. 从供应商 API 获取新数据后,调用库存服务的 REST API 来存储新项目。

  2. 从供应商 API 获取新数据后,将它们作为消息发送到 kafka 主题,供库存服务使用

您会选择哪种方式以及考虑什么

rest apache-kafka microservices kafka-consumer-api spring-kafka

41
推荐指数
2
解决办法
1万
查看次数

带弹簧启动的简单嵌入式Kafka测试示例

编辑FYI:工作gitHub示例


我在互联网上搜索,找不到嵌入式Kafka测试的工作简单示例.

我的设置是:

  • 春天的靴子
  • 多个@KafkaListener,在一个类中包含不同的主题
  • 嵌入式Kafka进行测试,开始很好
  • 使用Kafkatemplate进行测试,发送到主题,但 @KafkaListener方法即使在很长的睡眠时间后也没有收到任何内容
  • 没有显示警告或错误,只有日志中来自Kafka的垃圾邮件

请帮我.主要是过度配置或过度设计的示例.我相信它可以做得很简单.多谢你们!

@Controller
public class KafkaController {

    private static final Logger LOG = getLogger(KafkaController.class);

    @KafkaListener(topics = "test.kafka.topic")
    public void receiveDunningHead(final String payload) {
        LOG.debug("Receiving event with payload [{}]", payload);
        //I will do database stuff here which i could check in db for testing
    }
}
Run Code Online (Sandbox Code Playgroud)

private static String SENDER_TOPIC ="test.kafka.topic";

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);

@Test
    public void testSend() throws InterruptedException, ExecutionException { …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-boot spring-kafka

22
推荐指数
3
解决办法
4万
查看次数

在数据库和Kafka生产者之间同步事务

我们有一个微服务架构,其中Kafka用作服务之间的通信机制。一些服务具有自己的数据库。假设用户调用服务A,这将导致在该服务的数据库中创建一条记录(或一组记录)。此外,此事件应作为Kafka主题的一项报告给其他服务。确保仅在成功更新Kafka主题(实质上是围绕数据库更新和Kafka更新创建分布式事务)时才写入数据库记录的最佳方法是什么?

我们正在考虑使用spring-kafka(在Spring Boot WebFlux服务中),我可以看到它具有KafkaTransactionManager,但是据我了解,这更多地是关于Kafka交易本身(确保Kafka生产者和消费者之间的一致性),而不是在两个系统之间同步事务(请参见此处:“ Kafka不支持XA,您必须处理在Kafka tx回滚时DB tx可能提交的可能性。”)。另外,我认为此类依赖于Spring的事务框架,至少就我目前所知,该框架是线程绑定的,如果使用反应性方法(例如WebFlux)在操作的不同部分执行该方法,则该类将无法工作。不同的线程。(我们正在使用react-pg-client,因此是手动处理事务,而不是使用Spring的框架。)

我能想到的一些选择:

  1. 不要将数据写入数据库:仅将其写入Kafka。然后使用使用者(在服务A中)更新数据库。看来这可能不是最有效的,并且会出现问题,因为用户调用的服务无法立即看到它应该刚刚创建的数据库更改。
  2. 不要直接写到Kafka:只写数据库,并使用Debezium之类的东西向Kafka报告更改。这里的问题是,更改是基于单个数据库记录的,而要存储在Kafka中的重要业务事件可能涉及多个表中数据的组合。
  3. 首先写入数据库(如果失败,则不执行任何操作,仅引发异常)。然后,在写入Kafka时,假设写入可能会失败。使用内置的自动重试功能可以使其保持尝试一段时间。如果最终完全失败,请尝试写入死信队列,并为管理员创建某种手动机制以将其解决。而且,如果写入DLQ失败(即Kafka完全关闭),只需以其他方式(例如,记录到数据库)进行记录,然后再次创建某种手动机制供管理员进行分类即可。

是否有人对以上内容有任何想法或建议,或者能够纠正上述假设中的任何错误?

提前致谢!

distributed-transactions spring-transactions apache-kafka spring-kafka

17
推荐指数
3
解决办法
3727
查看次数

Spring-Kafka vs. Spring-Cloud-Stream(Kafka)

使用Kafka作为微服务架构中的消息传递系统,使用spring-kafka与spring-cloud-stream + spring-cloud-starter-stream-kafka有什么好处?

Spring云流框架支持更多的消息传递系统,因此具有更多的模块化设计.但功能呢?spring-kafka和spring-cloud-stream + spring-cloud-starter-stream-kafka的功能之间是否存在差距?哪个API设计得更好?

期待阅读您的意见

spring spring-integration spring-cloud-stream spring-kafka

16
推荐指数
2
解决办法
4403
查看次数

Spring Kafka集成测试写入高水印文件时出错

我正在 Spring Boot 应用程序中使用 spring-kaka-2.2.0 编写集成测试,我几乎成功了,我的测试用例仍然返回 true,但在那之后我仍然看到多个错误。

2019-02-21 11:12:35.434 ERROR 5717 --- [       Thread-7] kafka.server.ReplicaManager              : [ReplicaManager broker=0] Error while writing to highwatermark file in directory /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645

org.apache.kafka.common.errors.KafkaStorageException: Error while writing to checkpoint file /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint
Caused by: java.io.FileNotFoundException: /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint.tmp (No such file or directory)
Run Code Online (Sandbox Code Playgroud)

测试配置

@EnableKafka
@TestConfiguration
public class KafkaProducerConfigTest {

@Bean
public EmbeddedKafkaBroker embeddedKafkaBroker() {
    return new EmbeddedKafkaBroker(1,false,2,"test-events");
}


@Bean
public ProducerFactory<String, Object> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker().getBrokersAsString());
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-boot spring-kafka spring-kafka-test

16
推荐指数
2
解决办法
4408
查看次数

Kafka制作人TimeoutException:过期1条记录

我正在使用Kafka和Spring-boot:

卡夫卡制片人班:

@Service
public class MyKafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static Logger LOGGER = LoggerFactory.getLogger(NotificationDispatcherSender.class);

    // Send Message
    public void sendMessage(String topicName, String message) throws Exception {
        LOGGER.debug("========topic Name===== " + topicName + "=========message=======" + message);
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topicName, message);
        result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOGGER.debug("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                LOGGER.error(Constants.PRODUCER_MESSAGE_EXCEPTION.getValue() + " : " + ex.getMessage());
            }
        }); …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api kafka-producer-api spring-kafka

15
推荐指数
1
解决办法
2万
查看次数

为什么Kafka消费者需要很长时间才能开始消费?

我们启动一个Kafka消费者,聆听可能尚未创建的主题(尽管已启用主题自动创建).

此后不久,制作人就该主题发布消息.

但是,消费者需要一些时间才能注意到这一点:确切地说是5分钟.此时,消费者撤销其分区并重新加入消费者组.卡夫卡重新稳定了这个群体.查看消费者与kafka日志的时间戳,此过程在消费者端实例化.

我想这是预期的行为,但我想理解这一点.这实际上是一个重新平衡(从0到1分区)?如果我们提前创建主题,这不会发生吗?

2017-02-01 08:36:45.692  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:36:45.692  INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2017-02-01 08:36:45.693  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group tps-kafka-partitioning
2017-02-01 08:36:45.738  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group tps-kafka-partitioning with generation 1
2017-02-01 08:36:45.747  INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [] for group tps-kafka-partitioning
2017-02-01 08:36:45.749  INFO 7 --- [afka-consumer-1] …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api spring-kafka

13
推荐指数
1
解决办法
4574
查看次数

Kafka消费者异常和抵消提交

我一直在尝试为Spring Kafka做一些POC工作.具体来说,我想尝试在Kafka中消费消息时处理错误方面的最佳实践.

我想知道是否有人能够提供帮助:

  1. 分享围绕Kafka消费者在发生故障时应该做的最佳实践
  2. 帮助我了解AckMode Record如何工作,以及如何在侦听器方法中抛出异常时阻止对Kafka偏移队列的提交.

2的代码示例如下:

鉴于AckMode设置为RECORD,根据文档:

处理记录后,侦听​​器返回时提交偏移量.

我认为如果监听器方法抛出异常,偏移量不会增加.但是,当我使用下面的代码/配置/命令组合测试它时,情况并非如此.偏移量仍会更新,并继续处理下一条消息.

我的配置:

    private Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
}

   @Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
    return factory;
}
Run Code Online (Sandbox Code Playgroud)

我的代码:

@Component
public class KafkaMessageListener{
    @KafkaListener(topicPartitions = {@TopicPartition( topic = "my-replicated-topic", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0", relativeToCurrent = "true"))}) …
Run Code Online (Sandbox Code Playgroud)

java spring apache-kafka kafka-consumer-api spring-kafka

13
推荐指数
1
解决办法
6823
查看次数

带有spring-kafka 2.1.0和SpringBoot 1.5.9的Kafka Consumer上的java.lang.NoSuchMethodError

我正在尝试使用SpringBoot(1.5.9)和Spring-kafka(2.1.0)来设置Kafka Consumer.但是,当我启动我的应用程序时,我在Kafka MessagingMessageListenerAdapter上获得了java.lang.NoSuchMethodError:org.springframework.util.Assert.state(ZLjava/util/function/Supplier;)V.我尝试使用Spring-Kafka(1.2.0)并且错误消失了.有没有其他人遇到这个版本不兼容?

这是我的配置类

@EnableKafka
@Configuration
public class ImporterConfigs{

static Logger logger = Logger.getLogger(ImporterConfigs.class);

@Value("${kafka.bootstrap-servers}")
private static String bootstrapServers;

@Bean
public Map<String, Object> consumerKafkaConfigs() {
  Map<String, Object> props = new HashMap<>();
  // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  // allows a pool of processes to divide the work of consuming and processing records
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");

  return props;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
  return new …
Run Code Online (Sandbox Code Playgroud)

spring-kafka

13
推荐指数
1
解决办法
1万
查看次数

Spring-Kafka 并发属性

我正在使用 Spring-Kafka 编写我的第一个 Kafka Consumer。查看了框架提供的不同选项,并且对此几乎没有怀疑。有人可以在下面澄清,如果你已经工作过。

问题 - 1:根据 Spring-Kafka 文档,有两种方法可以实现 Kafka-Consumer;“您可以通过配置 MessageListenerContainer 并提供消息侦听器或使用 @KafkaListener 注释来接收消息”。有人能告诉我什么时候应该选择一个选项而不是另一个选项吗?

问题 - 2:我选择了 KafkaListener 方法来编写我的应用程序。为此,我需要初始化一个容器工厂实例,并且在容器工厂内部有控制并发的选项。只是想仔细检查我对并发的理解是否正确。

假设,我有一个主题名称 MyTopic,其中有 4 个分区。为了使用来自 MyTopic 的消息,我已经启动了我的应用程序的 2 个实例,这些实例是通过将并发设置为 2 来启动的。因此,理想情况下,根据 kafka 分配策略,2 个分区应分配给 consumer1,其他 2 个分区应分配给 consumer2 . 既然并发设置为2,那么每个consumer是否会启动2个线程,并行的从topic中消费数据?如果我们并行消费,我们还应该考虑什么。

问题 3 - 我选择了手动确认模式,而不是在外部管理偏移量(不将其持久化到任何数据库/文件系统)。那么我是否需要编写自定义代码来处理重新平衡,或者框架会自动管理它?我认为没有,因为我只有在处理完所有记录后才承认。

问题 - 4:另外,使用手动 ACK 模式,哪个监听器会提供更好的性能?BATCH 消息侦听器或普通消息侦听器。我想如果我使用普通消息侦听器,则在处理每条消息后将提交偏移量。

粘贴下面的代码供您参考。

批确认消费者

    public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment,
          Consumer<?, ?> consumer) {
      for (ConsumerRecord<String, String> record : records) {
          System.out.println("Record : " + record.value());
          // Process …
Run Code Online (Sandbox Code Playgroud)

spring apache-kafka spring-kafka

13
推荐指数
1
解决办法
1万
查看次数