目前我有这样的代码:
KafkaTemplate<String, String> kafkaTemplate;
List<Pet> myData;
for(Pet p: myData) {
String json = objectWriter.writeValueAsString(p)
kafkaTemplate.send(topic, json)
}
Run Code Online (Sandbox Code Playgroud)
所以每个列表项都是一一发送的。如何一次发送整个列表?
您好,升级到 Spring Boot 3 后,我遇到了 swagger ui 问题。swagger-ui 不再工作,我得到了 404 和“白色标签”页面作为响应。
仔细查看后,我需要更改:implementation(group: 'org.springdoc', name: 'springdoc-openapi-ui', version: '1.5.8')
至:实现(组:'org.springdoc',名称:'springdoc-openapi-starter-webmvc-ui',版本:'2.0.3')
之后它确实起作用了,但是在我的其他项目之一中,我们遇到了依赖冲突的问题,因此 swagger-ui 在尝试获取 /v3/api-docs/: 之间抛出 500: io.confluence :kafka-avro-serializer:7.3.1 和 springdoc-openapi-starter-webmvc-ui:2.0.3 这两个依赖项需要使用 io.swagger.core.v3:swagger-annotations-jakarta,但版本不同
因此,在 gradle 中,我必须解决冲突以强制 io.swagger.core.v3:swagger-annotations-jakarta:2.2.8。
我是Spring和Kafka的新手.我正在研究一个用例[使用SpringBoot-kafka],允许用户在运行时创建kafka主题.spring应用程序应该在运行时以编程方式订阅这些主题.到目前为止我所知道的是,Kafka听众是设计时间,因此需要在启动前指定主题.有没有办法动态订阅SpringBoot-Kafka集成中的kafka主题?
推荐这个 https://github.com/spring-projects/spring-kafka/issues/132
我计划实现的当前方法是,不要使用Spring-Kafka集成而不是自己实现Kafka消费者[使用java代码]如此处提到的 spring boot kafka consumer - 如何正确地使用来自spring boot的kafka消息
spring spring-mvc spring-boot kafka-consumer-api spring-kafka
我可以请与社区核实听取多个主题的最佳方式是什么,每个主题包含不同类的消息?
在过去的几天里,我一直在玩Spring Kafka.到目前为止我的思考过程:
因为在初始化KafkaListenerContainerFactory时需要将反序列化器传递给DefaultKafkaConsumerFactory.这似乎表明,如果我需要多个容器,每个反序列化一个不同类型的消息,我将无法使用@EnableKafka和@KafkaListener注释.
这使我认为这样做的唯一方法是实例化多个KafkaMessageListenerContainer.
鉴于KafkaMessageListenerContainers是单线程的,我需要同时监听多个主题,我真的应该使用多个ConcurrentKafkaMessageListenerContainers.
我会在这里走上正轨吗?有一个更好的方法吗?
谢谢!
我有一个可能需要多个生产者的应用程序。我看到的所有代码示例似乎都支持单个生产者,在应用程序启动期间从应用程序读取配置。如果有多个生产者并且我们想传入不同的生产者配置,那么 Spring 是否有开箱即用的支持?或者在这种情况下我应该没有弹簧吗?
嘿伙计们,我想在我的春季启动项目中使用Kafka Streams实时处理.所以我需要Kafka Streams配置或者我想使用KStreams或KTable,但我在互联网上找不到例子.
我现在做生产者和消费者我想实时流式传输.
收到以下错误(Kafka 2.1.0):
2018-12-03 21:22:37.873 错误 37645 --- [nio-8080-exec-1] osksupport.LoggingProducerListener :发送带有 key='null' 和 payload='{82, 73, 70 的消息时抛出异常, 70, 36, 96, 19, 0, 87, 65, 86, 69, 102, 109, 116, 32, 16, 0, 0, 0, 1, 0, 1, 0, 68, -84,.. .' to topic recieved_sound: org.apache.kafka.common.errors.RecordTooLargeException: 序列化时消息为 1269892 字节,大于您使用 max.request.size 配置配置的最大请求大小。
我尝试了各种 SO 帖子中的所有建议。
我的 Producer.properties:
max.request.size=41943040
message.max.bytes=41943040
replica.fetch.max.bytes=41943040
fetch.message.max.bytes=41943040
Run Code Online (Sandbox Code Playgroud)
服务器.属性:
socket.request.max.bytes=104857600
message.max.bytes=41943040
max.request.size=41943040
replica.fetch.max.bytes=41943040
fetch.message.max.bytes=41943040
Run Code Online (Sandbox Code Playgroud)
ProducerConfig(Spring Boot):
configProps.put("message.max.bytes", "41943040");
configProps.put("max.request.size", "41943040");
configProps.put("replica.fetch.max.bytes", "41943040");
configProps.put("fetch.message.max.bytes", "41943040");
Run Code Online (Sandbox Code Playgroud)
消费者配置(SpringBoot):
props.put("fetch.message.max.bytes", "41943040");
props.put("message.max.bytes", "41943040");
props.put("max.request.size", …
Run Code Online (Sandbox Code Playgroud) apache-kafka kafka-consumer-api kafka-producer-api spring-kafka
我们有一个 3 节点的 Kafka 集群部署,总共有 35 个主题,每个主题有 50 个分区。总的来说,我们已经配置了replication factor=2
. 我们看到一个非常奇怪的问题,Kafka 节点间歇性地停止响应并显示错误:
ERROR Error while accepting connection (kafka.network.Acceptor)
java.io.IOException: Too many open files
at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
at kafka.network.Acceptor.accept(SocketServer.scala:460)
at kafka.network.Acceptor.run(SocketServer.scala:403)
at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)
我们已经部署了最新的 Kafka 版本并使用 spring-kafka 作为客户端:
kafka_2.12-2.1.0(CentOS Linux 版本 7.6.1810(核心))
lsof -p <kafka_pid>|wc -l
,我们得到的打开描述符总数只有 7000 左右。lsof|grep kafka|wc -l
,我们将获得大约 150 万个开放 FD。我们已经检查过它们都只属于 Kafka 进程。lsof|grep kafka|wc -l
会回到 7000。我们已尝试将文件限制设置为非常大,但仍然出现此问题。以下是为 kafka 进程设置的限制:
cat /proc/<kafka_pid>/limits
Limit …
Run Code Online (Sandbox Code Playgroud) 我有一个简单的 Spring Boot 应用程序,它从 Kafka 读取并写入 Kafka。我写了一个SpringBootTest
using anEmbeddedKafka
来测试所有这些。
主要问题是:有时测试失败是因为测试过早发送Kafka消息。这样,在 Spring 应用程序(或者KafkaListener
准确地说)准备就绪之前,消息已经写入 Kafka 。由于侦听器从latest
偏移量中读取(我不想为我的测试更改任何配置 - 除了 bootstrap.servers),它不会收到该测试中的所有消息。
有谁知道我如何在测试中知道KafkaListener
已准备好接收消息?
我能想到的唯一方法是等到/health
可用,但我不知道我是否可以确定这意味着KafkaListener
完全准备好了。
任何帮助是极大的赞赏!
此致。
spring apache-kafka spring-boot spring-kafka spring-kafka-test
我正在尝试为 ListenableFuture 添加回调编写单元测试用例,但我不知道该怎么做。在互联网上没有得到任何有用的东西。
@Test
public void can_publish_data_to_kafka() {
String topic = someString(10);
String key = someAlphanumericString(5);
String data = someString(50);
SendResult sendResult = mock(SendResult.class);
ListenableFuture<SendResult<String, Object>> future = mock(ListenableFuture.class);
given(kafkaTemplate.send(topic, key, data)).willReturn(future);
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
return invocationOnMock.getArguments()[1];
}
});
service.method(key, topic, data);
}
Run Code Online (Sandbox Code Playgroud)
我想为其编写测试用例的代码
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topicName, key, data);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onSuccess(SendResult<String, Object> stringKafkaBeanSendResult) {
RecordMetadata recordMetadata = stringKafkaBeanSendResult.getRecordMetadata();
LOGGER.info(String.format("sent message %s to topic %s …
Run Code Online (Sandbox Code Playgroud) spring-kafka ×10
apache-kafka ×7
spring ×4
spring-boot ×4
java ×3
centos ×1
concurrency ×1
junit ×1
lsof ×1
mockito ×1
spring-mvc ×1
swagger-ui ×1
ulimit ×1
unit-testing ×1