我正在使用带有RabbitMQ活页夹的Spring Cloud Stream.它适用于byte[]有效负载和Java本机序列化,但我需要使用JSON有效负载.
这是我的处理器类.
@EnableBinding(Processor.class)
public class MessageProcessor {
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public OutputDto handleIncomingMessage(InputDto inputDto) {
// Run some job.
return new OutputDto();
}
}
Run Code Online (Sandbox Code Playgroud)
InputDto并且OutputDto是杰克逊注释的POJO.
是否有一个应该与spring-cloud-stream一起使用的请求-答复模式?我在spring-cloud-stream上可以找到的所有文档都针对MessageChannel.send即发即弃类型的生产者,我对spring-integration的@MessagingGateway很熟悉,但是我不确定这会如何与spring-cloud-stream合作。当您具有一个REST POST端点,该端点使用分配的标识符保存实体并且您需要将分配的标识符返回给调用者,但仍要使用消息传递时,这将很有用。
我正在尝试实施Turbine AMQP来整合从多个服务到Hystrix Dashboard的所有流.
所以我在gradle文件中添加了几个依赖项,之后由于某种原因我无法启动我的应用程序.
来自启动的LOGS,我看到异常.
[LogMessage=Application startup failed]
org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'bindingService' defined in class path resource [org/springframework/cloud/stream/config/ChannelBindingServiceConfiguration.class]: Unsatisfied dependency expressed through constructor argument with index 1 of type [org.springframework.cloud.stream.binder.BinderFactory]: Error creating bean with name 'binderFactory' defined in class path resource [org/springframework/cloud/stream/config/BinderFactoryConfiguration.class]: Unsatisfied dependency expressed through constructor argument with index 0 of type [org.springframework.cloud.stream.binder.BinderTypeRegistry]: Error creating bean with name 'binderTypeRegistry' defined in class path resource [org/springframework/cloud/stream/config/BinderFactoryConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed …Run Code Online (Sandbox Code Playgroud) turbine spring-boot hystrix spring-cloud spring-cloud-stream
基于我所看到的,在Spring Cloud Dataflow(SCDF)中创建流将部署底层应用程序,绑定通信服务(如RabbitMQ),设置Spring Cloud Stream环境变量,并启动应用程序.这可以使用cf push命令轻松完成.
与此同时,我一直遇到Spring Cloud Dataflow的一些缺点:
那我错过了什么?为什么使用Spring Cloud Dataflow只对手动部署应用程序有益?
我正在使用Spring Cloud Stream,并想以编程方式创建和绑定频道。我的用例是,在应用程序启动期间,我会收到要订阅的Kafka主题的动态列表。然后如何为每个主题创建频道?
很抱歉这个问题过于通用,但有人有一些关于如何使用kafka嵌入式执行生产者和消费者测试的教程或指南.我已经尝试了几个,但有几个版本的依赖项,没有实际工作= /
我正在使用spring cloud stream kafka.
我试图理解为什么我想在 RabbitMQ 中使用 Spring 云流。我看过 RabbitMQ Spring 教程 4 ( https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html ),这基本上就是我想要做的。它创建了一个带有 2 个附加队列的直接交换,并根据路由键将消息路由到 Q1 或 Q2。
如果您查看教程,则整个过程非常简单,您创建所有部件,将它们绑定在一起,然后就可以开始了。
我想知道使用 Sing Cloud Stream 有什么好处,如果这就是它的用例。创建一个简单的交换很容易,甚至可以通过流直接定义目的地和组。所以我想为什么不更进一步并尝试使用流处理教程案例。
我已经看到 Stream 有一个BinderAwareChannelResolver似乎做同样的事情。但是我正在努力将它们放在一起以实现与 RabbitMQ Spring 教程中相同的效果。我不确定这是否是一个依赖问题,但我似乎在这里从根本上误解了一些东西,我认为是这样的:
spring.cloud.stream.bindings.output.destination=myDestination
spring.cloud.stream.bindings.output.group=consumerGroup
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression='key'
Run Code Online (Sandbox Code Playgroud)
应该诀窍。
有没有人有一个源和接收器的最小示例,它基本上创建了一个直接交换,将 2 个队列绑定到它,并根据路由关键路由到这两个队列之一,如https://www.rabbitmq.com/tutorials /tutorial-four-spring-amqp.html?
编辑:
下面是一组最小的代码,它演示了如何执行我所要求的操作。我没有附上,build.gradle因为它很简单(但如果有人感兴趣,请告诉我)
application.properties: 设置生产者
spring.cloud.stream.bindings.output.destination=tut.direct
spring.cloud.stream.rabbit.bindings.output.producer.exchangeType=direct
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers.type
Run Code Online (Sandbox Code Playgroud)
Sources.class: 设置制作人频道
public interface Sources {
String OUTPUT = "output";
@Output(Sources.OUTPUT)
MessageChannel output();
}
Run Code Online (Sandbox Code Playgroud)
StatusController.class:响应休息呼叫并使用特定的路由键发送消息
/**
* Status endpoint for the health-check service.
*/
@RestController
@EnableBinding(Sources.class) …Run Code Online (Sandbox Code Playgroud) 如何使用标准的基于Spring注释的验证在消息侦听器中使用Spring Cloud Stream框架执行验证?
尝试了不同的情况,@Valid @Payload对于传入的对象,尝试使用@Validatedon entity 处理器的方法验证,但它没有帮助.
@StreamListener(MediaItemStream.ITEM_LIKED_CHANNEL)
public void handleLikeMessage(@Valid @Payload LikeInputDto like) {...
Run Code Online (Sandbox Code Playgroud)
和
@Bean
public MethodValidationPostProcessor methodValidationPostProcessor() {
return new MethodValidationPostProcessor();
}
Run Code Online (Sandbox Code Playgroud)
目前最好的方法是使用自定义服务进行验证,但它看起来并不像想要的那样.
@Log4j2
@Service
@AllArgsConstructor
public class LikeStreamHandler {
private MediaEventMessagingService mediaEventMessagingService;
private ValidationService validationService;
@StreamListener(MediaItemStream.ITEM_LIKED_CHANNEL)
public void handleLikeMessage(LikeInputDto like) {
validationService.validate(like);
log.debug("Handling LIKE message: {}", like);
mediaEventMessagingService.processLikeEvent(like);
}
}
Run Code Online (Sandbox Code Playgroud) 是否有任何建议的方法可以正常关闭Kubernetes中的Spring:boot 2应用程序。
我正在尝试正常关闭Rest应用程序和SCS(kafka消费者与生产者)应用程序
spring-boot kubernetes spring-boot-actuator spring-cloud-stream
我有一个使用 Kafka binder 的 Spring Cloud Stream 项目,我正在尝试理解并最终自定义 Cloud Stream 使用的 RetryTemplate。
我没有找到很多关于它是如何工作的文档,但是我所阅读的内容使我得出以下假设:
@StreamListener都会触发 Spring Retry这些假设正确吗?
现在,在我的应用程序中,我有一个模式,其中一些消息可以立即处理,但其他消息必须推迟到以后再次尝试(使用指数退避等)。
我应该抛出异常导致 Spring Cloud Stream 在绑定层重试这些消息,还是自己实现重试并跟踪我自己的重试上下文?
如果我应该依赖 Cloud Stream 的重试设置,我应该如何自定义退避策略等?
spring-retry spring-cloud-stream spring-cloud-stream-binder-kafka
spring-boot ×5
apache-kafka ×2
spring ×2
spring-cloud ×2
hystrix ×1
java ×1
json ×1
junit ×1
kubernetes ×1
spring-cloud-stream-binder-kafka ×1
spring-kafka ×1
spring-retry ×1
turbine ×1
validation ×1