我正在尝试实施Turbine AMQP来整合从多个服务到Hystrix Dashboard的所有流.
所以我在gradle文件中添加了几个依赖项,之后由于某种原因我无法启动我的应用程序.
来自启动的LOGS,我看到异常.
[LogMessage=Application startup failed]
org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'bindingService' defined in class path resource [org/springframework/cloud/stream/config/ChannelBindingServiceConfiguration.class]: Unsatisfied dependency expressed through constructor argument with index 1 of type [org.springframework.cloud.stream.binder.BinderFactory]: Error creating bean with name 'binderFactory' defined in class path resource [org/springframework/cloud/stream/config/BinderFactoryConfiguration.class]: Unsatisfied dependency expressed through constructor argument with index 0 of type [org.springframework.cloud.stream.binder.BinderTypeRegistry]: Error creating bean with name 'binderTypeRegistry' defined in class path resource [org/springframework/cloud/stream/config/BinderFactoryConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed …Run Code Online (Sandbox Code Playgroud) turbine spring-boot hystrix spring-cloud spring-cloud-stream
基于我所看到的,在Spring Cloud Dataflow(SCDF)中创建流将部署底层应用程序,绑定通信服务(如RabbitMQ),设置Spring Cloud Stream环境变量,并启动应用程序.这可以使用cf push命令轻松完成.
与此同时,我一直遇到Spring Cloud Dataflow的一些缺点:
那我错过了什么?为什么使用Spring Cloud Dataflow只对手动部署应用程序有益?
我有一个使用 Spring Cloud Stream Rabbit 和 Eureka Discovery Client 的简单 Spring Boot 应用程序。该应用程序与 Eureka Server 一起工作得很好,并且通过 RabbitMQ 的消息传递也工作正常。但是如果我启动一个 Spring Boot Admin Server,应用程序就会开始记录健康检查失败:
2017-06-21 19:47:57.352 INFO 11416 --- [trap-executor-0] c.n.d.s.r.aws.ConfigClusterResolver : Resolving eureka endpoints via configuration
2017-06-21 19:51:25.047 INFO 11416 --- [nio-8303-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring FrameworkServlet 'dispatcherServlet'
2017-06-21 19:51:25.047 INFO 11416 --- [nio-8303-exec-1] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization started
2017-06-21 19:51:25.069 INFO 11416 --- [nio-8303-exec-1] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization completed in 22 ms
2017-06-21 19:51:27.288 WARN 11416 --- [nio-8303-exec-1] …Run Code Online (Sandbox Code Playgroud) spring spring-boot spring-cloud-stream spring-boot-admin spring-integration-amqp
我尝试通过spring-cloud-stream使用测试消息发送和接收MessageCollector。
当我对流云使用本机 java 序列化时一切正常,但是当我将序列化更改为 json 时,MessageCollector 返回带有string有效负载而不是SomeObject有效负载的GenericMessage 。
配置是
cloud.stream.default.contentType=application/json
Run Code Online (Sandbox Code Playgroud)
测试用例:
Message outMsg = new GenericMessage<SomeObject>(new SomeObject(1));
someChannel.send(outMsg);
GenericMessage<SomeObject> inMsg = (GenericMessage<SomeObject>) messageCollector.forChannel(someChannel).poll();
Assert.assertTrue(inMsg.getPayload() instanceof SomeObject);
Run Code Online (Sandbox Code Playgroud)
因此,断言是错误的。inMsg包含字符串负载(字符串包含 SomeObject 的有效 json 表示)。
我的问题是:我如何GenericMessage从 SomeObject接收有效载荷MessageCollector?
生产环境工作正常,无需显式映射到SomeObject.
java spring spring-messaging spring-cloud spring-cloud-stream
它们之间有什么区别?
是springcloud-stream的实例之一springcloud-bus吗?
据我所知,他们都关心 MQ。
我正在构建一个微服务组件,它将默认使用由其他 (SCS) 组件生成的 Spring Cloud Stream (SCS) Kafka 消息。
但是我还需要从使用融合 API 的其他组件中使用 Kafka 消息。
我有一个示例存储库,显示了我正在尝试做什么。
https://github.com/donalthurley/KafkaConsumeScsAndConfluent
这是下面带有 SCS 输入绑定和融合输入绑定的应用程序配置。
spring:
application:
name: kafka
kafka:
consumer:
properties.schema.registry.url: http://192.168.99.100:8081
cloud:
stream:
kafka:
binder:
brokers: PLAINTEXT://192.168.99.100:9092
# configuration:
# specific:
# avro:
# reader: true
# key:
# deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value:
# deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
bindings:
inputConfluent:
contentType: application/*+avro
destination: confluent-destination
group: input-confluent-group
inputScs:
contentType: application/*+avro
destination: scs-destination
group: input-scs-group
Run Code Online (Sandbox Code Playgroud)
通过上述配置,我使用 SCS 默认配置创建了两个使用者。例如,类 org.apache.kafka.common.serialization.ByteArrayDeserializer 是两个输入绑定的值反序列化器。
如果我删除上述配置中的注释,我会从我的 Confluent 客户端发送配置的两个消费者。例如,类 io.confluent.kafka.serializers.KafkaAvroDeserializer 是两个输入绑定的值反序列化器。
我理解因为配置在 Kafka binder …
我无法http | log按照入门流指南部署和执行基本流
:
wget https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/2.1.0.RELEASE/spring-cloud-dataflow-server/docker-compose.yml;
export DATAFLOW_VERSION=2.1.0.RELEASE;
export SKIPPER_VERSION=2.0.2.RELEASE;
docker-compose up;
open http://localhost:9393/dashboard/#/streams/create;
echo "Then, create a stream via text input: `http | log`"
echo "Then, deploy the stream. The deployment fails with exit code 137."
Run Code Online (Sandbox Code Playgroud)
我得到绿色的“成功部署的流定义”。状态显示为“正在部署”,但从未完全部署。ERROR日志、UI 或网络请求中没有消息。
这是docker-compose up我部署流后的控制台输出。
skipper | 2019-05-24 22:31:53.363 INFO 1 --- [io-7577-exec-10] o.s.s.support.LifecycleObjectSupport : started UPGRADE UPGRADE_DEPLOY_TARGET_APPS_SUCCEED UPGRADE_DEPLOY_TARGET_APPS UPGRADE_START UPGRADE_DELETE_SOURCE_APPS UPGRADE_CHECK_TARGET_APPS UPGRADE_WAIT_TARGET_APPS UPGRADE_CANCEL UPGRADE_DEPLOY_TARGET_APPS_FAILED UPGRADE_CHECK_CHOICE UPGRADE_EXIT INSTALL INSTALL_INSTALL INSTALL_EXIT ERROR DELETE DELETE_DELETE DELETE_EXIT ROLLBACK ROLLBACK_START …Run Code Online (Sandbox Code Playgroud) 实现发件箱模式的通常方法是将消息有效负载存储在发件箱表中,并有一个单独的进程(消息中继)查询待处理的消息,并将它们发布到消息代理中,在我的例子中是 Kafka。
发件箱表的状态可能如下所示。
OUTBOX TABLE
---------------------------------
|ID | STATE | TOPIC | PAYLOAD |
---------------------------------
| 1 | PROCESSED | user |
| 2 | PENDING | user |
| 3 | PENDING | billing |
----------------------------------
Run Code Online (Sandbox Code Playgroud)
My Message Relay 是一个 Spring Boot/Cloud Stream 应用程序,它定期 ( @Scheduled) 查找 PENDING 记录,将它们发布到 Kafka 并将记录更新为 PROCESSED 状态。
第一个问题是:如果我启动 Message Relay 的多个实例,所有实例都会查询 Outbox 表,并且可能在某些时候不同的实例将获得相同的 PENDING 注册表以发布到 Kafka,从而生成重复的消息。我怎样才能防止这种情况?
另一种情况:假设只有一个消息中继。它获取一个 PENDING 记录,将其发布到主题,但在将记录更新为 PROCESSED 之前崩溃。当它再次启动时,它会找到相同的 PENDING 记录并再次发布它。有没有办法避免这种重复,或者唯一的方法是设计一个幂等系统。
背景
{
"markdownPercentage": 20,
"currency": "SEK",
"startDate": "2019-07-25"
}
Run Code Online (Sandbox Code Playgroud)
{
"markdownPercentage": 20,
"currency": "SEK",
"startDate": "2019-07-25"
}
Run Code Online (Sandbox Code Playgroud)
问题
当应用程序尝试将消息反序列化到对象时,它会引发以下异常
public class Markdown {
@JsonProperty("markdownPercentage")
@NotNull
private Integer markdownPercentage = 0;
@JsonProperty("currency")
@NotNull
private String currency = "";
@JsonFormat(
shape = Shape.STRING,
pattern = "yyyy-MM-dd"
)
@JsonProperty("startDate")
@NotNull
private ZonedDateTime startDate;
// Constructors, Getters, Setters etc.
} …Run Code Online (Sandbox Code Playgroud) 我有一个使用 Spring Integration DSL 流和 Kafka 绑定器的简单 Spring Cloud Stream 项目。一切正常,但来自 Kafka 的消息头值以byte[].
这意味着我的 SI@Header参数需要是 类型byte[]。哪个有效,但最好将它们作为字符串(我关心的所有入站标头都是字符串值)。
我已经将 Kafka 客户端配置为使用 StringSerializer/StringDeserializer。我假设我还需要以某种方式告诉 Spring Kafka 哪些标头映射为字符串以及使用什么字符编码。
我显然在这里遗漏了一些东西。有小费吗?
spring spring-integration spring-cloud-stream spring-kafka spring-integration-dsl
spring ×5
spring-boot ×4
java ×3
spring-cloud ×3
apache-kafka ×2
hystrix ×1
jackson ×1
json ×1
spring-kafka ×1
turbine ×1