Spring允许拦截许多产品的消息,比如RestTemplate和SpringMVC.是否可以拦截Spring Cloud Stream消息?对于传入和传出消息.
我有以下基于 Spring Cloud Stream 的应用程序的测试场景。我的应用程序有一个主题,有两个队列。第一个队列的 BindingKey 被命名为“城市”,第二个队列的绑定密钥是“人”。
请问如何为Spring Cloud Stream Rabbit生产者设置路由键???区分消息将在哪里消费?
这是我的绑定配置:
spring.config.name=streaming
spring.cloud.stream.bindings.citiesChannel.destination=streamInput
spring.cloud.stream.bindings.citiesChannel.group=cities
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.durableSubscription=true
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.bindingRoutingKey=cities
spring.cloud.stream.bindings.personsChannel.destination=streamInput
spring.cloud.stream.bindings.personsChannel.group=persons
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.durableSubscription=true
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.bindingRoutingKey=persons
spring.cloud.stream.bindings.producingChannel.destination=streamInput
Run Code Online (Sandbox Code Playgroud)
发布到生产频道时,如何区分消息将发送到哪里(城市或人员队列)的唯一方法是通过“spring.cloud.stream.bindings.productionChannel.producer.requiredGroups”属性,但这是非常不可用的。因为我不想知道我的消息将要到达的队列的任何信息......这是 AMPQ 反模式。
我不想要更简单的东西,然后在发布到生产频道时通过RabbitTemplate.setRoutingKey(String routingKey)方法拥有类似的功能...:-(
Spring Cloud Stream启动器kafka在连接消费者时不会加载配置。以下是我在调试模式下运行时在控制台中看到的配置:
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
Run Code Online (Sandbox Code Playgroud)
我有 bootstrap yml 文件的以下配置部分
spring:
cloud:
stream:
bindings:
<binding configuration>
kafka:
binder:
autoCreateTopics: false
brokers: <list of kafka brokers>
defaultBrokerPort: <default …Run Code Online (Sandbox Code Playgroud) 我正在创建从KStream处理数据的KTable。但是,当我使用键和有效负载为空的逻辑删除消息时,它并没有从KTable中删除消息。
样本-
public KStream<String, GenericRecord> processRecord(@Input(Channel.TEST) KStream<GenericRecord, GenericRecord> testStream,
KTable<String, GenericRecord> table = testStream
.map((genericRecord, genericRecord2) -> KeyValue.pair(genericRecord.get("field1") + "", genericRecord2))
.groupByKey()
reduce((genericRecord, v1) -> v1, Materialized.as("test-store"));
GenericRecord genericRecord = new GenericData.Record(getAvroSchema(keySchema));
genericRecord.put("field1", Long.parseLong(test.getField1()));
ProducerRecord record = new ProducerRecord(Channel.TEST, genericRecord, null);
kafkaTemplate.send(record);
Run Code Online (Sandbox Code Playgroud)
触发带有空值的消息后,我可以使用有效负载为空的testStream映射函数进行调试,但是它不会删除KTable更改日志“测试存储”上的记录。看起来它甚至没有达到reduce方法,不确定我在这里缺少什么。
感谢任何帮助!
谢谢。
我试图从kafka读取一条json消息并得到一个例外,其中说Jackson不能将json反序列化为POJO.
json就像{"code":"500","count":22,"from":1528343820000,"to":1528343880000}是kafka流的输出.
POJO声明了json的所有属性,并且与生成json消息的POJO完全相同.所以我不知道为什么会这样.
我正在使用spring cloud stream 2.0.0.RELEASE.
任何帮助,将不胜感激.谢谢.
POJO:
public class CodeCount {
private String code;
private long count;
private Date from;
private Date to;
@Override
public String toString() {
return "CodeCount [code=" + code + ", count=" + count + ", from=" + from + ", to=" + to + "]";
}
public CodeCount(String code, long count, Date from, Date to) {
super();
this.code = code;
this.count = count;
this.from = from;
this.to = to;
}
public …Run Code Online (Sandbox Code Playgroud) 我想使用 kafka 在 Spring Cloud Stream 中管理 DLQ。
应用程序.yaml
server:
port: 8091
eureka:
client:
serviceUrl:
defaultZone: http://IP:8761/eureka
spring:
application:
name: employee-consumer
cloud:
stream:
kafka:
binder:
brokers: IP:9092
bindings:
greetings-in:
destination: greetings
contentType: application/json
greetings-out:
destination: greetings
contentType: application/json
bindings:
greetings-out:
consumer:
enableDlq: true
dlqName: dead-out
kafka:
consumer:
group-id: A
Run Code Online (Sandbox Code Playgroud)
正如您在我的配置中看到的,我启用了 dlq 并为 dlq 主题设置了一个名称。
为了测试 DLQ 行为,我对某些消息抛出异常
我的监听器组件
@StreamListener("greetings-out")
public void handleGreetingsInput(@Payload Greetings greetings) throws Exception {
logger.info("Greetings input -> {}", greetings);
if (greetings.getMessage().equals("ciao")) {
throw new Exception("eer");
}
} …Run Code Online (Sandbox Code Playgroud) 我们的代码是完全反应式的,使用 Webflux。我们想将消息发送到 Kafka 队列,并希望以被动方式进行。
我看到有一个 spring-cloud-stream-reactive 依赖项,它支持 Reactive 功能,但对于它的数据流的特定用例。
Spring Cloud Stream 还支持使用反应式 API,其中传入和传出数据作为连续数据流处理。
我知道这里有一种方法可以做到:https : //projectreactor.io/docs/kafka/release/reference/
但它看起来与 kafka 特定的库紧密耦合。我喜欢这里的松耦合。
有没有办法以我的代码不耦合到 kafka 库的方式来做到这一点?
reactive-programming project-reactor spring-cloud-stream spring-webflux
有没有关于如何在 Spring Cloud Stream 应用程序中使用 AWS MSK 详细信息的信息?
我相信我们需要生成一个密钥库和信任库,然后将它们合并到我们的应用程序中?我浏览了 AWS MSK 的“客户端身份验证”页面,发现这非常令人困惑。
任何人都可以帮我解决这个问题吗?我只是想部署这个使用 AWS MSK(3 个代理)的应用程序。
谢谢你。
java amazon-web-services spring-cloud-stream spring-kafka aws-msk
我看到 Spring Cloud Stream 的以下注释已折旧
@Input
@Output
@EnableBinding
@StreamListener
请提供示例和文档链接,了解如何以功能方式进行操作。
我是 Kafka Streams 和 Spring Cloud Stream 的新手,但在将集成相关代码移动到属性文件方面阅读了一些关于它的好东西,因此开发人员可以主要关注事物的业务逻辑方面。
在这里,我有我的简单应用程序类。
package com.some.events.consumer
import com.some.events.SomeEvent
import org.apache.kafka.streams.kstream.KStream
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import java.util.function.Consumer
@SpringBootApplication
class ConsumerApplication {
@Bean
fun consume(): Consumer<KStream<String, SomeEvent>> {
return Consumer { input -> input.foreach { key, value -> println("Key: $key, value: $value") } }
}
}
fun main(args: Array<String>) {
runApplication<ConsumerApplication>(*args)
}
Run Code Online (Sandbox Code Playgroud)
我的application.yml文件如下。
spring:
cloud:
function:
definition: consume
stream:
bindings:
consume-in-0:
destination: "some-event"
group: "some-event"
Run Code Online (Sandbox Code Playgroud)
我的依赖项build.gradle.kts定义如下(这里只包含相关的)。
extra["springCloudVersion"] = "2020.0.2"
dependencies { …Run Code Online (Sandbox Code Playgroud) spring apache-kafka spring-boot spring-cloud-stream apache-kafka-streams
apache-kafka ×2
java ×2
spring ×2
spring-boot ×2
spring-cloud ×2
spring-kafka ×2
aws-msk ×1
jackson ×1