标签: spring-cloud-stream-binder-kafka

为什么 StreamListener 被弃用

我正在Spring Cloud Stream 3.1.2使用KafkaStreams. 编程模型是:

  • 函数式编程
  • 命令式编程

后一种使用注释,就像 Spring 提供的所有其他注释一样。但是,有提到的是

从 3.1.0 版本的 Binder 开始,我们建议对基于 Kafka Streams Binder 的应用程序使用上述函数式编程模型。从 Spring Cloud Stream 3.1.0 开始,不再支持 StreamListener。

因为我认为旧模型更具可读性(至少对我来说)。谁能解释为什么决定弃用它并支持函数式编程,并且它会被删除吗?

spring spring-cloud-stream spring-cloud-stream-binder-kafka

7
推荐指数
1
解决办法
1万
查看次数

如何在 spring-boot-2 中的 yaml/properties 文件中禁用所有与 Kafka 相关的自动配置而不删除依赖项?

我创建了一个 spring-boot-2 gradle 项目,也在build.gradle文件中添加了 Kafka 相关依赖项,如下所示。

dependencies {
    implementation 'org.springframework.cloud:spring-cloud-starter-zipkin'
    compile 'org.springframework.cloud:spring-cloud-starter-bus-kafka'
}
Run Code Online (Sandbox Code Playgroud)

现在我想从文件中禁用所有与 Kafka 相关的自动配置,application.yaml 因为我已尝试在 yaml 文件中给出以下代码。

spring:
  autoconfigure:
      exclude:
        - org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
Run Code Online (Sandbox Code Playgroud)

实现上述操作后,Kafka 仍然自动配置并开始将 Kafka 与应用程序集成。

我也尝试过下面的代码,但这对我来说也不起作用。

@SpringBootApplication
@EnableAutoConfiguration(exclude = KafkaAutoConfiguration.class)
public class ApiGatewayApplication {

    public static void main(String[] args) {
        SpringApplication.run(ApiGatewayApplication.class, args);
    }

}
Run Code Online (Sandbox Code Playgroud)

现在请任何人帮助我,我如何从 yaml/properties 文件禁用与 kafka 相关的所有自动配置?

谢谢,

java apache-kafka spring-boot spring-cloud-stream spring-cloud-stream-binder-kafka

6
推荐指数
1
解决办法
1万
查看次数

Spring Cloud Stream Kafka 多重绑定

我正在使用 Spring Cloud Stream Kafka 绑定器来消费来自 Kafka 的消息。我能够使我的示例与单个 Kafka Binder 一起工作,如下所示

spring:
  cloud:
    stream:
      kafka:
        binder:
          consumer-properties: {enable.auto.commit: true}
          auto-create-topics: false
          brokers: <broker url>
      bindings:
        consumer:
          destination: some-topic
          group: testconsumergroup
          consumer:
            concurrency: 1
            valueSerde: JsonSerde
        producer:
          destination: some-other-topic
          producer:
            valueSerde: JsonSerde
Run Code Online (Sandbox Code Playgroud)

请注意,此处两个绑定都指向同一个 Kafka Broker。但是,我遇到了一种情况,我需要发布到某个 Kafka 集群中的一个主题,并且还需要从不同 Kafka 集群中的另一个主题进行消费。我应该如何更改配置才能绑定到不同的 Kafka 集群?

我尝试过这样的事情

spring:
  cloud:
    stream:
      binders:
        defaultbinder:
          type: kafka
          environment:
            spring.cloud.stream.kafka.streams.binder.brokers: <cluster1-brokers>
        kafka1:
          type: kafka
          environment:
            spring.cloud.stream.kafka.streams.binder.brokers: <cluster2-brokers>
      bindings:
        consumer:
          binder: kafka1
          destination: some-topic
          group: testconsumergroup
          consumer:
            concurrency: 1
            valueSerde: JsonSerde …
Run Code Online (Sandbox Code Playgroud)

spring-boot spring-cloud spring-cloud-stream spring-kafka spring-cloud-stream-binder-kafka

6
推荐指数
0
解决办法
9266
查看次数

为什么Kaka Streams重新分区主题的retention.ms默认设置为-1?这不是无限保留repartition topic中的消息吗?

我认为它与以下链接有关,但我不明白。

可以为 kafka 流内部主题(如 *-changelog 主题)提供“retention.ms”、“cleanup.policy”等主题配置,以删除无用日志。

但是,当涉及到 *-repartition 主题等内部主题时,无法提供主题配置值,即使重新分区主题的默认“retention.ms”是“-1”(这意味着无限保留)。如何删除或管理重新分区主题?否则,重新分区主题的大小将太大,并且可能会出现磁盘故障问题。

如何管理重新分区主题?什么是清除数据?在文档中找不到任何相关解释。

apache-kafka apache-kafka-streams spring-cloud-stream-binder-kafka

6
推荐指数
1
解决办法
1578
查看次数

使用 enable.idempotence true 时,Spring Cloud Stream Kafka 应用程序的启动速度极慢

我的 Scs 应用程序有两个具有此配置的 Kafka 生产者:

spring:
  cloud:
    function:
      definition: myProducer1;myProducer2
    stream:
      bindings:
        myproducer1-out-0:
          destination: topic1
          producer:
            useNativeEncoding: true
        myproducer2-out-0:
          destination: topic2
          producer:
            useNativeEncoding: true
      kafka:
        binder:
          brokers: ${kafka.brokers:localhost}
          min-partition-count: 3
          replication-factor: 3
          producerProperties:
            enable:
              idempotence: false
            retries: 10000
            acks: all
            key:
              serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
              subject:
                name:
                  strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
            value:
              serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
              subject:
                name:
                  strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
            schema:
              registry:
                url: ${schema-registry.url:http://localhost:8081}
Run Code Online (Sandbox Code Playgroud)

它在大约 10 秒后开始:

 o.s.c.s.m.DirectWithAttributesChannel    : Channel 'my-app-1.myproducer2-out-0' has 1 subscriber(s).
 o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 8084
 e.p.i.m.MyAppApplicationKt     : Started MyAppApplicationKt in …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-cloud-stream apache-kafka-streams spring-cloud-stream-binder-kafka

5
推荐指数
1
解决办法
804
查看次数

Configuration which allows to disable cloud stream?

I have a spring boot application which has two functionalities Http requests and kafka Messages handling. I want this application to run in mode which is enabled from application.yml i.e if the user wants to enable it only for http requests then kafka should not be connected.

I could achieve this using normal spring boot kafka plugin by disabling auto configure using the following property at @KafkaListener,

autoStartup="${module.put:false}"

现在我们正在尝试转向云流,我发现通过删除云流和活页夹库来禁用它的唯一方法。有没有更好的方法使用自动配置模式的属性来禁用它,或者是否有任何手动配置选项可用?

dynamic-loading apache-kafka spring-cloud-stream spring-cloud-stream-binder-kafka

5
推荐指数
2
解决办法
4190
查看次数

理解 Spring Cloud Stream Kafka 和 Spring Retry

我有一个使用 Kafka binder 的 Spring Cloud Stream 项目,我正在尝试理解并最终自定义 Cloud Stream 使用的 RetryTemplate。

我没有找到很多关于它是如何工作的文档,但是我所阅读的内容使我得出以下假设:

  • Cloud Stream 默认配置并启用 Spring Retry,包括默认的重试和退避策略。
  • 默认情况下, a 中任何未捕获的异常@StreamListener都会触发 Spring Retry
  • Cloud Stream 会以某种方式跟踪每条消息的 RetryContext 信息(如何?我不确定)

这些假设正确吗?

现在,在我的应用程序中,我有一个模式,其中一些消息可以立即处理,但其他消息必须推迟到以后再次尝试(使用指数退避等)。

我应该抛出异常导致 Spring Cloud Stream 在绑定层重试这些消息,还是自己实现重试并跟踪我自己的重试上下文?

如果我应该依赖 Cloud Stream 的重试设置,我应该如何自定义退避策略等?

spring-retry spring-cloud-stream spring-cloud-stream-binder-kafka

5
推荐指数
1
解决办法
2679
查看次数

Spring Cloud @StreamListener 条件已弃用,替代方案是什么

我们有多个应用程序消费者监听同一个 kafka 主题,并且生产者在向主题发送消息时设置消息标头,以便特定实例可以评估标头并处理消息。例如

@StreamListener(target=ITestSink.CHANNEL_NAME,condition="headers['franchiseName'] == 'sydney'")
public void fullfillOrder(@Payload TestObj message) {
    log.info("sydney order request received message is {}",message.getName());
}
Run Code Online (Sandbox Code Playgroud)

在 Spring Cloud Stream 3.0.0 中,@StreamListener 已被弃用,我无法在 Function 中找到条件属性的等效项。

有什么建议吗?

java spring-boot spring-cloud-stream apache-kafka-streams spring-cloud-stream-binder-kafka

5
推荐指数
1
解决办法
5483
查看次数

java.lang.ClassCastException:类java.lang.String无法转换为类[B(java.lang.String和[B位于加载程序'bootstrap的java.base模块中)

我正在使用 Spring Boot 2.7.0 和 Spring Cloud Microservices 堆栈,我尝试通过 kafka 发送通知并收到以下错误 -

错误 -

2022-06-12 13:18:51.114  INFO [order-service,472b506d2515980c,472b506d2515980c] 21889 --- [onPool-worker-3] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-2] Instantiated an idempotent producer.
2022-06-12 13:18:51.179  INFO [order-service,472b506d2515980c,472b506d2515980c] 21889 --- [onPool-worker-3] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.1.1
2022-06-12 13:18:51.180  INFO [order-service,472b506d2515980c,472b506d2515980c] 21889 --- [onPool-worker-3] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 97671528ba54a138
2022-06-12 13:18:51.181  INFO [order-service,472b506d2515980c,472b506d2515980c] 21889 --- [onPool-worker-3] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1655020131179
2022-06-12 13:18:51.221  INFO [order-service,,] 21889 --- [ad | producer-2] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-2] Cluster ID: …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-kafka spring-cloud-stream-binder-kafka

5
推荐指数
1
解决办法
3万
查看次数

Kafka 流 GlobalKTable 在 Tombstone - null 值 - 记录上抛出反序列化异常

我有一个基于 Spring 云流的 Kafka Streams 应用程序,我将全局 KTable 绑定到紧凑主题。当我将墓碑记录推送到主题(具有空值的非空键)时,我的 Kafka 流应用程序因反序列化异常而失败。失败是因为我的反序列化器不处理空记录。

从文档中,我认为 GlobalKTable 甚至不会“看到”空值记录。难道不是这样吗?我需要在反序列化器中处理空记录吗?

org.apache.kafka.common.errors.SerializationException: Unable to deserialize
Caused by: java.lang.IllegalArgumentException: argument "src" is null
    at com.fasterxml.jackson.databind.ObjectMapper._assertNotNull(ObjectMapper.java:4693)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3511)
    at common.domain.serdes.MySerde$MyDeserializer.deserialize(MySerde.java:47)
    at common.domain.serdes.MySerde$MyDeserializer.deserialize(MySerde.java:39)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    at org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.update(GlobalStateUpdateTask.java:91)
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:240)
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:289)
Run Code Online (Sandbox Code Playgroud)

java spring-cloud-stream apache-kafka-streams spring-cloud-stream-binder-kafka

4
推荐指数
1
解决办法
852
查看次数