Kei*_*ett 5 spring-integration spring-rabbitmq spring-cloud-stream spring-integration-amqp
我开发了异步Spring Cloud Stream服务,我正在尝试开发一种边缘服务,它使用@MessagingGateway来提供对异步服务的同步访问.
我目前正在获得以下堆栈跟踪:
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:355)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
... 47 common frames omitted
Run Code Online (Sandbox Code Playgroud)
我的@MessagingGateway:
@EnableBinding(AccountChannels.class)
@MessagingGateway
public interface AccountService {
@Gateway(requestChannel = AccountChannels.CREATE_ACCOUNT_REQUEST,replyChannel = AccountChannels.ACCOUNT_CREATED, replyTimeout = 60000, requestTimeout = 60000)
Account createAccount(@Payload Account account, @Header("Authorization") String authorization);
}
Run Code Online (Sandbox Code Playgroud)
如果我通过@StreamListener在回复频道上使用该消息,它可以正常工作:
@HystrixCommand(commandKey = "acounts-edge:accountCreated", fallbackMethod = "accountCreatedFallback", commandProperties = {@HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE")}, ignoreExceptions = {ClientException.class})
@StreamListener(AccountChannels.ACCOUNT_CREATED)
public void accountCreated(Account account, @Header(name = "spanTraceId", required = false) String traceId) {
try {
if (log.isInfoEnabled()) {
log.info(new StringBuilder("Account created: ").append(objectMapper.writeValueAsString(account)).toString());
}
} catch (JsonProcessingException e) {
log.error(e.getMessage(), e);
}
}
Run Code Online (Sandbox Code Playgroud)
在生产者方面,我正在配置requiredGroups
以确保多个消费者可以处理消息,相应地,消费者具有匹配的group
配置.
消费者:
spring:
cloud:
stream:
bindings:
create-account-request:
binder: rabbit1
contentType: application/json
destination: create-account-request
requiredGroups: accounts-service-create-account-request
account-created:
binder: rabbit1
contentType: application/json
destination: account-created
group: accounts-edge-account-created
Run Code Online (Sandbox Code Playgroud)
制片人:
spring:
cloud:
stream:
bindings:
create-account-request:
binder: rabbit1
contentType: application/json
destination: create-account-request
group: accounts-service-create-account-request
account-created:
binder: rabbit1
contentType: application/json
destination: account-created
requiredGroups: accounts-edge-account-created
Run Code Online (Sandbox Code Playgroud)
生产者端的代码位处理请求并发送响应:
accountChannels.accountCreated().send(MessageBuilder.withPayload(accountService.createAccount(account)).build());
Run Code Online (Sandbox Code Playgroud)
我可以调试并看到请求被接收和处理,但是当响应被发送到回复通道时,就是发生错误的时候.
要使@MessagingGateway正常工作,我缺少哪些配置和/或代码?我知道我正在将Spring Integration和Spring Cloud Gateway结合起来,所以我不确定是否一起使用它们会导致问题.
这是个好问题,也是个好主意.但它不会那么容易.
首先,我们要确定为自己那gateway
意思request/reply
,因此correlation
.并且这可以在实例的@MessagingGateway
via replyChannel
头中使用TemporaryReplyChannel
.即使您有明确的replyChannel = AccountChannels.ACCOUNT_CREATED
,也只能通过提到的标头及其值来完成相关.事实上,这TemporaryReplyChannel
是不可序列化的,不能通过网络传输给另一方的消费者.
幸运的是,Spring Integration为我们提供了一些解决方案.它是HeaderEnricher
其headerChannelsToString
背后的一部分HeaderChannelRegistry
:
从Spring Integration 3.0开始,可以使用新的子元素; 它没有属性.这会将现有的replyChannel和errorChannel标头(当它们是MessageChannel时)转换为String,并将通道存储在注册表中,以便在发送回复或处理错误时进行解决.这对于标题可能丢失的情况很有用; 例如,将消息序列化到消息存储库或通过JMS传输消息时.如果标头尚不存在,或者它不是MessageChannel,则不进行任何更改.
但在这种情况下,您必须从网关引入一个内部通道,HeaderEnricher
并且只有最后一个通道才会将消息发送给AccountChannels.CREATE_ACCOUNT_REQUEST
.因此,replyChannel
标题将转换为字符串表示,并能够通过网络传播.在消费者方面,当您发送回复时,您应该确保转移该replyChannel
标头,就像它一样.因此,当消息将到达AccountChannels.ACCOUNT_CREATED
生产者一侧时,我们有这种情况@MessagingGateway
,相关机制能够将信道识别器转换为适当的TemporaryReplyChannel
并将回复与等待网关呼叫相关联.
这里只有你的生产者应用程序必须作为组中的单个使用者的问题AccountChannels.ACCOUNT_CREATED
- 我们必须确保云中只有一个实例一次运行.仅仅因为只有一个实例TemporaryReplyChannel
在其内存中具有该实例.
有关网关的更多信息:https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/messaging-endpoints-chapter.html#gateway
UPDATE
一些帮助代码:
@EnableBinding(AccountChannels.class)
@MessagingGateway
public interface AccountService {
@Gateway(requestChannel = AccountChannels.INTERNAL_CREATE_ACCOUNT_REQUEST, replyChannel = AccountChannels.ACCOUNT_CREATED, replyTimeout = 60000, requestTimeout = 60000)
Account createAccount(@Payload Account account, @Header("Authorization") String authorization);
}
@Bean
public IntegrationFlow headerEnricherFlow() {
return IntegrationFlows.from(AccountChannels.INTERNAL_CREATE_ACCOUNT_REQUEST)
.enrichHeaders(headerEnricher -> headerEnricher.headerChannelsToString())
.channel(AccountChannels.CREATE_ACCOUNT_REQUEST)
.get();
}
Run Code Online (Sandbox Code Playgroud)
UPDATE
一些简单的应用程序来演示PoC:
@EnableBinding({ Processor.class, CloudStreamGatewayApplication.GatewayChannels.class })
@SpringBootApplication
public class CloudStreamGatewayApplication {
interface GatewayChannels {
String REQUEST = "request";
@Output(REQUEST)
MessageChannel request();
String REPLY = "reply";
@Input(REPLY)
SubscribableChannel reply();
}
private static final String ENRICH = "enrich";
@MessagingGateway
public interface StreamGateway {
@Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.REPLY)
String process(String payload);
}
@Bean
public IntegrationFlow headerEnricherFlow() {
return IntegrationFlows.from(ENRICH)
.enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
.channel(GatewayChannels.REQUEST)
.get();
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Message<?> process(Message<String> request) {
return MessageBuilder.withPayload(request.getPayload().toUpperCase())
.copyHeaders(request.getHeaders())
.build();
}
public static void main(String[] args) {
ConfigurableApplicationContext applicationContext =
SpringApplication.run(CloudStreamGatewayApplication.class, args);
StreamGateway gateway = applicationContext.getBean(StreamGateway.class);
String result = gateway.process("foo");
System.out.println(result);
}
}
Run Code Online (Sandbox Code Playgroud)
的application.yml
:
spring:
cloud:
stream:
bindings:
input:
destination: requests
output:
destination: replies
request:
destination: requests
reply:
destination: replies
Run Code Online (Sandbox Code Playgroud)
我用spring-cloud-starter-stream-rabbit
.
该
MessageBuilder.withPayload(request.getPayload().toUpperCase())
.copyHeaders(request.getHeaders())
.build()
Run Code Online (Sandbox Code Playgroud)
技巧是否将请求标头复制到回复消息.因此,网关能够在回复侧将报头中的信道标识符转换为适当的,TemporaryReplyChannel
以便将回复正确地传送给网关的呼叫者.
关于此问题的SCSt问题:https://github.com/spring-cloud/spring-cloud-stream/issues/815
归档时间: |
|
查看次数: |
2400 次 |
最近记录: |