我正在使用 spring-Kafka 作为消费者应用程序运行一个 spring 应用程序。在 Kafka 监听器中,我们使用 SpEL 表达式设置了自动启动。我们在运行时更改此值以使此表达式为 false。当我们更改属性时,KafkaListener 将停止消费更多消息。
问题:这是停止轮询进一步记录的正确方法还是我们应该使用 kafkaListenerEndpointRegistry 并停止服务?这两者有什么区别?
另一个问题是,如果我有另一个应用程序使用同一消费者组,所有分区都会重新分配给该应用程序吗?
我正在使用卡夫卡生产者将价格发送到主题。当我发送第一条消息时,它会打印生产者配置,然后发送消息,因此发送第一条消息需要更多时间。
在收到第一条消息后,几乎只用了 1/2 毫秒就发送了一条消息。
我的问题是我们可以做一些事情以便跳过配置部分或者我们可以在发送第一条消息之前开始?
我正在我的项目中使用 spring kafka。我还阅读了其他问题,但并没有太大帮助。
应用程序.yml
server:
port: 8081
spring:
kafka:
bootstrap-servers: ***.***.*.***:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
Run Code Online (Sandbox Code Playgroud)
生产者价值观:
acks = 1
batch.size = 16384
bootstrap.servers = [192.168.1.190:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id =
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level …Run Code Online (Sandbox Code Playgroud) 我正在使用 Springboot 2.3.5.RELEASE 和 spring-kafka 2.6.3。我正在尝试做一个简单的 Kafka Producer Retry POC,这应该会导致生产者在代理关闭时或者在消息发送到代理之前抛出异常时重试。
以下生产者配置适用于启用重试的幂等生产者。
// Producer configuration
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3");
props.put(ProducerConfig.RETRIES_CONFIG, "10");
props.put(ProducerConfig.ACKS_CONFIG, "all");
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
... other configs
Run Code Online (Sandbox Code Playgroud)
我知道只有在代理向生产者发送 ack 时出现暂时性错误时,Kafka 生产者重试才会起作用。因此,如果代理在发送消息之前就已关闭,则上述重试不起作用。因此我尝试用 spring-kafka 引入@Retry,不幸的是我也无法让它工作。有谁知道如何解决这个问题,因为这似乎是一个常见的用例,比如如果经纪人宕机或者出现网络故障,我不希望我的制作人停止工作
@Transaction
@Retryable(value = Exception.class, maxAttemptsExpression = "10",
backoff = @Backoff(delayExpression = "1000")) …Run Code Online (Sandbox Code Playgroud) 目前我正在使用 Spring boot 2.4.0 和 spring-kafka。
我想使用消费者和生产者,但不想使用 kafka admin。
我尝试KafkaAdmin通过将“bootstrap.servers”设置为 null 来覆盖类,但不起作用。它仍然“尝试”连接以进行 KafkaAdmin 并记录错误。这不是致命的,但是有什么方法可以完全禁用kafka admin吗?
我正在尝试使用 Kafka Streams 做一个简单的 POC。但是,我在启动应用程序时遇到异常。我正在使用 Spring-Kafka、Kafka-Streams 2.5.1 和 Spring boot 2.3.5 Kafka 流配置
@Configuration
public class KafkaStreamsConfig {
private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);
@Bean
public Function<KStream<String, String>, KStream<String, String>> processAAA() {
return input -> input.peek((key, value) -> log
.info("AAA Cloud Stream Kafka Stream processing : {}", input.toString().length()));
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> processBBB() {
return input -> input.peek((key, value) -> log
.info("BBB Cloud Stream Kafka Stream processing : {}", input.toString().length()));
}
@Bean
public Function<KStream<String, String>, KStream<String, …Run Code Online (Sandbox Code Playgroud) apache-kafka spring-cloud-stream apache-kafka-streams spring-kafka spring-cloud-stream-binder-kafka
我有一个使用 Spring Boot、Gradle 和 Kotlin 构建的应用程序,它与 Kafka 和 PostgreSQL 连接。我正在使用它resources/application.yml作为配置文件。\n我想为每种测试创建一个特定的包:单元、集成和端到端,但现在我有一个仅用于单元测试的包。做这个的最好方式是什么?我正在使用 Junit5。
我想要这样的东西:
\napplication/\n\xe2\x94\x9c\xe2\x94\x80 src/\n\xe2\x94\x82 \xe2\x94\x9c\xe2\x94\x80 e2e/\n\xe2\x94\x82 \xe2\x94\x9c\xe2\x94\x80 integration/\n\xe2\x94\x82 \xe2\x94\x9c\xe2\x94\x80 main/\n\xe2\x94\x82 \xe2\x94\x9c\xe2\x94\x80 test/\nbuild.gradle.kts\ngradlew\ngradlew.properties\nsettings.gradle.kts\nRun Code Online (Sandbox Code Playgroud)\n另外,我希望在运行单元测试时我的应用程序不要连接到 Kafka 或 Postgres,但在运行集成测试时我希望它这样做。我怎样才能做到这一点?
\n我尝试了 Kafka 的配置,但没有成功:
\napplication/\n\xe2\x94\x9c\xe2\x94\x80 src/\n\xe2\x94\x82 \xe2\x94\x9c\xe2\x94\x80 e2e/\n\xe2\x94\x82 \xe2\x94\x9c\xe2\x94\x80 integration/\n\xe2\x94\x82 \xe2\x94\x9c\xe2\x94\x80 main/\n\xe2\x94\x82 \xe2\x94\x9c\xe2\x94\x80 test/\nbuild.gradle.kts\ngradlew\ngradlew.properties\nsettings.gradle.kts\nRun Code Online (Sandbox Code Playgroud)\n我还创建了一个注释来模拟所有 Kafka 消费者和生产者(这有效,但我不想在每个测试类中添加此注释)
\nspring\n autoconfigure:\n exclude: org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration\nRun Code Online (Sandbox Code Playgroud)\n我的消费者看起来像这样:
\n@MockBeans(MockBean(MyConsumer::class), MockBean(MyProducer::class))\nannotation class MockKafka\n\n@SpringBootTest\n@MockKafka\nclass MyAwesomeTest { // tests here }\nRun Code Online (Sandbox Code Playgroud)\n我的 application.yaml 文件如下所示:
\n@Component\nclass MyConsumer() {\n private val logger = …Run Code Online (Sandbox Code Playgroud) 我在 Spring Boot 项目中使用Spring-Kafka 2.7.1 。
当我将其连接到配置了 SSL 的 Kafka Broker 时,它会给出如下所示的“OutofMemory”错误,即使我多次增加堆大小但无济于事。
日志如下:
java.lang.OutOfMemoryError: Java heap space\
at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61) ~[na:na]\
at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348) ~[na:na]\
at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:447) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.common.network.Selector.poll(Selector.java:481) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:563) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:245) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1257) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1226) ~[kafka-clients-2.7.1.jar!/:na]\
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) ~[kafka-clients-2.7.1.jar!/:na]\
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1414) ~[spring-kafka-2.7.7.jar!/:2.7.7]\
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1251) ~[spring-kafka-2.7.7.jar!/:2.7.7]\
at …Run Code Online (Sandbox Code Playgroud) 我想将我的 spring mvc 应用程序与 Kafka 服务器连接以使用 kafka 消息。为此,我编写了 KafkaConsumer 类,如下所示。
@Service
public class KafkaConsumer {
@KafkaListener(groupId = "my-group-id", topicPattern = "VID.*", containerFactory = SystemParameterConstants.KAFKA_LISTENER_CONTAINER_FACTORY)
public void receivedMessage(@Payload String message) {
logger.info("================ receivedMessage() ==================");
logger.info("::: Message recieved from kafka ::: {}", message);
ObjectMapper objectMapper = new ObjectMapper();
try {
...
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}
Run Code Online (Sandbox Code Playgroud)
这里我有硬编码的组ID “my-group-id”,但我想从数据库读取这个groupId,以便我可以为不同的环境拥有不同的groupId。
请提出一个解决方案。谢谢!
我已经实现了一个使用 Spring Kafka 的基本 Spring Boot 应用程序。我希望我的制作人在第一个主题被调用之前连接到 Kafka 主题.send(),但我找不到方法来做到这一点。那可能吗?
日志显示 KafkaTemplate 仅在我触发以下.send方法后连接到 Kafka 主题16:12:44:
2021-11-24 16:12:12.602 INFO 63930 --- [ main] c.e.k.KafkaProducerExampleApplication : The following profiles are active: dev
2021-11-24 16:12:13.551 INFO 63930 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http)
2021-11-24 16:12:13.559 INFO 63930 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2021-11-24 16:12:13.559 INFO 63930 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.53]
2021-11-24 16:12:13.613 INFO 63930 …Run Code Online (Sandbox Code Playgroud) 问题:springboot总是需要创建KafkaTemplate类型的bean吗?下面的详细信息/堆栈跟踪/代码库,请告诉我我做错了什么。谢谢
引起:org.springframework.beans.factory.UnsatisfiedDependencyException:创建在类路径资源[org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class]中定义的名称为“kafkaTemplate”的bean时出错:通过方法“kafkaTemplate”表达的依赖关系不满足参数0;嵌套异常是 org.springframework.beans.factory.NoSuchBeanDefinitionException:没有“org.springframework.kafka.core.ProducerFactory”类型的合格 bean 可用:预计至少有 1 个有资格作为自动装配候选者的 bean。依赖注释:{}
在 org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:800) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:541) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1352) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1195) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:582) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:276) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1380) ~[spring-beans-5.3.12.jar:5.3.12]
在 org.springframework.beans.factory.support.DefaultListableBeanFactory$DependencyObjectProvider.getIfUnique(DefaultListableBeanFactory.java:2063) ~[spring-beans-5.3.12.jar:5.3.12]
在org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration。(KafkaAnnotationDrivenConfiguration.java:90)〜[spring-boot-autoconfigure-2.4.12.jar:2.4.12]
在 java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(本机方法) ~[na:na]
在 java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:na]
在 java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:na]
在 java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) ~[na:na]
在 … spring-kafka ×10
apache-kafka ×7
spring-boot ×6
java ×3
gradle ×1
junit5 ×1
kotlin ×1
spring ×1