OAuth2 authorization with Spring Security and RabbitMQ

Ste*_*eve 5 java spring spring-rabbit spring-amqp spring-security-oauth2

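We currently have a number of Spring microservices that communicate through REST endpoints and RabbitMQ queues. We have just implemented OAuth2 security on all of the services, and the REST endpoints are properly secured.

We wrote a library that creates the RabbitTemplate and AmqpAdmin beans so that the boilerplate does not have to be repeated in every service. In Spring, we connect to the RabbitMQ server with one specific user for regular clients and another for admins. We do not want to connect to the RabbitMQ server as individual users.

If we pass the access token in the Rabbit message headers, is it possible to configure the RabbitTemplate to check the token before the message is handled? Is this something that can/should be done globally in the template's AfterReceive/BeforePublish processors? Or does it need to be checked individually in each listener method?

Thanks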

Ste*_*eve 5

I was able to work out a solution by creating a custom MessageListenerContainerFactory and MessageListenerContainer.

CustomMessageListenerContainerFactory.java:


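import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory;
import org.springframework.security.oauth2.provider.token.DefaultTokenServices;

public class CustomMessageListenerContainerFactory
        extends AbstractRabbitListenerContainerFactory<CustomMessageListenerContainer> {

    // Passed on to every container this factory creates.
    private final DefaultTokenServices tokenServices;

    public CustomMessageListenerContainerFactory(DefaultTokenServices tokenServices) {
        this.tokenServices = tokenServices;
    }

    /**
     * Create an empty container instance.
     *
     * @return the new container instance.
     */
    @Override
    protected CustomMessageListenerContainer createContainerInstance() {
        return new CustomMessageListenerContainer(tokenServices);
    }
}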

CustomMessageListenerContainer.java:


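import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.security.core.AuthenticationException;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.oauth2.common.exceptions.InvalidTokenException;
import org.springframework.security.oauth2.provider.OAuth2Authentication;
import org.springframework.security.oauth2.provider.token.DefaultTokenServices;
// The original does not show where BeanUtils.beanProperties(...) comes from; import whichever class provides it.

public class CustomMessageListenerContainer extends SimpleMessageListenerContainer {
    private static final String errorMessage = "No valid credentials found in request: {}";
    private static final String handlingMessage = "Handling queue: {}";
    private static final String receivedMessage = "Received Message: {}";
    private static final Logger logger = LoggerFactory.getLogger(CustomMessageListenerContainer.class);

    private final DefaultTokenServices tokenServices;

    /**
     * Constructor
     *
     * @param tokenServices The instance of DefaultTokenServices used to validate the access token.
     */
    public CustomMessageListenerContainer(DefaultTokenServices tokenServices) {
        this.tokenServices = tokenServices;
    }

    /**
     * Checks that the message carries a valid access token before handing it to the listener.
     *
     * @param channel   The AMQP channel on which the message was received.
     * @param messageIn The incoming message.
     * @throws Exception Throws an exception when there are no valid credentials in the message.
     */
    @Override
    protected void executeListener(Channel channel, Message messageIn) throws Exception {
        logger.info(handlingMessage, String.join(", ", getQueueNames()));
        logger.info(receivedMessage, BeanUtils.beanProperties(messageIn));
        Map<String, Object> headers = messageIn.getMessageProperties().getHeaders();
        // Header names are matched case-insensitively.
        Optional<String> accessKey = headers.keySet().stream()
                .filter("authorization"::equalsIgnoreCase)
                .findFirst();
        if (accessKey.isPresent() && headers.get(accessKey.get()) != null) {
            OAuth2Authentication auth = null;
            try {
                auth = tokenServices.loadAuthentication(headers.get(accessKey.get()).toString());
            } catch (AuthenticationException | InvalidTokenException e) {
                // DefaultTokenServices reports an unknown or expired token by throwing.
                logger.debug("Access token rejected", e);
            }
            if (auth != null) {
                SecurityContextHolder.getContext().setAuthentication(auth);
                try {
                    super.executeListener(channel, messageIn);
                } finally {
                    // Consumer threads are reused, so do not leave this message's identity behind.
                    SecurityContextHolder.clearContext();
                }
                return;
            }
        }
        rejectMessage(channel, messageIn);
    }

    private void rejectMessage(Channel channel, Message messageIn) throws Exception {
        String localMessage = errorMessage.replace("{}", String.join(", ", getQueueNames()));
        logger.info(localMessage);
        if (messageIn.getMessageProperties().getReplyTo() != null) {
            // Tell an RPC-style caller why the request was refused.
            channel.basicPublish("",
                    messageIn.getMessageProperties().getReplyTo(),
                    new AMQP.BasicProperties.Builder()
                            .contentType("application/json")
                            .correlationId(messageIn.getMessageProperties().getCorrelationId())
                            .build(),
                    "{\"errorMessage\":\"".concat(localMessage).concat("\"}").getBytes(StandardCharsets.UTF_8));
        }
        // Rejecting without requeue keeps the unauthorized message from being redelivered.
        throw new AmqpRejectAndDontRequeueException(localMessage);
    }
}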
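The header lookup in executeListener can also be pulled out into a small helper, which makes it testable without a broker. The following is only a sketch under assumptions: the class name AuthorizationHeaders and the method extractToken are hypothetical, and the "Bearer " prefix handling is an addition that the original does not have (it passes the raw header value to the token services).

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, not part of the original answer.
final class AuthorizationHeaders {

    private static final String BEARER_PREFIX = "bearer ";

    private AuthorizationHeaders() {
    }

    /**
     * Returns the token stored under a header whose name equals
     * "authorization" in any letter case, or empty if there is none.
     */
    static Optional<String> extractToken(Map<String, Object> headers) {
        return headers.entrySet().stream()
                .filter(e -> "authorization".equalsIgnoreCase(e.getKey()))
                .filter(e -> e.getValue() != null)
                .map(e -> stripBearerPrefix(e.getValue().toString().trim()))
                .filter(token -> !token.isEmpty())
                .findFirst();
    }

    // Assumption: publishers may send "Bearer <token>" as they would over HTTP.
    private static String stripBearerPrefix(String value) {
        if (value.regionMatches(true, 0, BEARER_PREFIX, 0, BEARER_PREFIX.length())) {
            return value.substring(BEARER_PREFIX.length()).trim();
        }
        return value;
    }
}
```

With a helper like this, the container would only need to call extractToken on messageIn.getMessageProperties().getHeaders() and reject the message when the result is empty.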

CustomRabbitListenerConfigurer.java:

...
    @Override
    public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
        // Register the token-checking factory so every @RabbitListener container uses it.
        CustomMessageListenerContainerFactory factory = new CustomMessageListenerContainerFactory(tokenServices);
        ConnectionFactory connectionFactory = context.getBean(ConnectionFactory.class);
        factory.setConnectionFactory(connectionFactory);
        registrar.setContainerFactory(factory);
    }
...
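One thing to keep in mind with this approach: SecurityContextHolder stores the context in a thread-local by default, and listener containers reuse their consumer threads, so an authentication set for one message should be removed once the listener returns. The pattern is sketched below with a plain ThreadLocal as a stand-in. The names ScopedContext, runAs and currentPrincipal are hypothetical and only illustrate the set/try/finally shape; in the container the equivalent calls would be setAuthentication before super.executeListener and SecurityContextHolder.clearContext() in a finally block.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for SecurityContextHolder's default thread-local storage.
final class ScopedContext {

    private static final ThreadLocal<String> CURRENT_PRINCIPAL = new ThreadLocal<>();

    private ScopedContext() {
    }

    /** Returns the principal bound to the calling thread, or null if none is bound. */
    static String currentPrincipal() {
        return CURRENT_PRINCIPAL.get();
    }

    /** Binds the principal, runs the listener, and always unbinds afterwards. */
    static <T> T runAs(String principal, Supplier<T> listener) {
        CURRENT_PRINCIPAL.set(principal);
        try {
            return listener.get();
        } finally {
            // Without this, the next message handled on the same thread
            // would still see the previous principal.
            CURRENT_PRINCIPAL.remove();
        }
    }
}
```

The finally block matters most when the listener throws, because that is the case a plain "set, call, clear" sequence would miss.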