如何从数据库传递@KafkaListener中的groupId值?

Sha*_*har 2 apache-kafka spring-kafka

我想将我的 spring mvc 应用程序与 Kafka 服务器连接以使用 kafka 消息。为此,我编写了 KafkaConsumer 类,如下所示。

@Service
public class KafkaConsumer {
    
    
    @KafkaListener(groupId = "my-group-id", topicPattern = "VID.*", containerFactory = SystemParameterConstants.KAFKA_LISTENER_CONTAINER_FACTORY)
    public void receivedMessage(@Payload String message) {
        logger.info("================ receivedMessage() ==================");
        logger.info("::: Message recieved from kafka ::: {}", message);
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            ...
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

这里我有硬编码的组ID “my-group-id”,但我想从数据库读取这个groupId,以便我可以为不同的环境拥有不同的groupId。

请提出一个解决方案。谢谢!

Art*_*lan 5

查看它的JavaDocs:

/**
 * Override the {@code group.id} property for the consumer factory with this value
 * for this listener only.
 * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
 * @return the group id.
 * @since 1.3
 */
String groupId() default "";
Run Code Online (Sandbox Code Playgroud)

如果你有一个从数据库读取数据的bean,那么你可以这样做:

groupId = "#{myDbBean.grouIdFromDb}"
Run Code Online (Sandbox Code Playgroud)

另请参阅文档:https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-properties