jog*_*dra 1 spring-boot spring-cloud kafka-consumer-api spring-cloud-stream spring-kafka
我的消费者绑定到匿名消费者组,而不是我指定的消费者组。
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost
defaultBrokerPort: 9092
zkNodes: localhost
defaultZkPort: 2181
bindings:
inEvent:
group: eventin
destination: event
outEvent:
group: eventout
destination: processevent
Run Code Online (Sandbox Code Playgroud)
我的 Spring Boot 应用程序
@SpringBootApplication
@EnableBinding(EventStream.class)
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@StreamListener(value = "inEvent")
public void getEvent(Event event){
System.out.println(event.name);
}
}
Run Code Online (Sandbox Code Playgroud)
我的输入输出通道接口
public interface EventStream {
@Input("inEvent")
SubscribableChannel inEvent();
@Output("outEvent")
MessageChannel outEvent();
}
Run Code Online (Sandbox Code Playgroud)
我的控制台日志--
:在 3.233 秒内启动 ConsumerApplication(JVM 运行 4.004):[ Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d]发现组协调员 singh:9092 (id: 2147483647 rack: null) : [Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] 撤销: 先前分配的分区 [重新分配分区] [消费者clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d](重新)加入群组:[消费者clientId=consumer-3,groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] ] 成功加入第 1 代组:[消费者 clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] 设置新分配的分区 [inEvent-0] : [消费者 clientId=consumer-3, groupId=匿名.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] 将分区 inEvent-0 的偏移量重置为偏移量 2。:分配的分区:[inEvent-0]
该group属性不能在kafka树中。它必须是这样的:
我的消费者绑定到匿名消费者组,而不是我指定的消费者组。
spring:
cloud:
stream:
bindings:
inEvent:
group: eventin
destination: event
Run Code Online (Sandbox Code Playgroud)
在文档中查看更多信息:http : //cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.1.1.RELEASE/single/spring-cloud-stream.html#consumer-groups
该group是共同财产,所以它是同独立的粘结剂实施。的kafka是Apache卡夫卡特定属性,暴露在其粘合剂执行层面。
| 归档时间: |
|
| 查看次数: |
2759 次 |
| 最近记录: |