标签: spring-cloud-stream

如何拦截Spring Cloud Stream消息?

Spring允许拦截许多产品的消息,比如RestTemplate和SpringMVC.是否可以拦截Spring Cloud Stream消息?对于传入和传出消息.

java spring-cloud spring-cloud-stream

3
推荐指数
2
解决办法
2766
查看次数

如何为生产者设置路由键

我有以下基于 Spring Cloud Stream 的应用程序的测试场景。我的应用程序有一个主题,有两个队列。第一个队列的 BindingKey 被命名为“城市”,第二个队列的绑定密钥是“人”。

请问如何为Spring Cloud Stream Rabbit生产者设置路由键???区分消息将在哪里消费?

这是我的绑定配置:

spring.config.name=streaming

spring.cloud.stream.bindings.citiesChannel.destination=streamInput
spring.cloud.stream.bindings.citiesChannel.group=cities
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.durableSubscription=true
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.bindingRoutingKey=cities

spring.cloud.stream.bindings.personsChannel.destination=streamInput
spring.cloud.stream.bindings.personsChannel.group=persons
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.durableSubscription=true
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.bindingRoutingKey=persons

spring.cloud.stream.bindings.producingChannel.destination=streamInput
Run Code Online (Sandbox Code Playgroud)

发布到生产频道时,如何区分消息将发送到哪里(城市或人员队列)的唯一方法是通过“spring.cloud.stream.bindings.productionChannel.producer.requiredGroups”属性,但这是非常不可用的。因为我不想知道我的消息将要到达的队列的任何信息......这是 AMPQ 反模式。

我不想要更简单的东西,然后在发布到生产频道时通过RabbitTemplate.setRoutingKey(String routingKey)方法拥有类似的功能...:-(

spring-cloud spring-cloud-stream

3
推荐指数
1
解决办法
7130
查看次数

spring cloud Stream启动流kafka消费者SSL配置

Spring Cloud Stream启动器kafka在连接消费者时不会加载配置。以下是我在调试模式下运行时在控制台中看到的配置:

security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
Run Code Online (Sandbox Code Playgroud)

我有 bootstrap yml 文件的以下配置部分

    spring:
     cloud:
      stream:
       bindings:
         <binding configuration>
       kafka:
        binder:
         autoCreateTopics: false
         brokers: <list of kafka brokers>
         defaultBrokerPort: <default …
Run Code Online (Sandbox Code Playgroud)

spring-cloud-stream

3
推荐指数
1
解决办法
7113
查看次数

逻辑删除消息是否无法从KTable状态存储中删除记录?

我正在创建从KStream处理数据的KTable。但是,当我使用键和有效负载为空的逻辑删除消息时,它并没有从KTable中删除消息。

样本-

public KStream<String, GenericRecord> processRecord(@Input(Channel.TEST) KStream<GenericRecord, GenericRecord> testStream,
KTable<String, GenericRecord> table = testStream
                .map((genericRecord, genericRecord2) -> KeyValue.pair(genericRecord.get("field1") + "", genericRecord2))
                .groupByKey()
                reduce((genericRecord, v1) -> v1, Materialized.as("test-store"));


GenericRecord genericRecord = new GenericData.Record(getAvroSchema(keySchema));
genericRecord.put("field1", Long.parseLong(test.getField1()));
ProducerRecord record = new ProducerRecord(Channel.TEST, genericRecord, null);
kafkaTemplate.send(record);
Run Code Online (Sandbox Code Playgroud)

触发带有空值的消息后,我可以使用有效负载为空的testStream映射函数进行调试,但是它不会删除KTable更改日志“测试存储”上的记录。看起来它甚至没有达到reduce方法,不确定我在这里缺少什么。

感谢任何帮助!

谢谢。

spring-cloud-stream apache-kafka-streams spring-kafka

3
推荐指数
1
解决办法
1510
查看次数

杰克逊无法反序列化(春云流卡夫卡)

我试图从kafka读取一条json消息并得到一个例外,其中说Jackson不能将json反序列化为POJO.

json就像{"code":"500","count":22,"from":1528343820000,"to":1528343880000}是kafka流的输出.

POJO声明了json的所有属性,并且与生成json消息的POJO完全相同.所以我不知道为什么会这样.

我正在使用spring cloud stream 2.0.0.RELEASE.

任何帮助,将不胜感激.谢谢.

POJO:

public class CodeCount {
    private String code;
    private long count;
    private Date from;
    private Date to;

    @Override
    public String toString() {
        return "CodeCount [code=" + code + ", count=" + count + ", from=" + from + ", to=" + to + "]";
    }

    public CodeCount(String code, long count, Date from, Date to) {
        super();
        this.code = code;
        this.count = count;
        this.from = from;
        this.to = to;
    }

    public …
Run Code Online (Sandbox Code Playgroud)

jackson spring-cloud-stream

3
推荐指数
1
解决办法
3653
查看次数

在 Spring Cloud Stream Kafka 中正确管理 DLQ

我想使用 kafka 在 Spring Cloud Stream 中管理 DLQ。

应用程序.yaml

server:
    port: 8091
eureka:
    client:
        serviceUrl:
            defaultZone: http://IP:8761/eureka
spring:
    application:
        name: employee-consumer
    cloud:
        stream:
            kafka:
                binder:
                    brokers: IP:9092
                bindings:
                    greetings-in:
                        destination: greetings
                        contentType: application/json
                    greetings-out:
                        destination: greetings
                        contentType: application/json
            bindings:
                greetings-out:
                    consumer:
                        enableDlq: true
                        dlqName: dead-out
    kafka:
      consumer:
        group-id: A
Run Code Online (Sandbox Code Playgroud)

正如您在我的配置中看到的,我启用了 dlq 并为 dlq 主题设置了一个名称。

为了测试 DLQ 行为,我对某些消息抛出异常

我的监听器组件

@StreamListener("greetings-out")
    public void handleGreetingsInput(@Payload Greetings greetings) throws Exception {
        logger.info("Greetings input -> {}", greetings);
        if (greetings.getMessage().equals("ciao")) {
            throw new Exception("eer");
        }
    } …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-boot spring-cloud-stream

3
推荐指数
1
解决办法
7208
查看次数

如何以非常通用的方式被动地将有效负载发送到 Kafka 主题?

我们的代码是完全反应式的,使用 Webflux。我们想将消息发送到 Kafka 队列,并希望以被动方式进行。

我看到有一个 spring-cloud-stream-reactive 依赖项,它支持 Reactive 功能,但对于它的数据流的特定用例。

Spring Cloud Stream 还支持使用反应式 API,其中传入和传出数据作为连续数据流处理。

我知道这里有一种方法可以做到:https : //projectreactor.io/docs/kafka/release/reference/

但它看起来与 kafka 特定的库紧密耦合。我喜欢这里的松耦合。

有没有办法以我的代码不耦合到 kafka 库的方式来做到这一点?

reactive-programming project-reactor spring-cloud-stream spring-webflux

3
推荐指数
1
解决办法
173
查看次数

AWS Kafka (MSK) - 如何生成密钥库和信任库并在我的 Spring Cloud Stream 应用程序中使用它们?

有没有关于如何在 Spring Cloud Stream 应用程序中使用 AWS MSK 详细信息的信息?

我相信我们需要生成一个密钥库和信任库,然后将它们合并到我们的应用程序中?我浏览了 AWS MSK 的“客户端身份验证”页面,发现这非常令人困惑。

任何人都可以帮我解决这个问题吗?我只是想部署这个使用 AWS MSK(3 个代理)的应用程序。

谢谢你。

java amazon-web-services spring-cloud-stream spring-kafka aws-msk

3
推荐指数
1
解决办法
1631
查看次数

@EnableBinding @deprecated 自 3.1 起支持函数式编程模型

我看到 Spring Cloud Stream 的以下注释已折旧

@Input @Output @EnableBinding @StreamListener

请提供示例和文档链接,了解如何以功能方式进行操作。

spring spring-cloud-stream

3
推荐指数
2
解决办法
3476
查看次数

Spring Cloud Stream Kafka Streams Binder KafkaException:无法启动流:'listener'不能为空

我是 Kafka Streams 和 Spring Cloud Stream 的新手,但在将集成相关代码移动到属性文件方面阅读了一些关于它的好东西,因此开发人员可以主要关注事物的业务逻辑方面。

在这里,我有我的简单应用程序类。

package com.some.events.consumer

import com.some.events.SomeEvent
import org.apache.kafka.streams.kstream.KStream
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import java.util.function.Consumer

@SpringBootApplication
class ConsumerApplication {
    @Bean
    fun consume(): Consumer<KStream<String, SomeEvent>> {
        return Consumer { input -> input.foreach { key, value -> println("Key: $key, value: $value") } }
    }
}

fun main(args: Array<String>) {
    runApplication<ConsumerApplication>(*args)
}

Run Code Online (Sandbox Code Playgroud)

我的application.yml文件如下。

spring:
  cloud:
    function:
      definition: consume
    stream:
      bindings:
        consume-in-0:
          destination: "some-event"
          group: "some-event"
Run Code Online (Sandbox Code Playgroud)

我的依赖项build.gradle.kts定义如下(这里只包含相关的)。

extra["springCloudVersion"] = "2020.0.2"

dependencies { …
Run Code Online (Sandbox Code Playgroud)

spring apache-kafka spring-boot spring-cloud-stream apache-kafka-streams

3
推荐指数
1
解决办法
1209
查看次数