标签: spring-cloud-stream

spring cloud stream'bindingService'错误

我正在尝试实施Turbine AMQP来整合从多个服务到Hystrix Dashboard的所有流.

所以我在gradle文件中添加了几个依赖项,之后由于某种原因我无法启动我的应用程序.

来自启动的LOGS,我看到异常.

[LogMessage=Application startup failed]
org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'bindingService' defined in class path resource [org/springframework/cloud/stream/config/ChannelBindingServiceConfiguration.class]: Unsatisfied dependency expressed through constructor argument with index 1 of type [org.springframework.cloud.stream.binder.BinderFactory]: Error creating bean with name 'binderFactory' defined in class path resource [org/springframework/cloud/stream/config/BinderFactoryConfiguration.class]: Unsatisfied dependency expressed through constructor argument with index 0 of type [org.springframework.cloud.stream.binder.BinderTypeRegistry]: Error creating bean with name 'binderTypeRegistry' defined in class path resource [org/springframework/cloud/stream/config/BinderFactoryConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed …
Run Code Online (Sandbox Code Playgroud)

turbine spring-boot hystrix spring-cloud spring-cloud-stream

5
推荐指数
2
解决办法
1万
查看次数

Spring Cloud Dataflow有哪些好处?

基于我所看到的,在Spring Cloud Dataflow(SCDF)中创建流将部署底层应用程序,绑定通信服务(如RabbitMQ),设置Spring Cloud Stream环境变量,并启动应用程序.这可以使用cf push命令轻松完成.

与此同时,我一直遇到Spring Cloud Dataflow的一些缺点:

  • SCDF Server是PCF上的内存耗尽(我有一个只有6个应用程序的流,但我需要大约10GB的服务器)
  • 应用程序命名,内存,实例等没有灵活性(通常在manifest.yml中设置的所有内容)
  • 与构建工具(如Bamboo)集成将需要额外的工作,因为我们必须使用SCDF CLI而不仅仅是PCF CLI
  • 无法修改现有流.要执行蓝绿部署,您必须手动部署应用程序(绑定服务并手动设置环境变量).然后,一旦蓝绿部署完成,SCDF将流显示为Failed,因为它不知道其中一个底层应用程序已更改.
  • 我遇到的各种错误,比如尝试重新部署失败的流时的MySQL主键约束错误

那我错过了什么?为什么使用Spring Cloud Dataflow只对手动部署应用程序有益?

cloud-foundry spring-cloud-stream spring-cloud-dataflow

5
推荐指数
1
解决办法
2199
查看次数

Spring Cloud Stream 和 RabbitMQ 健康检查

我有一个使用 Spring Cloud Stream Rabbit 和 Eureka Discovery Client 的简单 Spring Boot 应用程序。该应用程序与 Eureka Server 一起工作得很好,并且通过 RabbitMQ 的消息传递也工作正常。但是如果我启动一个 Spring Boot Admin Server,应用程序就会开始记录健康检查失败:

2017-06-21 19:47:57.352  INFO 11416 --- [trap-executor-0] c.n.d.s.r.aws.ConfigClusterResolver      : Resolving eureka endpoints via configuration
2017-06-21 19:51:25.047  INFO 11416 --- [nio-8303-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring FrameworkServlet 'dispatcherServlet'
2017-06-21 19:51:25.047  INFO 11416 --- [nio-8303-exec-1] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization started
2017-06-21 19:51:25.069  INFO 11416 --- [nio-8303-exec-1] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization completed in 22 ms
2017-06-21 19:51:27.288  WARN 11416 --- [nio-8303-exec-1] …
Run Code Online (Sandbox Code Playgroud)

spring spring-boot spring-cloud-stream spring-boot-admin spring-integration-amqp

5
推荐指数
1
解决办法
6888
查看次数

如何从 MessageCollector 接收带有对象负载而不是字符串的 GenericMessage

我尝试通过spring-cloud-stream使用测试消息发送和接收MessageCollector

当我对流云使用本机 java 序列化时一切正常,但是当我将序列化更改为 json 时,MessageCollector 返回带有string有效负载而不是SomeObject有效负载的GenericMessage 。

配置是

  cloud.stream.default.contentType=application/json 
Run Code Online (Sandbox Code Playgroud)

测试用例:

    Message outMsg = new GenericMessage<SomeObject>(new SomeObject(1));
    someChannel.send(outMsg);
    GenericMessage<SomeObject> inMsg = (GenericMessage<SomeObject>) messageCollector.forChannel(someChannel).poll();

    Assert.assertTrue(inMsg.getPayload() instanceof SomeObject);
Run Code Online (Sandbox Code Playgroud)

因此,断言是错误的。inMsg包含字符串负载(字符串包含 SomeObject 的有效 json 表示)。

我的问题是:我如何GenericMessage从 SomeObject接收有效载荷MessageCollector

生产环境工作正常,无需显式映射到SomeObject.

java spring spring-messaging spring-cloud spring-cloud-stream

5
推荐指数
1
解决办法
2385
查看次数

springcloud-stream 和 springcloud-bus 有什么区别?

它们之间有什么区别?

springcloud-stream的实例之一springcloud-bus吗?
据我所知,他们都关心 MQ。

spring-boot spring-cloud-stream spring-cloud-bus

5
推荐指数
1
解决办法
576
查看次数

如何在默认情况下从 Kafka Spring Cloud Stream 消费并同时消费由 Confluent API 生成的 Kafka 消息?

我正在构建一个微服务组件,它将默认使用由其他 (SCS) 组件生成的 Spring Cloud Stream (SCS) Kafka 消息。

但是我还需要从使用融合 API 的其他组件中使用 Kafka 消息。

我有一个示例存储库,显示了我正在尝试做什么。

https://github.com/donalthurley/KafkaConsumeScsAndConfluent

这是下面带有 SCS 输入绑定和融合输入绑定的应用程序配置。

spring:
  application:
    name: kafka
  kafka:
    consumer:
      properties.schema.registry.url: http://192.168.99.100:8081
  cloud:
    stream:
      kafka:
        binder:
          brokers: PLAINTEXT://192.168.99.100:9092
#          configuration:
#            specific:
#              avro:
#                reader: true
#            key:
#              deserializer: org.apache.kafka.common.serialization.StringDeserializer
#            value:
#              deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer

      bindings:
        inputConfluent:
          contentType: application/*+avro
          destination: confluent-destination
          group: input-confluent-group
        inputScs:
          contentType: application/*+avro
          destination: scs-destination
          group: input-scs-group
Run Code Online (Sandbox Code Playgroud)

通过上述配置,我使用 SCS 默认配置创建了两个使用者。例如,类 org.apache.kafka.common.serialization.ByteArrayDeserializer 是两个输入绑定的值反序列化器。

如果我删除上述配置中的注释,我会从我的 Confluent 客户端发送配置的两个消费者。例如,类 io.confluent.kafka.serializers.KafkaAvroDeserializer 是两个输入绑定的值反序列化器。

我理解因为配置在 Kafka binder …

java spring apache-kafka spring-cloud-stream

5
推荐指数
1
解决办法
1386
查看次数

spring-cloud-dataflow:流部署在 OSX 上失败

我无法http | log按照入门流指南部署和执行基本流 :

wget https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/2.1.0.RELEASE/spring-cloud-dataflow-server/docker-compose.yml;
export DATAFLOW_VERSION=2.1.0.RELEASE;
export SKIPPER_VERSION=2.0.2.RELEASE;
docker-compose up;
open http://localhost:9393/dashboard/#/streams/create;
echo "Then, create a stream via text input: `http | log`"
echo "Then, deploy the stream. The deployment fails with exit code 137."
Run Code Online (Sandbox Code Playgroud)

我得到绿色的“成功部署的流定义”。状态显示为“正在部署”,但从未完全部署。ERROR日志、UI 或网络请求中没有消息。

这是docker-compose up我部署流后的控制台输出。

skipper              | 2019-05-24 22:31:53.363  INFO 1 --- [io-7577-exec-10] o.s.s.support.LifecycleObjectSupport     : started UPGRADE UPGRADE_DEPLOY_TARGET_APPS_SUCCEED UPGRADE_DEPLOY_TARGET_APPS UPGRADE_START UPGRADE_DELETE_SOURCE_APPS UPGRADE_CHECK_TARGET_APPS UPGRADE_WAIT_TARGET_APPS UPGRADE_CANCEL UPGRADE_DEPLOY_TARGET_APPS_FAILED UPGRADE_CHECK_CHOICE UPGRADE_EXIT INSTALL INSTALL_INSTALL INSTALL_EXIT ERROR DELETE DELETE_DELETE DELETE_EXIT ROLLBACK ROLLBACK_START …
Run Code Online (Sandbox Code Playgroud)

spring spring-cloud-stream spring-cloud-dataflow

5
推荐指数
1
解决办法
416
查看次数

发件箱模式 - 我们如何防止消息中继过程生成重复的消息?

实现发件箱模式的通常方法是将消息有效负载存储在发件箱表中,并有一个单独的进程(消息中继)查询待处理的消息,并将它们发布到消息代理中,在我的例子中是 Kafka。

发件箱表的状态可能如下所示。

 OUTBOX TABLE
 ---------------------------------
|ID | STATE     | TOPIC | PAYLOAD |
 ---------------------------------
| 1 | PROCESSED | user            |
| 2 | PENDING   | user            |
| 3 | PENDING   | billing         |
----------------------------------
Run Code Online (Sandbox Code Playgroud)

My Message Relay 是一个 Spring Boot/Cloud Stream 应用程序,它定期 ( @Scheduled) 查找 PENDING 记录,将它们发布到 Kafka 并将记录更新为 PROCESSED 状态。

第一个问题是:如果我启动 Message Relay 的多个实例,所有实例都会查询 Outbox 表,并且可能在某些时候不同的实例将获得相同的 PENDING 注册表以发布到 Kafka,从而生成重复的消息。我怎样才能防止这种情况?

另一种情况:假设只有一个消息中继。它获取一个 PENDING 记录,将其发布到主题,但在将记录更新为 PROCESSED 之前崩溃。当它再次启动时,它会找到相同的 PENDING 记录并再次发布它。有没有办法避免这种重复,或者唯一的方法是设计一个幂等系统。

apache-kafka spring-cloud spring-cloud-stream

5
推荐指数
1
解决办法
1340
查看次数

使用 objectMapper 将 JSON 日期格式反序列化为 ZonedDateTime

背景

  1. 我有以下 JSON(来自 Kafka 的消息)
{
      "markdownPercentage": 20,
      "currency": "SEK",
      "startDate": "2019-07-25"
}
Run Code Online (Sandbox Code Playgroud)
  1. 我有以下(生成 JSON 模式)POJO(我无法更改 POJO,因为它是公司的共享资源)
{
      "markdownPercentage": 20,
      "currency": "SEK",
      "startDate": "2019-07-25"
}
Run Code Online (Sandbox Code Playgroud)
  1. 我们的应用程序是一个 Spring Boot 应用程序,它使用 Spring Cloud Stream 从 Kafka 读取 JSON 消息 (1) 并使用 POJO (2),然后对其进行处理。

问题

当应用程序尝试将消息反序列化到对象时,它会引发以下异常

public class Markdown {
    @JsonProperty("markdownPercentage")
    @NotNull
    private Integer markdownPercentage = 0;
    @JsonProperty("currency")
    @NotNull
    private String currency = "";
    @JsonFormat(
        shape = Shape.STRING,
        pattern = "yyyy-MM-dd"
    )
    @JsonProperty("startDate")
    @NotNull
    private ZonedDateTime startDate;

    // Constructors, Getters, Setters etc.

} …
Run Code Online (Sandbox Code Playgroud)

java json jackson spring-boot spring-cloud-stream

5
推荐指数
1
解决办法
1万
查看次数

如何在 Spring Cloud Stream 项目中将传入的标头映射为 String 而不是 byte[]?

我有一个使用 Spring Integration DSL 流和 Kafka 绑定器的简单 Spring Cloud Stream 项目。一切正常,但来自 Kafka 的消息头值以byte[].

这意味着我的 SI@Header参数需要是 类型byte[]。哪个有效,但最好将它们作为字符串(我关心的所有入站标头都是字符串值)。

我已经将 Kafka 客户端配置为使用 StringSerializer/StringDeserializer。我假设我还需要以某种方式告诉 Spring Kafka 哪些标头映射为字符串以及使用什么字符编码。

我显然在这里遗漏了一些东西。有小费吗?

spring spring-integration spring-cloud-stream spring-kafka spring-integration-dsl

5
推荐指数
1
解决办法
1851
查看次数