标签: spring-kafka

如何在不同线程中处理@KafkaListener方法?

我在 Spring Boot 中有卡夫卡处理程序:

    @KafkaListener(topics = "topic-one", groupId = "response")
    public void listen(String response) {
        myService.processResponse(response);
    }
Run Code Online (Sandbox Code Playgroud)

例如,生产者每秒发送一条消息。但myService.processResponse工作10秒。我需要处理每条消息并myService.processResponse在新线程中开始。我可以创建我的执行者并将每个响应委托给它。但我认为 kafka 中还有其他配置可供使用。我找到了2个:

1)添加concurrency = "5"@KafkaListener注释 - 它似乎有效。但我不确定有多正确,因为我有第二种方法:

2)我可以创建ConcurrentKafkaListenerContainerFactory并设置它ConsumerFactory并且concurrency

我不明白这些方法之间的区别?concurrency = "5"只需添加到注释就足够了@KafkaListener还是我需要创建ConcurrentKafkaListenerContainerFactory

或者我根本不明白什么,还有其他方法吗?

java apache-kafka spring-boot kafka-consumer-api spring-kafka

2
推荐指数
2
解决办法
1万
查看次数

fetch-min-size 和 max-poll-records sping kafka 配置无法按预期工作

我正在使用 spring kafka 开发一个 Spring boot 应用程序,该应用程序侦听 kafka 的单个主题,然后隔离各个类别的记录,从中创建一个 json 文件并将其上传到 AWS S3。

我在 Kafka 主题中收到大量数据,我需要确保 json 文件分块得足够大,以限制上传到 S3 的 json 数量。

以下是我application.yml对 kafka 消费者的配置。

spring:
  kafka:
    consumer:
      group-id: newton
      auto-offset-reset: earliest
      fetch-max-wait: 
        seconds: 1 
      fetch-min-size: 500000000
      max-poll-records: 50000000
      value-deserializer: com.forwarding.application.consumer.model.deserializer.MeasureDeserializer
Run Code Online (Sandbox Code Playgroud)

我创建了一个监听器来连续阅读该主题。

即使使用上述配置,我在控制台中收到的记录如下:

   2019-03-27T15:25:56.02+0530 [APP/PROC/WEB/0] OUT 2019-03-27 09:55:56.024  INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl     : Time taken(ms) 56. No Of measures: 60
   2019-03-27T15:25:56.21+0530 [APP/PROC/WEB/2] OUT 2019-03-27 09:55:56.210  INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl     : Time taken(ms) 80. No Of measures: …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-boot kafka-consumer-api spring-kafka

2
推荐指数
1
解决办法
2万
查看次数

将主题名称的数组列表传递给@KafkaListener

我试图按照https://github.com/spring-projects/spring-kafka/issues/361将主题名称从 .yml 文件传递​​到 @kafkalistener。但编译器抛出以下错误

Type mismatch.
Required:
Array<String>
Found:
String
 Unresolved reference: spring
Run Code Online (Sandbox Code Playgroud)

下面是接收者代码

@Component
class Receiver {

    companion object {
        private val LOGGER = LoggerFactory.getLogger(Receiver::class.java)
    }

    @Autowired
    private val taskExecutor: TaskExecutor? = null

    @Autowired
    private val applicationContext: ApplicationContext? = null

    @KafkaListener(topics = "#{'${spring.kafka.topics}'.split(',')}")
    fun receive(@Header(KafkaHeaders.RECEIVED_TOPIC) topic: String) {

    }    
}
Run Code Online (Sandbox Code Playgroud)

下面是我的 build.gradle 文件

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    id("org.springframework.boot") version "2.1.7.RELEASE"
    id("io.spring.dependency-management") version "1.0.8.RELEASE"
    kotlin("jvm") version "1.2.71"
    kotlin("plugin.spring") version "1.2.71"
}

group = "com.example"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility …
Run Code Online (Sandbox Code Playgroud)

kotlin spring-boot spring-kafka

2
推荐指数
1
解决办法
5446
查看次数

为两个独立的集群配置 Spring Kafka

是否可以将 Kafka 配置为在单个 Spring Boot 应用程序中使用两个独立的集群?

用例:我有两个带有副本+zookeeper的集群:

  • 集群 #1引导服务器server1.example.com,server2.example.com,server3.example.com
  • 集群 #2引导服务器target-server1.example.com,target-server2.example.com,target-server3.example.com

我需要使用该消息,Cluster #1然后根据该数据进行一些计算,并生成Cluster #2主题结果。有没有办法在单个 Spring 应用程序中配置 Kafka 来处理这种方法?

apache-kafka spring-boot spring-kafka

2
推荐指数
1
解决办法
2678
查看次数

Spring Kafka Embedded - 测试之间已经存在主题

我创建了一组带有嵌入式 kafka (spring-kafka-test) 的测试 (JUnit 5),当我有时(并非总是)运行它们时,我在单次运行的一个或多个测试中得到“主题 'some_name' 已存在”。

所有测试都使用相同的主题名称(我不想为每个测试更改该名称),测试类具有 DirtiesContext 注释(AFTER_EACH_TEST_METHOD)。我不确定这个问题的原因是什么,以及如何解决它。

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"some_name"}, ports = 9092)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@ActiveProfiles("test")
public class RemovalKafkaTestIT {
    private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(1, true, TOPIC);
    private final static String SERVER_ADDRES = "127.0.0.1:9092";

    private Consumer<String, String> prepareConsumer() {
        Map<String, Object> configsConsumer = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
        configsConsumer.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configsConsumer.put("bootstrap.servers", SERVER_ADDRES);
        Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(configsConsumer, new StringDeserializer(), new StringDeserializer()).createConsumer();
        consumer.subscribe(singleton("some_name"));
        return consumer;
    }

    @Test
    public void …
Run Code Online (Sandbox Code Playgroud)

java junit5 spring-kafka

2
推荐指数
1
解决办法
8050
查看次数

client.dns.lookup 选项中的“use_all_dns_ips”和“resolve_canonical_bootstrap_servers_only”之间的确切区别是什么?

在kafka-client 2.1.0中,client.dns.lookup可用。以下是每个选项的说明。

  1. use_all_dns_ips

    当查找返回主机名的多个 IP 地址时,在连接失败之前将尝试连接所有这些 IP 地址

  2. 仅限resolve_canonical_bootstrap_servers_only

    每个条目都将被解析并扩展为规范名称列表

他们不是都使用dns吗?use_all_dns_ipsresolve_canonical_bootstrap_servers_only之间有什么区别?

apache-kafka kafka-consumer-api kafka-producer-api spring-kafka

2
推荐指数
1
解决办法
4911
查看次数

反序列化 avro 对象时出现 spring MessageConversionException

我正在发送一个 avro 对象 User 并在侦听器中接收它。这是配置

@Bean
public ProducerFactory<String, User> userProducerFactory() {
    Map<String, Object> config = new HashMap<>();

    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class);

    return new DefaultKafkaProducerFactory<>(config);
}

@Bean
public KafkaTemplate<String, User> userKafkaTemplate() {
    return new KafkaTemplate<>(userProducerFactory());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro");

    return props;
}

@Bean
public ConsumerFactory<String, User> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new AvroDeserializer<>(User.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory() { …
Run Code Online (Sandbox Code Playgroud)

java avro apache-kafka spring-kafka

2
推荐指数
1
解决办法
4706
查看次数

如何在 spock 中测试 ListenableFuture 回调

几天前我问了一个关于存根 kafka.send() 方法的未来响应的问题。@kriegaex在这里正确回答并解释了这一点\n虽然我遇到了另一个问题,但我如何测试这个未来响应的 onSuccess 和 onFailure 回调。这是正在测试的代码。

\n\n
import org.springframework.kafka.core.KafkaTemplate;\nimport org.springframework.kafka.support.SendResult;\nimport org.springframework.util.concurrent.ListenableFuture;\nimport org.springframework.util.concurrent.ListenableFutureCallback;\n\npublic class KakfaService {\n\n    private final KafkaTemplate<String, String> kafkaTemplate;\n    private final LogService logService;\n\n    public KakfaService(KafkaTemplate kafkaTemplate, LogService logService){\n        this.kafkaTemplate = kafkaTemplate;\n        this.logService = logService;\n    }\n\n    public void sendMessage(String topicName, String message) {\n        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);\n        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {\n\n            @Override\n            public void onSuccess(SendResult<String, String> result) {\n              LogDto logDto = new LogDto();\n              logDto.setStatus(StatusEnum.SUCCESS);\n              logService.create(logDto)\n            }\n            @Override\n            public void onFailure(Throwable ex) {\n              LogDto logDto = …
Run Code Online (Sandbox Code Playgroud)

groovy unit-testing spock spring-kafka

2
推荐指数
1
解决办法
5238
查看次数

多个Spring Cloud Stream应用程序一起运行

我参考了此处发布的示例。我正在尝试一起运行多个 Spring Cloud Stream 应用程序。这里第一个的输出作为其他的输入。以下是我正在尝试做的事情。

@Bean
    public Function<KStream<FormUUID, FormData>, KStream<UUID, Application>> process()
    {
        //do some processing here and return 
    }
// read output from above process and join it with an event stream
@Bean
    public BiConsumer<KStream<UUID, ProcessEvent>, KTable<UUID, Application>> listen()
    {

        return (eventStream,appTable )-> eventStream
                .join(appTable, (event, app) -> app).foreach((k, app) -> app.createQuote());

    }
Run Code Online (Sandbox Code Playgroud)

application.yml 如下所示

spring.cloud:
 function: process;listen
 stream:
  kafka.streams:
    bindings:
      process-in-0.consumer.application-id: form-aggregator
      listen-in-0.consumer.application-id: event-processor
      listen-in-1.consumer.application-id: event-processor
    binder.configuration:
      default.key.serde: org.springframework.kafka.support.serializer.JsonSerde
      default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
      spring.json.key.default.type: com.xxx.datamapper.domain.FormUUID
      spring.json.value.default.type: com.xxx.datamapper.domain.FormData
      commit.interval.ms: 1000 …
Run Code Online (Sandbox Code Playgroud)

spring-cloud-stream spring-kafka

2
推荐指数
1
解决办法
2081
查看次数

Spring 和 Kafka - 具有相同组 ID 的多个消费者 - 只有一个消费者获取消息

我正在使用 Spring Boot 开始使用 Apache Kafka。我想实现以下事实:同一服务的两个实例以循环方式消耗同一主题的消息。

因此,实例 1 接收主题的第一条消息,实例 2 接收第二条消息,实例 1 接收第三条消息,依此类推。

这是我当前的配置:

spring:
  kafka:
    consumer:
      bootstrap-servers: rommel.local:9092
      group-id: core
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
    producer:
      bootstrap-servers: rommel.local:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    listener:
      missing-topics-fatal: false
      ack-mode: record
Run Code Online (Sandbox Code Playgroud)

这是我的听众:

@KafkaListener(topics = "new-topic", groupId = "core")
    public void consume(String payload) {
        System.out.println("Data received: " + payload);
    }
Run Code Online (Sandbox Code Playgroud)

问题是,当我运行应用程序的两个实例时,只有一个实例收到该主题的消息。

我在日志中意识到了这一点:

Instance 1:
core: partitions assigned: [new-topic-0]
Run Code Online (Sandbox Code Playgroud)
Instance 2:
core: partitions assigned: []
Run Code Online (Sandbox Code Playgroud)

那么当我杀死实例1时。被new-topic-0分配给实例 2。

那么如何实现两个实例循环获取同一主题消息的场景呢?

谢谢!!

apache-kafka spring-boot spring-kafka

2
推荐指数
1
解决办法
8130
查看次数