bil*_*oor 2 apache-kafka spring-cloud spring-cloud-stream
我正在查看一个 Spring Boot 服务,它从 apache kafka 读取消息,通过 http 从另一个服务请求消息指示的记录,处理它们,将一些数据保存到数据库中,并将结果发布到另一个主题。
这是通过
@StreamListener(Some.INPUT)
@SendTo(Some.OUTPUT)
Run Code Online (Sandbox Code Playgroud)
这是在几个服务中完成的,通常工作得很好。唯一的属性集是
spring.cloud.stream.binder.consumer.concurrency=20
Run Code Online (Sandbox Code Playgroud)
主题本身有 20 个分区,应该适合。
在监控 kafka 的读取时,我们发现吞吐量非常低和奇怪的行为:
该应用程序一次最多读取 500 条消息,然后是 1-2 分钟的无内容。在此期间,消费者反复记录“缺少心跳,因为分区被重新平衡”,“重新分配分区”,有时甚至抛出异常说“提交失败,因为轮询间隔已过”
我们得出的结论是,这意味着消费者获取 500 条消息,需要很长时间来处理所有消息,错过了它的时间窗口,因此无法将 500 条消息中的任何一条提交给代理——代理重新分配分区并重新发送相同的消息再次。
在查看线程和文档后,我发现了“max.poll.records”属性,但在设置此属性的位置的建议中存在冲突。
有人说把它放在下面
spring.cloud.stream.bindings.consumer.<input>.configuration
Run Code Online (Sandbox Code Playgroud)
有人说
spring.cloud.stream.kafka.binders.consumer-properties
Run Code Online (Sandbox Code Playgroud)
我尝试将两者都设置为 1,但服务行为没有改变。
我如何正确处理这种情况,即消费者无法跟上默认设置所需的轮询间隔?
常见的yaml:
spring.cloud.stream.default.group=${spring.application.name}
Run Code Online (Sandbox Code Playgroud)
服务-yaml
spring:
clould:
stream:
default:
consumer.headerMode: embeddedHeaders
producer.headerMode: embeddedHeaders
bindings:
someOutput:
destination: outTopic
someInput:
destination: inTopic
consumer:
concurrency: 30
kafka:
bindings:
consumer:
someInput:
configuarion:
max.poll.records: 20 # ConsumerConfig ignores this
consumer:
enableDlq: true
configuarion:
max.poll.records: 30 # ConsumerConfig ignores this
someInput:
configuarion:
max.poll.records: 20 # ConsumerConfig ignores this
consumer:
enableDlq: true
configuarion:
max.poll.records: 30 # ConsumerConfig ignores this
binder:
consumer-properties:
max.poll.records: 10 # this gets used first
configuration:
max.poll.records: 40 # this get used when the first one is not present
Run Code Online (Sandbox Code Playgroud)
“忽略这个”总是意味着,如果没有设置其他属性,ConsumerConfiguration 将其默认设置为 500,用于最大轮询记录
编辑:我们已经接近了:
问题与设置了 indexBackoffStrategy 的 spring 重试有关 - 以及一系列有效地停止应用程序的错误。
我不明白的是,我们通过向相关主题发布格式错误的消息来强制出现 200 个错误,这导致应用读取 200,花费很长时间(使用旧的重试配置),然后一次提交所有 200 个错误。
如果我们有
max.poll.records: 1
concurrency: 1
ackEachRecod = true
enableDlq: true # (which implicitly makes autoCommitOffsets = true according to the docs)
Run Code Online (Sandbox Code Playgroud)
它是
spring.cloud.stream.kafka.bindings.consumer.<input>.consumer.configuration.max.poll.records
.
Run Code Online (Sandbox Code Playgroud)
看文档...
Kafka 消费者属性
以下属性仅适用于 Kafka 消费者,并且必须以
spring.cloud.stream.kafka.bindings.<channelName>.consumer.
...
配置
使用包含通用 Kafka 消费者属性的键/值对进行映射。
默认值:空地图。
...
你也可以增加max.poll.interval.ms
.
编辑
我刚刚用 2.1.0.RELEASE 进行了测试 - 它按我的描述工作:
没有设置
2019-03-01 08:47:59.560 INFO 44698 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
...
max.poll.records = 500
...
Run Code Online (Sandbox Code Playgroud)
开机默认
spring.kafka.consumer.properties.max.poll.records=42
2019-03-01 08:49:49.197 INFO 45044 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
...
max.poll.records = 42
...
Run Code Online (Sandbox Code Playgroud)
活页夹默认 #1
spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.consumer-properties.max.poll.records=43
2019-03-01 08:52:11.469 INFO 45842 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
...
max.poll.records = 43
...
Run Code Online (Sandbox Code Playgroud)
活页夹默认值 #2
spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
2019-03-01 08:54:06.211 INFO 46252 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
...
max.poll.records = 43
...
Run Code Online (Sandbox Code Playgroud)
绑定默认
spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44
2019-03-01 09:02:26.004 INFO 47833 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
...
max.poll.records = 44
...
Run Code Online (Sandbox Code Playgroud)
绑定特定
spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=45
2019-03-01 09:05:01.452 INFO 48330 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
...
max.poll.records = 45
...
Run Code Online (Sandbox Code Playgroud)
编辑2
这是完整的测试应用程序。我只是在http://start.spring.io 上创建了一个新应用程序并选择了“Kafka”和“Cloud Stream”。
@SpringBootApplication
@EnableBinding(Sink.class)
public class So54932453Application {
public static void main(String[] args) {
SpringApplication.run(So54932453Application.class, args).close();
}
@StreamListener(Sink.INPUT)
public void listen(String in) {
}
}
Run Code Online (Sandbox Code Playgroud)
和
spring.cloud.stream.bindings.input.group=so54932453
spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=45
Run Code Online (Sandbox Code Playgroud)
和
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>net.gprussell</groupId>
<artifactId>so54932453</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>so54932453</name>
<description>Demo</description>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Greenwich.RELEASE</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
</repository>
</repositories>
</project>
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
3652 次 |
最近记录: |