如何使用Spring Cloud Stream MessageChannels配置@MessagingGateway?

Kei*_*ett 5 spring-integration spring-rabbitmq spring-cloud-stream spring-integration-amqp

我开发了异步Spring Cloud Stream服务,我正在尝试开发一种边缘服务,它使用@MessagingGateway来提供对异步服务的同步访问.

我目前正在获得以下堆栈跟踪:

Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:355)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
... 47 common frames omitted
Run Code Online (Sandbox Code Playgroud)

我的@MessagingGateway:

@EnableBinding(AccountChannels.class)
@MessagingGateway

public interface AccountService {
  @Gateway(requestChannel = AccountChannels.CREATE_ACCOUNT_REQUEST,replyChannel = AccountChannels.ACCOUNT_CREATED, replyTimeout = 60000, requestTimeout = 60000)
  Account createAccount(@Payload Account account, @Header("Authorization") String authorization);
}
Run Code Online (Sandbox Code Playgroud)

如果我通过@StreamListener在回复频道上使用该消息,它可以正常工作:

  @HystrixCommand(commandKey = "acounts-edge:accountCreated", fallbackMethod = "accountCreatedFallback", commandProperties = {@HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE")}, ignoreExceptions = {ClientException.class})
  @StreamListener(AccountChannels.ACCOUNT_CREATED)
  public void accountCreated(Account account, @Header(name = "spanTraceId", required = false) String traceId) {
    try {
      if (log.isInfoEnabled()) {
        log.info(new StringBuilder("Account created: ").append(objectMapper.writeValueAsString(account)).toString());
      }
    } catch (JsonProcessingException e) {
      log.error(e.getMessage(), e);
    }
  }
Run Code Online (Sandbox Code Playgroud)

在生产者方面,我正在配置requiredGroups以确保多个消费者可以处理消息,相应地,消费者具有匹配的group配置.

消费者:

spring:
  cloud:
    stream:
      bindings:
        create-account-request:
          binder: rabbit1
          contentType: application/json
          destination: create-account-request
          requiredGroups: accounts-service-create-account-request
        account-created:
          binder: rabbit1
          contentType: application/json
          destination: account-created
          group: accounts-edge-account-created
Run Code Online (Sandbox Code Playgroud)

制片人:

spring:
  cloud:
    stream:
      bindings:
        create-account-request:
          binder: rabbit1
          contentType: application/json
          destination: create-account-request
          group: accounts-service-create-account-request
        account-created:
          binder: rabbit1
          contentType: application/json
          destination: account-created
          requiredGroups: accounts-edge-account-created
Run Code Online (Sandbox Code Playgroud)

生产者端的代码位处理请求并发送响应:

  accountChannels.accountCreated().send(MessageBuilder.withPayload(accountService.createAccount(account)).build());
Run Code Online (Sandbox Code Playgroud)

我可以调试并看到请求被接收和处理,但是当响应被发送到回复通道时,就是发生错误的时候.

要使@MessagingGateway正常工作,我缺少哪些配置和/或代码?我知道我正在将Spring Integration和Spring Cloud Gateway结合起来,所以我不确定是否一起使用它们会导致问题.

Art*_*lan 6

这是个好问题,也是个好主意.但它不会那么容易.

首先,我们要确定为自己那gateway意思request/reply,因此correlation.并且这可以在实例的@MessagingGatewayvia replyChannel头中使用TemporaryReplyChannel.即使您有明确的replyChannel = AccountChannels.ACCOUNT_CREATED,也只能通过提到的标头及其值来完成相关.事实上,这TemporaryReplyChannel是不可序列化的,不能通过网络传输给另一方的消费者.

幸运的是,Spring Integration为我们提供了一些解决方案.它是HeaderEnricherheaderChannelsToString背后的一部分HeaderChannelRegistry:

从Spring Integration 3.0开始,可以使用新的子元素; 它没有属性.这会将现有的replyChannel和errorChannel标头(当它们是MessageChannel时)转换为String,并将通道存储在注册表中,以便在发送回复或处理错误时进行解决.这对于标题可能丢失的情况很有用; 例如,将消息序列化到消息存储库或通过JMS传输消息时.如果标头尚不存在,或者它不是MessageChannel,则不进行任何更改.

https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/messaging-transformation-chapter.html#header-enricher

但在这种情况下,您必须从网关引入一个内部通道,HeaderEnricher并且只有最后一个通道才会将消息发送给AccountChannels.CREATE_ACCOUNT_REQUEST.因此,replyChannel标题将转换为字符串表示,并能够通过网络传播.在消费者方面,当您发送回复时,您应该确保转移该replyChannel标头,就像它一样.因此,当消息将到达AccountChannels.ACCOUNT_CREATED生产者一侧时,我们有这种情况@MessagingGateway,相关机制能够将信道识别器转换为适当的TemporaryReplyChannel并将回复与等待网关呼叫相关联.

这里只有你的生产者应用程序必须作为组中的单个使用者的问题AccountChannels.ACCOUNT_CREATED- 我们必须确保云中只有一个实例一次运行.仅仅因为只有一个实例TemporaryReplyChannel在其内存中具有该实例.

有关网关的更多信息:https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/messaging-endpoints-chapter.html#gateway

UPDATE

一些帮助代码:

@EnableBinding(AccountChannels.class)
@MessagingGateway

public interface AccountService {
  @Gateway(requestChannel = AccountChannels.INTERNAL_CREATE_ACCOUNT_REQUEST, replyChannel = AccountChannels.ACCOUNT_CREATED, replyTimeout = 60000, requestTimeout = 60000)
  Account createAccount(@Payload Account account, @Header("Authorization") String authorization);
}

@Bean
public IntegrationFlow headerEnricherFlow() {
   return IntegrationFlows.from(AccountChannels.INTERNAL_CREATE_ACCOUNT_REQUEST)
            .enrichHeaders(headerEnricher -> headerEnricher.headerChannelsToString())
            .channel(AccountChannels.CREATE_ACCOUNT_REQUEST)
            .get();

}
Run Code Online (Sandbox Code Playgroud)

UPDATE

一些简单的应用程序来演示PoC:

@EnableBinding({ Processor.class, CloudStreamGatewayApplication.GatewayChannels.class })
@SpringBootApplication
public class CloudStreamGatewayApplication {

    interface GatewayChannels {

        String REQUEST = "request";

        @Output(REQUEST)
        MessageChannel request();


        String REPLY = "reply";

        @Input(REPLY)
        SubscribableChannel reply();
    }

    private static final String ENRICH = "enrich";


    @MessagingGateway
    public interface StreamGateway {

        @Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.REPLY)
        String process(String payload);

    }

    @Bean
    public IntegrationFlow headerEnricherFlow() {
        return IntegrationFlows.from(ENRICH)
                .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
                .channel(GatewayChannels.REQUEST)
                .get();
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<?> process(Message<String> request) {
        return MessageBuilder.withPayload(request.getPayload().toUpperCase())
                .copyHeaders(request.getHeaders())
                .build();
    }


    public static void main(String[] args) {
        ConfigurableApplicationContext applicationContext =
                SpringApplication.run(CloudStreamGatewayApplication.class, args);

        StreamGateway gateway = applicationContext.getBean(StreamGateway.class);

        String result = gateway.process("foo");

        System.out.println(result);
    }

}
Run Code Online (Sandbox Code Playgroud)

application.yml:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: requests
        output:
          destination: replies
        request:
          destination: requests
        reply:
          destination: replies
Run Code Online (Sandbox Code Playgroud)

我用spring-cloud-starter-stream-rabbit.

MessageBuilder.withPayload(request.getPayload().toUpperCase())
            .copyHeaders(request.getHeaders())
            .build()
Run Code Online (Sandbox Code Playgroud)

技巧是否将请求标头复制到回复消息.因此,网关能够在回复侧将报头中的信道标识符转换为适当的,TemporaryReplyChannel以便将回复正确地传送给网关的呼叫者.

关于此问题的SCSt问题:https://github.com/spring-cloud/spring-cloud-stream/issues/815