标签: spring-cloud-stream

Spring Cloud Stream消息来自/到JSON转换配置

我正在使用带有RabbitMQ活页夹的Spring Cloud Stream.它适用于byte[]有效负载和Java本机序列化,但我需要使用JSON有效负载.

这是我的处理器类.

@EnableBinding(Processor.class)
public class MessageProcessor {
    @ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public OutputDto handleIncomingMessage(InputDto inputDto) {
        // Run some job.
        return new OutputDto();
    }
}
Run Code Online (Sandbox Code Playgroud)

InputDto并且OutputDto是杰克逊注释的POJO.

  • 如何配置JSON转换策略?
  • 邮件标题应该如何被接受和处理?

json spring-rabbit spring-cloud spring-cloud-stream

5
推荐指数
1
解决办法
6416
查看次数

spring-cloud-stream-request-reply消息传递模式

是否有一个应该与spring-cloud-stream一起使用的请求-答复模式?我在spring-cloud-stream上可以找到的所有文档都针对MessageChannel.send即发即弃类型的生产者,我对spring-integration的@MessagingGateway很熟悉,但是我不确定这会如何与spring-cloud-stream合作。当您具有一个REST POST端点,该端点使用分配的标识符保存实体并且您需要将分配的标识符返回给调用者,但仍要使用消息传递时,这将很有用。

spring-cloud-stream

5
推荐指数
1
解决办法
1651
查看次数

spring cloud stream'bindingService'错误

我正在尝试实施Turbine AMQP来整合从多个服务到Hystrix Dashboard的所有流.

所以我在gradle文件中添加了几个依赖项,之后由于某种原因我无法启动我的应用程序.

来自启动的LOGS,我看到异常.

[LogMessage=Application startup failed]
org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'bindingService' defined in class path resource [org/springframework/cloud/stream/config/ChannelBindingServiceConfiguration.class]: Unsatisfied dependency expressed through constructor argument with index 1 of type [org.springframework.cloud.stream.binder.BinderFactory]: Error creating bean with name 'binderFactory' defined in class path resource [org/springframework/cloud/stream/config/BinderFactoryConfiguration.class]: Unsatisfied dependency expressed through constructor argument with index 0 of type [org.springframework.cloud.stream.binder.BinderTypeRegistry]: Error creating bean with name 'binderTypeRegistry' defined in class path resource [org/springframework/cloud/stream/config/BinderFactoryConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed …
Run Code Online (Sandbox Code Playgroud)

turbine spring-boot hystrix spring-cloud spring-cloud-stream

5
推荐指数
2
解决办法
1万
查看次数

Spring Cloud Dataflow有哪些好处?

基于我所看到的,在Spring Cloud Dataflow(SCDF)中创建流将部署底层应用程序,绑定通信服务(如RabbitMQ),设置Spring Cloud Stream环境变量,并启动应用程序.这可以使用cf push命令轻松完成.

与此同时,我一直遇到Spring Cloud Dataflow的一些缺点:

  • SCDF Server是PCF上的内存耗尽(我有一个只有6个应用程序的流,但我需要大约10GB的服务器)
  • 应用程序命名,内存,实例等没有灵活性(通常在manifest.yml中设置的所有内容)
  • 与构建工具(如Bamboo)集成将需要额外的工作,因为我们必须使用SCDF CLI而不仅仅是PCF CLI
  • 无法修改现有流.要执行蓝绿部署,您必须手动部署应用程序(绑定服务并手动设置环境变量).然后,一旦蓝绿部署完成,SCDF将流显示为Failed,因为它不知道其中一个底层应用程序已更改.
  • 我遇到的各种错误,比如尝试重新部署失败的流时的MySQL主键约束错误

那我错过了什么?为什么使用Spring Cloud Dataflow只对手动部署应用程序有益?

cloud-foundry spring-cloud-stream spring-cloud-dataflow

5
推荐指数
1
解决办法
2199
查看次数

Spring Cloud Stream动态频道

我正在使用Spring Cloud Stream,并想以编程方式创建和绑定频道。我的用例是,在应用程序启动期间,我会收到要订阅的Kafka主题的动态列表。然后如何为每个主题创建频道?

apache-kafka spring-boot spring-cloud-stream

5
推荐指数
1
解决办法
3336
查看次数

如何使用嵌入在spring云流中的kafka创建单元测试

很抱歉这个问题过于通用,但有人有一些关于如何使用kafka嵌入式执行生产者和消费者测试的教程或指南.我已经尝试了几个,但有几个版本的依赖项,没有实际工作= /

我正在使用spring cloud stream kafka.

junit apache-kafka spring-cloud-stream spring-kafka

5
推荐指数
1
解决办法
8502
查看次数

Spring Cloud Stream RabbitMQ

我试图理解为什么我想在 RabbitMQ 中使用 Spring 云流。我看过 RabbitMQ Spring 教程 4 ( https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html ),这基本上就是我想要做的。它创建了一个带有 2 个附加队列的直接交换,并根据路由键将消息路由到 Q1 或 Q2。

如果您查看教程,则整个过程非常简单,您创建所有部件,将它们绑定在一起,然后就可以开始了。

我想知道使用 Sing Cloud Stream 有什么好处,如果这就是它的用例。创建一个简单的交换很容易,甚至可以通过流直接定义目的地和组。所以我想为什么不更进一步并尝试使用流处理教程案例。

我已经看到 Stream 有一个BinderAwareChannelResolver似乎做同样的事情。但是我正在努力将它们放在一起以实现与 RabbitMQ Spring 教程中相同的效果。我不确定这是否是一个依赖问题,但我似乎在这里从根本上误解了一些东西,我认为是这样的:

spring.cloud.stream.bindings.output.destination=myDestination
spring.cloud.stream.bindings.output.group=consumerGroup
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression='key'
Run Code Online (Sandbox Code Playgroud)

应该诀窍。

有没有人有一个源和接收器的最小示例,它基本上创建了一个直接交换,将 2 个队列绑定到它,并根据路由关键路由到这两个队列之一,如https://www.rabbitmq.com/tutorials /tutorial-four-spring-amqp.html

编辑

下面是一组最小的代码,它演示了如何执行我所要求的操作。我没有附上,build.gradle因为它很简单(但如果有人感兴趣,请告诉我)

application.properties: 设置生产者

spring.cloud.stream.bindings.output.destination=tut.direct
spring.cloud.stream.rabbit.bindings.output.producer.exchangeType=direct
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers.type
Run Code Online (Sandbox Code Playgroud)

Sources.class: 设置制作人频道

public interface Sources {

    String OUTPUT = "output";

    @Output(Sources.OUTPUT)
    MessageChannel output();
}
Run Code Online (Sandbox Code Playgroud)

StatusController.class:响应休息呼叫并使用特定的路由键发送消息

/**
 * Status endpoint for the health-check service.
 */
@RestController
@EnableBinding(Sources.class) …
Run Code Online (Sandbox Code Playgroud)

spring spring-boot spring-cloud-stream

5
推荐指数
1
解决办法
2600
查看次数

Spring Cloud Stream验证

如何使用标准的基于Spring注释的验证在消息侦听器中使用Spring Cloud Stream框架执行验证?

尝试了不同的情况,@Valid @Payload对于传入的对象,尝试使用@Validatedon entity 处理器的方法验证,但它没有帮助.

@StreamListener(MediaItemStream.ITEM_LIKED_CHANNEL)
public void handleLikeMessage(@Valid @Payload LikeInputDto like) {...
Run Code Online (Sandbox Code Playgroud)

@Bean
public MethodValidationPostProcessor methodValidationPostProcessor() {
    return new MethodValidationPostProcessor();
}
Run Code Online (Sandbox Code Playgroud)

目前最好的方法是使用自定义服务进行验证,但它看起来并不像想要的那样.

@Log4j2
@Service
@AllArgsConstructor
public class LikeStreamHandler {

    private MediaEventMessagingService mediaEventMessagingService;
    private ValidationService validationService;

    @StreamListener(MediaItemStream.ITEM_LIKED_CHANNEL)
    public void handleLikeMessage(LikeInputDto like) {
        validationService.validate(like);

        log.debug("Handling LIKE message: {}", like);
        mediaEventMessagingService.processLikeEvent(like);
    }
}
Run Code Online (Sandbox Code Playgroud)

java validation spring spring-boot spring-cloud-stream

5
推荐指数
1
解决办法
523
查看次数

Spring Boot 2正常关闭Web

是否有任何建议的方法可以正常关闭Kubernetes中的Spring:boot 2应用程序。

  1. 捕捉终止信号SIGTERM
  2. 告诉Tomcat停止接受新请求。(或Jetty,Undertow或Netty / WebFlux,具体取决于所使用的嵌入式Web服务器)。或告诉SCS停止在Kafka上发送/收听消息。
  3. 告诉执行器运行状况端点进行SERVICE_UNAVAILABLE(503)
  4. 然后X秒钟后关闭应用程序或(SIGKILL)

我正在尝试正常关闭Rest应用程序和SCS(kafka消费者与生产者)应用程序

spring-boot kubernetes spring-boot-actuator spring-cloud-stream

5
推荐指数
1
解决办法
2227
查看次数

理解 Spring Cloud Stream Kafka 和 Spring Retry

我有一个使用 Kafka binder 的 Spring Cloud Stream 项目,我正在尝试理解并最终自定义 Cloud Stream 使用的 RetryTemplate。

我没有找到很多关于它是如何工作的文档,但是我所阅读的内容使我得出以下假设:

  • Cloud Stream 默认配置并启用 Spring Retry,包括默认的重试和退避策略。
  • 默认情况下, a 中任何未捕获的异常@StreamListener都会触发 Spring Retry
  • Cloud Stream 会以某种方式跟踪每条消息的 RetryContext 信息(如何?我不确定)

这些假设正确吗?

现在,在我的应用程序中,我有一个模式,其中一些消息可以立即处理,但其他消息必须推迟到以后再次尝试(使用指数退避等)。

我应该抛出异常导致 Spring Cloud Stream 在绑定层重试这些消息,还是自己实现重试并跟踪我自己的重试上下文?

如果我应该依赖 Cloud Stream 的重试设置,我应该如何自定义退避策略等?

spring-retry spring-cloud-stream spring-cloud-stream-binder-kafka

5
推荐指数
1
解决办法
2679
查看次数