我正在尝试使用 RabbitMQ 配置一个简单的 Spring Cloud Stream 应用程序。我使用的代码主要来自spring-cloud-stream-samples。我有一个切入点:
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
Run Code Online (Sandbox Code Playgroud)
以及示例中的简单消息生产者:
@EnableBinding(Source.class)
public class SourceModuleDefinition {
private String format = "yyyy-MM-dd HH:mm:ss";
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "${fixedDelay}", maxMessagesPerPoll = "1"))
public MessageSource<String> timerMessageSource() {
return () -> new GenericMessage<>(new SimpleDateFormat(this.format).format(new Date()));
}
}
Run Code Online (Sandbox Code Playgroud)
此外,这里是 application.yml 配置:
fixedDelay: 5000
spring:
cloud:
stream:
bindings:
output:
destination: test
Run Code Online (Sandbox Code Playgroud)
当我运行该示例时,它连接到 Rabbit 并创建一个名为 test 的交换。但我的问题是,它不会自动创建队列和绑定。我可以看到 Rabbit 中的流量,但是我所有的消息都消失了。虽然我需要它们留在某个队列中,除非消费者读取它们。 …
我正在尝试使用 spring-cloud-streamrabbitmq 编写一个应用程序,但不使用 spring-boot。当我这样做时,gradle dependencies我发现云流依赖于 Spring Boot 模块。是否可以在没有 spring-boot 的情况下使用 spring-cloud-stream ?有人可以给我指出任何例子吗?
compile - Dependencies for source set 'main'.
+--- org.springframework.cloud:spring-cloud-stream: -> 1.0.2.RELEASE
| +--- org.springframework.boot:spring-boot-starter-actuator:1.3.5.RELEASE -> 1.2.1.RELEASE
| | +--- org.springframework.boot:spring-boot-actuator:1.2.1.RELEASE
| | | +--- org.springframework.boot:spring-boot:1.2.1.RELEASE
| | | | +--- org.springframework:spring-core:4.1.4.RELEASE
| | | | \--- org.springframework:spring-context:4.1.4.RELEASE
| | | | +--- org.springframework:spring-aop:4.1.4.RELEASE
| | | | | +--- aopalliance:aopalliance:1.0
| | | | | +--- org.springframework:spring-beans:4.1.4.RELEASE
| | | | | | \--- org.springframework:spring-core:4.1.4.RELEASE
| …Run Code Online (Sandbox Code Playgroud) 我有两个微服务 Student 和 Teacher
在学生微服务中,我创建了 MessageSink 用于交换 XYZ
@Input("XYZ")
SubscribableChannel xyz();
Run Code Online (Sandbox Code Playgroud)
在教师微服务中,我将交换 XYZ 配置为扇出
应用程序属性
spring.cloud.stream.rabbit.bindings.XYZ.producer.exchangeType=fanout
spring.cloud.stream.bindings.XYZ.contentType=application/json
Run Code Online (Sandbox Code Playgroud)
但我在这里面临的问题是学生服务在教师服务之前启动,并且它正在创建类型为 Topic 的 XYZ 交换。
为了解决这个问题,我在两个服务(即消费者和生产者)中都添加了 exchangeType。随着服务数量的增加,这些配置的数量也在增加。
我想将默认的 exchangeType 更改为扇出,因此以下是几个问题。
exchangeType为扇出而不是主题spring-cloud-stream?exchangeType通过rabbit-mq配置更改默认值?exchangeTypeastopic和 routing key #。但是这种方法的问题是stomp客户端为每个浏览器主机创建队列,队列名称为stomp-subscription-randomString空路由键。那么有没有办法在订阅流的同时提供路由密钥?我/exchange/exchangeName用作交换URL我正在寻找的拓扑是
到目前为止,我还没有看到在 Cloud Stream 中定义每条消息的主题的方法。我知道消费者将绑定到特定主题,但是生产者在将消息发送到交换器之前如何设置每条消息的主题?
source.output().send(MessageBuilder.withPayload(myMessage).build());
Run Code Online (Sandbox Code Playgroud)
不提供任何方法来设置交换的主题以路由到正确的消费者。
或者也许我不理解正确的东西?
更新
我希望不会在消费者中收到消息,因为bindingRoutingKey我2222正在发送routeTo 1111. 但我仍然在消费者身上收到它。
生产者属性:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.bindings.output.destination=messageExchange
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers['routeTo']
@EnableBinding(Source.class)
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
Run Code Online (Sandbox Code Playgroud)
发件人:
source.output().send(MessageBuilder.withPayload(mo).setHeader("routeTo", "1111").build());
以及消费者:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.cloud.stream.bindings.input.destination=messageExchange
spring.cloud.stream.rabbit.bindings.input.consumer.bindingRoutingKey=2222
Run Code Online (Sandbox Code Playgroud)
应用:
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@StreamListener(Sink.INPUT)
public void ReceiveMo(String moDTO) …Run Code Online (Sandbox Code Playgroud) 我正在使用主题上的日志压缩构建 Kafka 应用程序,但我无法发送墓碑值 (KafkaNull)
我曾尝试使用序列化程序的默认配置,当它不起作用时,我使用了“使用原始标头发布 null/tombstone 消息”中的建议更改将 application.properties 设置为:
spring.cloud.stream.output.producer.useNativeEncoding=true
spring.cloud.stream.kafka.binder.configuration.value.serializer=org.springframework.kafka.support.serializer.JsonSerializer
Run Code Online (Sandbox Code Playgroud)
我必须向流发送消息的代码是
this.stockTopics.compactedStocks().send(MessageBuilder
.withPayload(KafkaNull.INSTANCE)
.setHeader(KafkaHeaders.MESSAGE_KEY,company.getBytes())
.build())
Run Code Online (Sandbox Code Playgroud)
this.stopTopics.compactedStocks() 返回一个我可以向其发送消息的 messageStream。
每次我尝试使用 KafkaNull 实例作为有效负载发送该消息时,我都会收到错误消息 Failed to convert message: 'GenericMessage [payload=org.springframework.kafka.support.KafkaNull@1c2d8163, headers={id=f81857e7-fbd0-56f5-8418-6a1944e7f2b1, kafka_messageKey=[B@36ec022a, contentType=application/json, timestamp=1547827957485}]' to outbound message.
我希望消息简单地以空值发送给消费者,但显然它是错误的。
我想知道使用 spring.cloud.stream.default.consumer.concurrency 属性时“并发”的确切含义。
文档(https://docs.spring.io/spring-cloud-stream/docs/Chelsea.RELEASE/reference/html/_configuration_options.html)说“入站消费者的并发性”,这可以有多种解释。
幕后创建了什么样的线程执行器?
谢谢!
假设,我能够通过 Kafka 或 RabbitMQ 发送消息并使用 @StreamListener 使用它们。
绑定器配置为 content-type = 'application/json',所以我想可以在有效负载上添加条件。
我的要求是仅当字段的日期在 now() 之前时才获取元素。当此条件为真时,将消耗所有其他元素。
例如,我希望像这样:
@StreamListener(value = INPUT, condition = "data.startDate > now()")
public void onMessage(@Payload Data data) {
// ...
}
Run Code Online (Sandbox Code Playgroud)
第一个问题是我收到错误:EL1008E:在“byte[]”类型的对象上找不到属性或字段“startDate”——可能不是公共的或无效的?
我正在查看一个 Spring Boot 服务,它从 apache kafka 读取消息,通过 http 从另一个服务请求消息指示的记录,处理它们,将一些数据保存到数据库中,并将结果发布到另一个主题。
这是通过
@StreamListener(Some.INPUT)
@SendTo(Some.OUTPUT)
Run Code Online (Sandbox Code Playgroud)
这是在几个服务中完成的,通常工作得很好。唯一的属性集是
spring.cloud.stream.binder.consumer.concurrency=20
Run Code Online (Sandbox Code Playgroud)
主题本身有 20 个分区,应该适合。
在监控 kafka 的读取时,我们发现吞吐量非常低和奇怪的行为:
该应用程序一次最多读取 500 条消息,然后是 1-2 分钟的无内容。在此期间,消费者反复记录“缺少心跳,因为分区被重新平衡”,“重新分配分区”,有时甚至抛出异常说“提交失败,因为轮询间隔已过”
我们得出的结论是,这意味着消费者获取 500 条消息,需要很长时间来处理所有消息,错过了它的时间窗口,因此无法将 500 条消息中的任何一条提交给代理——代理重新分配分区并重新发送相同的消息再次。
在查看线程和文档后,我发现了“max.poll.records”属性,但在设置此属性的位置的建议中存在冲突。
有人说把它放在下面
spring.cloud.stream.bindings.consumer.<input>.configuration
Run Code Online (Sandbox Code Playgroud)
有人说
spring.cloud.stream.kafka.binders.consumer-properties
Run Code Online (Sandbox Code Playgroud)
我尝试将两者都设置为 1,但服务行为没有改变。
我如何正确处理这种情况,即消费者无法跟上默认设置所需的轮询间隔?
常见的yaml:
spring.cloud.stream.default.group=${spring.application.name}
Run Code Online (Sandbox Code Playgroud)
服务-yaml
spring:
clould:
stream:
default:
consumer.headerMode: embeddedHeaders
producer.headerMode: embeddedHeaders
bindings:
someOutput:
destination: outTopic
someInput:
destination: inTopic
consumer:
concurrency: 30
kafka:
bindings:
consumer:
someInput:
configuarion:
max.poll.records: 20 # ConsumerConfig ignores this
consumer:
enableDlq: true
configuarion:
max.poll.records: 30 # ConsumerConfig ignores this …Run Code Online (Sandbox Code Playgroud) 我试图使用 spring 云流发送带有键值对的消息。我无法为此找到任何 API。org.springframework.messaging.MessageChannel 仅将有效负载作为发送功能的一部分。尽管使用 Kafka 模板可以实现这一点。这是生成键值类型消息的唯一方法。由于 KafkaTemplate 是 apache kafka 的 spring 的一部分,我希望在 spring 云流中有一个可用的抽象。请建议。
谢谢,
从 3.1 版开始,不推荐使用用于处理队列的主要 API。在课堂评论中它说:
从 3.1 开始弃用,支持函数式编程模型
我在网上搜索了很多解决方案,但没有找到关于我应该如何迁移的可靠 E2E 解释。
寻找以下示例:
如果有几种方法可以做到这一点(正如我在网上看到的那样),我很乐意为每个选项提供解释和典型用例。