如何增加消息头

ste*_*man 5 dsl spring-integration

Spring Integration Java DSL 有没有办法修改现有的消息头?

我正在使用 SI Java DSL 重新实现下载重试机制,并且希望在发生失败时增加保存下载尝试的消息标头,然后根据与限制相比的尝试次数路由消息。

我的路由基于 SI 中包含的 RouterTests 运行良好。使用 HeaderEnrichers 我可以轻松添加标头,但我看不到修改现有标头的方法。

谢谢

/**
 * Unit test of {@link RetryRouter}.
 * 
 * Based on {@link RouterTests#testMethodInvokingRouter2()}.
 */
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@DirtiesContext
public class RetryRouterTests {

    /** Failed download attempts are sent to this channel to be routed by {@link ContextConfiguration#failedDownloadRouting( ) } */
    @Autowired
    @Qualifier("failed")
    private MessageChannel failed;

    /** Retry attempts for failed downloads are sent to this channel by {@link ContextConfiguration#failedDownloadRouting( ) }*/
    @Autowired
    @Qualifier("retry-channel")
    private PollableChannel retryChannel;

    /** Failed download attempts which will not be retried, are sent to this channel by {@link ContextConfiguration#failedDownloadRouting( ) }*/
    @Autowired
    @Qualifier("exhausted-channel")
    private PollableChannel exhaustedChannel;

    /**
     * Unit test of {@link ContextConfiguration#failedDownloadRouting( ) } and {@link RetryRouter}.
     */
    @Test
    public void retryRouting() {

        final int limit = 2;

        for ( int attempt = 0 ; attempt <= limit + 1 ; attempt++ ){

            this.failed.send( failed( attempt, limit) );

            if ( attempt < limit){

                assertEquals( payload( attempt ) , this.retryChannel.receive( ).getPayload( ) );
                assertNull(this.exhaustedChannel.receive( 0 ) );

            }else{

                assertEquals( payload( attempt ) , this.exhaustedChannel.receive( ).getPayload( ) );
                assertNotNull( this.exhaustedChannel.receive( ).getPayload( ) );
            }
        }

    }

    private Message<String> failed( int retry , int limit ) {

        return MessageBuilder
            .withPayload(  payload( retry ) )
            .setHeader("retries", new AtomicInteger( retry ) )
            .setHeader("limit", limit)
            .build();
    }

    private String payload (int retry){
        return "retry attempt "+retry;
    }


    @Configuration
    @EnableIntegration
    public static class ContextConfiguration {

        @Bean
        public MessageChannel loggerChannel() {
            return MessageChannels.direct().get();
        }

        @Bean(name = "retry-channel")
        public MessageChannel retryChannel() {
            return new QueueChannel();
        }

        @Bean(name = "exhausted-channel")
        public MessageChannel exhaustedChannel() {
            return new QueueChannel();
        }


        /**
         * Decides if a failed download attempt can be retried or not, based upon the number of attempts already made 
         * and the limit to the number of attempts that may be made. Logic is in {@link RetryRouter}.
         * <p>
         * The number of download attempts already made is provided as a header {@link #attempts} from the connector doing the download, 
         * and the limit to the number of attempts is another header {@link #retryLimit} which is originally setup as
         * a header by {@link DownloadDispatcher} from retry configuration.
         * <p>
         * Messages for failed download attempts are listened to on channel {@link #failed}. 
         * Retry attempts are routed to {@link #retryChannel()}
         *  
         * @return
         */
        @Bean
        public IntegrationFlow failedDownloadRouting() {

            return IntegrationFlows.from( "failed" )

                .handle( "headers.retries.getAndIncrement()" )
                .handle( logMessage ( "failed" ) )
                .route(new RetryRouter())
                .get();
        }

        /**
         * Decides if a failed download attempt can be retried or not, based upon the number of attempts already made 
         * and the limit to the number of attempts that may be made. 
         * <p>
         */
        private static class RetryRouter {

            @Router
            public String routeByHeader(@Header("retries") AtomicInteger attempts , @Header("limit") Integer limit) {

                if (attempts.intValue() < limit.intValue()){
                    return "retry-channel";
                }
                return "exhausted-channel";
            }

            /** This method is not used but is required by Spring Integration otherwise application context doesn't load because of
             * {@code Caused by: java.lang.IllegalArgumentException: Target object of type 
             * [class org.springframework.integration.dsl.test.routers.RetryRouterTests$RetryRouter] has no eligible methods for handling Messages.}
             * 
             * @throws UnsupportedOperationException if called
             */
            @SuppressWarnings("unused")
            public String routeMessage(Message<?> message) {

                throw new UnsupportedOperationException( "should not be used." );
            }
        }
    }
Run Code Online (Sandbox Code Playgroud)

Art*_*lan 6

有一种方法可以在不修改标题的情况下完成您需要的操作:

.enrichHeaders(h -> h.header("downloadRetries", new AtomicInteger()))
Run Code Online (Sandbox Code Playgroud)

然后,当您需要增加它时,您应该这样做:

.handle(m -> m.getHeaders().get("downloadRetries", AtomicInteger.class).getAndIncrement())
Run Code Online (Sandbox Code Playgroud)

并将此句柄作为重试服务的发布订阅者通道上的第一个单向第一个订阅者。

更新

是一种单向“MessageHandler”,不适合配置“outputChannel”。这是集成流程的结束。

感谢您分享有关此事的配置:现在我遇到了一个问题,而您误解了。解决方案必须是这样的:

        return IntegrationFlows.from( "failed" )

            .publishSubscribeChannel(s -> s
                .subscribe(f -> f
                        .handle(m -> m.getHeaders().get("downloadRetries",
                                  AtomicInteger.class).getAndIncrement()))
            .handle( logMessage ( "failed" ) )
            .route(new RetryRouter())
            .get();
    }
Run Code Online (Sandbox Code Playgroud)

当我们有 a 时PublishSubscribeChannel.subscribe()子流中的 是第一个订阅者的第一个订阅者,.handle( logMessage ( "failed" ) )主流中的 是第二个订阅者。在第一个订阅者的工作完成之前,最后一个订阅者不会被调用。

有关详细信息,请参阅 Spring Integration参考手册Java DSL 手册。