标签: spring-kafka

如何使用Spring Kafka生产者发送批量数据

目前我有这样的代码:

KafkaTemplate<String, String> kafkaTemplate;

List<Pet> myData;

for(Pet p: myData) {
  String json = objectWriter.writeValueAsString(p)
  kafkaTemplate.send(topic, json)
}

Run Code Online (Sandbox Code Playgroud)

所以每个列表项都是一一发送的。如何一次发送整个列表?

java apache-kafka spring-kafka

9
推荐指数
2
解决办法
2万
查看次数

Spring boot 3 和 Swagger ui java.lang.NoSuchMethodError: 'io.swagger.v3.oas.annotations.media.Schema$AdditionalPropertiesValue otherProperties()

您好,升级到 Spring Boot 3 后,我遇到了 swagger ui 问题。swagger-ui 不再工作,我得到了 404 和“白色标签”页面作为响应。

仔细查看后,我需要更改:implementation(group: 'org.springdoc', name: 'springdoc-openapi-ui', version: '1.5.8')

至:实现(组:'org.springdoc',名称:'springdoc-openapi-starter-webmvc-ui',版本:'2.0.3')

之后它确实起作用了,但是在我的其他项目之一中,我们遇到了依赖冲突的问题,因此 swagger-ui 在尝试获取 /v3/api-docs/: 之间抛出 500: io.confluence :kafka-avro-serializer:7.3.1 和 springdoc-openapi-starter-webmvc-ui:2.0.3 这两个依赖项需要使用 io.swagger.core.v3:swagger-annotations-jakarta,但版本不同

因此,在 gradle 中,我必须解决冲突以强制 io.swagger.core.v3:swagger-annotations-jakarta:2.2.8。

java spring swagger-ui spring-boot spring-kafka

9
推荐指数
2
解决办法
1万
查看次数

如何在springboot中动态地为每个主题创建单独的Kafka监听器?

我是Spring和Kafka的新手.我正在研究一个用例[使用SpringBoot-kafka],允许用户在运行时创建kafka主题.spring应用程序应该在运行时以编程方式订阅这些主题.到目前为止我所知道的是,Kafka听众是设计时间,因此需要在启动前指定主题.有没有办法动态订阅SpringBoot-Kafka集成中的kafka主题?

推荐这个 https://github.com/spring-projects/spring-kafka/issues/132

我计划实现的当前方法是,不要使用Spring-Kafka集成而不是自己实现Kafka消费者[使用java代码]如此处提到的 spring boot kafka consumer - 如何正确地使用来自spring boot的kafka消息

spring spring-mvc spring-boot kafka-consumer-api spring-kafka

8
推荐指数
1
解决办法
4743
查看次数

Spring Kafka:ApplicationContext中不同对象的多个侦听器

我可以请与社区核实听取多个主题的最佳方式是什么,每个主题包含不同类的消息?

在过去的几天里,我一直在玩Spring Kafka.到目前为止我的思考过程:

  • 因为在初始化KafkaListenerContainerFactory时需要将反序列化器传递给DefaultKafkaConsumerFactory.这似乎表明,如果我需要多个容器,每个反序列化一个不同类型的消息,我将无法使用@EnableKafka和@KafkaListener注释.

  • 这使我认为这样做的唯一方法是实例化多个KafkaMessageListenerContainer.

  • 鉴于KafkaMessageListenerContainers是单线程的,我需要同时监听多个主题,我真的应该使用多个ConcurrentKafkaMessageListenerContainers.

我会在这里走上正轨吗?有一个更好的方法吗?

谢谢!

java apache-kafka spring-kafka

8
推荐指数
3
解决办法
9528
查看次数

spring kafka 有多个生产者的代码示例吗?

我有一个可能需要多个生产者的应用程序。我看到的所有代码示例似乎都支持单个生产者,在应用程序启动期间从应用程序读取配置。如果有多个生产者并且我们想传入不同的生产者配置,那么 Spring 是否有开箱即用的支持?或者在这种情况下我应该没有弹簧吗?

spring apache-kafka spring-kafka

8
推荐指数
3
解决办法
6839
查看次数

Kafka Streams with Spring Boot

嘿伙计们,我想在我的春季启动项目中使用Kafka Streams实时处理.所以我需要Kafka Streams配置或者我想使用KStreams或KTable,但我在互联网上找不到例子.

我现在做生产者和消费者我想实时流式传输.

apache-kafka spring-boot apache-kafka-streams spring-kafka

8
推荐指数
1
解决办法
1万
查看次数

Kafka:序列化时的消息大于你用max.request.size配置配置的最大请求大小

收到以下错误(Kafka 2.1.0):

2018-12-03 21:22:37.873 错误 37645 --- [nio-8080-exec-1] osksupport.LoggingProducerListener :发送带有 key='null' 和 payload='{82, 73, 70 的消息时抛出异常, 70, 36, 96, 19, 0, 87, 65, 86, 69, 102, 109, 116, 32, 16, 0, 0, 0, 1, 0, 1, 0, 68, -84,.. .' to topic recieved_sound: org.apache.kafka.common.errors.RecordTooLargeException: 序列化时消息为 1269892 字节,大于您使用 max.request.size 配置配置的最大请求大小。

我尝试了各种 SO 帖子中的所有建议。

我的 Producer.properties:

max.request.size=41943040
message.max.bytes=41943040
replica.fetch.max.bytes=41943040
fetch.message.max.bytes=41943040
Run Code Online (Sandbox Code Playgroud)

服务器.属性:

socket.request.max.bytes=104857600
message.max.bytes=41943040
max.request.size=41943040
replica.fetch.max.bytes=41943040
fetch.message.max.bytes=41943040
Run Code Online (Sandbox Code Playgroud)

ProducerConfig(Spring Boot):

configProps.put("message.max.bytes", "41943040");
configProps.put("max.request.size", "41943040");
configProps.put("replica.fetch.max.bytes", "41943040");
configProps.put("fetch.message.max.bytes", "41943040");
Run Code Online (Sandbox Code Playgroud)

消费者配置(SpringBoot):

props.put("fetch.message.max.bytes", "41943040");
props.put("message.max.bytes", "41943040");
props.put("max.request.size", …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api kafka-producer-api spring-kafka

8
推荐指数
1
解决办法
2万
查看次数

Kafka 代理节点因“打开的文件太多”错误而宕机

我们有一个 3 节点的 Kafka 集群部署,总共有 35 个主题,每个主题有 50 个分区。总的来说,我们已经配置了replication factor=2. 我们看到一个非常奇怪的问题,Kafka 节点间歇性地停止响应并显示错误:

ERROR Error while accepting connection (kafka.network.Acceptor)
java.io.IOException: Too many open files
  at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
  at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
  at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
  at kafka.network.Acceptor.accept(SocketServer.scala:460)
  at kafka.network.Acceptor.run(SocketServer.scala:403)
  at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

我们已经部署了最新的 Kafka 版本并使用 spring-kafka 作为客户端:

kafka_2.12-2.1.0(CentOS Linux 版本 7.6.1810(核心))

  • 有以下三点观察:
    1. 如果我们这样做lsof -p <kafka_pid>|wc -l,我们得到的打开描述符总数只有 7000 左右。
    2. 如果我们这样做lsof|grep kafka|wc -l,我们将获得大约 150 万个开放 FD。我们已经检查过它们都只属于 Kafka 进程。
    3. 如果我们将系统降级到 Centos6,那么 outlsof|grep kafka|wc -l会回到 7000。

我们已尝试将文件限制设置为非常大,但仍然出现此问题。以下是为 kafka 进程设置的限制:

cat /proc/<kafka_pid>/limits
    Limit …
Run Code Online (Sandbox Code Playgroud)

centos lsof ulimit apache-kafka spring-kafka

8
推荐指数
1
解决办法
7298
查看次数

Spring KafkaListener:如何知道何时准备就绪

我有一个简单的 Spring Boot 应用程序,它从 Kafka 读取并写入 Kafka。我写了一个SpringBootTestusing anEmbeddedKafka来测试所有这些。

主要问题是:有时测试失败是因为测试过早发送Kafka消息。这样,在 Spring 应用程序(或者KafkaListener准确地说)准备就绪之前,消息已经写入 Kafka 。由于侦听器从latest偏移量中读取(我不想为我的测试更改任何配置 - 除了 bootstrap.servers),它不会收到该测试中的所有消息。

有谁知道我如何在测试中知道KafkaListener已准备好接收消息?

我能想到的唯一方法是等到/health可用,但我不知道我是否可以确定这意味着KafkaListener完全准备好了。

任何帮助是极大的赞赏!

此致。

spring apache-kafka spring-boot spring-kafka spring-kafka-test

8
推荐指数
2
解决办法
1451
查看次数

如何编写单元测试用例来为 ListenableFuture 添加回调

我正在尝试为 ListenableFuture 添加回调编写单元测试用例,但我不知道该怎么做。在互联网上没有得到任何有用的东西。

 @Test
    public void can_publish_data_to_kafka() {
        String topic = someString(10);
        String key = someAlphanumericString(5);
        String data = someString(50);
        SendResult sendResult = mock(SendResult.class);
        ListenableFuture<SendResult<String, Object>> future = mock(ListenableFuture.class);

        given(kafkaTemplate.send(topic, key, data)).willReturn(future);

        doAnswer(new Answer() {
            @Override
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                return invocationOnMock.getArguments()[1];
            }
        });

        service.method(key, topic, data);

    }
Run Code Online (Sandbox Code Playgroud)

我想为其编写测试用例的代码

ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topicName, key, data);

        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onSuccess(SendResult<String, Object> stringKafkaBeanSendResult) {
                RecordMetadata recordMetadata = stringKafkaBeanSendResult.getRecordMetadata();
                LOGGER.info(String.format("sent message %s to topic %s …
Run Code Online (Sandbox Code Playgroud)

concurrency junit unit-testing mockito spring-kafka

8
推荐指数
1
解决办法
8338
查看次数