Sam*_*her 4 java apache-kafka spring-boot spring-kafka
注意重复标记:我确实检查了其他问题,但它没有回答我下面的具体问题。
想象一下,我在一台只有一个分区的服务器上有一个 Kafka 主题。所以它与队列非常相似。
现在假设我想要 100 个侦听器等待接受队列中的值。因此,根据设计,如果所有 100 个消费者都在一个组中,则日志(或此处的队列)中的内容将在消费者之间分发。所以操作会在1/100的时间内结束。
问题是Spring Kafka监听器只配置了主题名称。
@Service
public class Consumer {
@KafkaListener(topics = "${app.topic}")
public void receive(@Payload String message,
@Headers MessageHeaders headers) {
System.out.println("Received message="+message);
headers.keySet().forEach(key -> System.out.println(key+"->"+headers.get(key)));
}
}
Run Code Online (Sandbox Code Playgroud)
我似乎可以让 Kafka 产生 100 个消费者来处理来自“队列”(日志)的消息。如何做呢?
小智 5
查看此答案以了解 Kafka 消费者在 Apache Kafka 中,为什么消费者实例不能多于分区?
要在单个消费者组中正确分发消息,您必须有多个分区。一旦找到适合负载的正确分区量,我将使用 Spring Cloud Streaming 来更好地管理您的并发和消费者组分配。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
Run Code Online (Sandbox Code Playgroud)
水槽样品
@SpringBootApplication
@EnableBinding(Sink.class)
public class LoggingConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(LoggingConsumerApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void handle(Person person) {
System.out.println("Received: " + person);
}
public static class Person {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String toString() {
return this.name;
}
}
}
Run Code Online (Sandbox Code Playgroud)
并发设置
cloud:
stream:
bindings:
input:
destination: <topic-name>
group: <consumer-group>
consumer:
headerMode: raw
partitioned: true
concurrency: 20
Run Code Online (Sandbox Code Playgroud)
更多信息请参见https://cloud.spring.io/spring-cloud-stream/
| 归档时间: |
|
| 查看次数: |
4430 次 |
| 最近记录: |