spring 云流-消费群绑定

jog*_*dra 1 spring-boot spring-cloud kafka-consumer-api spring-cloud-stream spring-kafka

我的消费者绑定到匿名消费者组,而不是我指定的消费者组。

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost
          defaultBrokerPort: 9092
          zkNodes: localhost
          defaultZkPort: 2181
        bindings:
          inEvent:
            group: eventin
            destination: event
          outEvent:
            group: eventout
            destination: processevent
Run Code Online (Sandbox Code Playgroud)

我的 Spring Boot 应用程序

@SpringBootApplication
@EnableBinding(EventStream.class)
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
    @StreamListener(value = "inEvent")
    public void getEvent(Event event){
        System.out.println(event.name);
    }
}
Run Code Online (Sandbox Code Playgroud)

我的输入输出通道接口

public interface EventStream {
    @Input("inEvent")
    SubscribableChannel inEvent();
    @Output("outEvent")
    MessageChannel outEvent();
}
Run Code Online (Sandbox Code Playgroud)

我的控制台日志--

:在 3.233 秒内启动 ConsumerApplication(JVM 运行 4.004):[ Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d]发现组协调员 singh:9092 (id: 2147483647 rack: null) : [Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] 撤销: 先前分配的分区 [重新分配分区] [消费者clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d](重新)加入群组:[消费者clientId=consumer-3,groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] ] 成功加入第 1 代组:[消费者 clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] 设置新分配的分区 [inEvent-0] : [消费者 clientId=consumer-3, groupId=匿名.0d0c87d6-ef39-4bfe-b475-4491c40caf6d] 将分区 inEvent-0 的偏移量重置为偏移量 2。:分配的分区:[inEvent-0]

Art*_*lan 5

group属性不能在kafka树中。它必须是这样的:

我的消费者绑定到匿名消费者组,而不是我指定的消费者组。

spring:
  cloud:
    stream:
       bindings:
          inEvent:
            group: eventin
            destination: event
Run Code Online (Sandbox Code Playgroud)

在文档中查看更多信息:http : //cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.1.1.RELEASE/single/spring-cloud-stream.html#consumer-groups

group是共同财产,所以它是同独立的粘结剂实施。的kafka是Apache卡夫卡特定属性,暴露在其粘合剂执行层面。