春天卡夫卡听正则表达

krm*_*007 4 java spring spring-kafka

我试图用下面的代码听新创建的主题,但是没有用.如果下面的代码是正确的,你能告诉我吗?

public class KafkaMessageListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);

    private final ProcessEventModel eventModel;

    @KafkaListener(topicPattern = "betsyncDataTopic*")
    public void receive(ConsumerRecord<String, String> consumerRecord) {
        LOGGER.info("received payload at '{}'", consumerRecord.timestamp());
        eventModel.process(consumerRecord.value());
    }
Run Code Online (Sandbox Code Playgroud)

Gar*_*ell 7

你的正则表达式是无效的; 它应该是betsyncDataTopic.*.

@KafkaListener(id = "xxx", topicPattern = "kbgh.*")
public void listen(String in) {
    System.out.println(in);
}
Run Code Online (Sandbox Code Playgroud)

...

partitions assigned: [kbgh290-0]
Run Code Online (Sandbox Code Playgroud)

编辑

如果您稍后添加与模式匹配的新主题,则重新平衡之前会有延迟.根据KafkaConsumerjavadocs ...

 * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
 * The pattern matching will be done periodically against topic existing at the time of check.
 * <p>
 * As part of group management, the consumer will keep track of the list of consumers that
 * belong to a particular group and will trigger a rebalance operation if one of the
 * following events trigger -
 * <ul>
 * <li>Number of partitions change for any of the subscribed list of topics
 * <li>Topic is created or deleted
 * <li>An existing member of the consumer group dies
 * <li>A new member is added to an existing consumer group via the join API
 * </ul>
Run Code Online (Sandbox Code Playgroud)

我刚做了一个测试; 添加了一个新的匹配主题12:13:32; 结果:

2018-02-12 12:17:30.394  INFO 88028 --- [      xxx-0-C-1] o.s.k.l.KafkaMessageListenerContainer    
: partitions revoked: [kbgh290-0]
2018-02-12 12:17:30.450  INFO 88028 --- [      xxx-0-C-1] o.s.k.l.KafkaMessageListenerContainer    
: partitions assigned: [kbgh290-0, kbghNew-0]
Run Code Online (Sandbox Code Playgroud)

所以需要几分钟.