我正在Spring Cloud Stream 3.1.2使用KafkaStreams. 编程模型是:
后一种使用注释,就像 Spring 提供的所有其他注释一样。但是,有提到的是
从 3.1.0 版本的 Binder 开始,我们建议对基于 Kafka Streams Binder 的应用程序使用上述函数式编程模型。从 Spring Cloud Stream 3.1.0 开始,不再支持 StreamListener。
因为我认为旧模型更具可读性(至少对我来说)。谁能解释为什么决定弃用它并支持函数式编程,并且它会被删除吗?
我创建了一个 spring-boot-2 gradle 项目,也在build.gradle文件中添加了 Kafka 相关依赖项,如下所示。
dependencies {
implementation 'org.springframework.cloud:spring-cloud-starter-zipkin'
compile 'org.springframework.cloud:spring-cloud-starter-bus-kafka'
}
Run Code Online (Sandbox Code Playgroud)
现在我想从文件中禁用所有与 Kafka 相关的自动配置,application.yaml
因为我已尝试在 yaml 文件中给出以下代码。
spring:
autoconfigure:
exclude:
- org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
Run Code Online (Sandbox Code Playgroud)
实现上述操作后,Kafka 仍然自动配置并开始将 Kafka 与应用程序集成。
我也尝试过下面的代码,但这对我来说也不起作用。
@SpringBootApplication
@EnableAutoConfiguration(exclude = KafkaAutoConfiguration.class)
public class ApiGatewayApplication {
public static void main(String[] args) {
SpringApplication.run(ApiGatewayApplication.class, args);
}
}
Run Code Online (Sandbox Code Playgroud)
现在请任何人帮助我,我如何从 yaml/properties 文件禁用与 kafka 相关的所有自动配置?
谢谢,
java apache-kafka spring-boot spring-cloud-stream spring-cloud-stream-binder-kafka
我正在使用 Spring Cloud Stream Kafka 绑定器来消费来自 Kafka 的消息。我能够使我的示例与单个 Kafka Binder 一起工作,如下所示
spring:
cloud:
stream:
kafka:
binder:
consumer-properties: {enable.auto.commit: true}
auto-create-topics: false
brokers: <broker url>
bindings:
consumer:
destination: some-topic
group: testconsumergroup
consumer:
concurrency: 1
valueSerde: JsonSerde
producer:
destination: some-other-topic
producer:
valueSerde: JsonSerde
Run Code Online (Sandbox Code Playgroud)
请注意,此处两个绑定都指向同一个 Kafka Broker。但是,我遇到了一种情况,我需要发布到某个 Kafka 集群中的一个主题,并且还需要从不同 Kafka 集群中的另一个主题进行消费。我应该如何更改配置才能绑定到不同的 Kafka 集群?
我尝试过这样的事情
spring:
cloud:
stream:
binders:
defaultbinder:
type: kafka
environment:
spring.cloud.stream.kafka.streams.binder.brokers: <cluster1-brokers>
kafka1:
type: kafka
environment:
spring.cloud.stream.kafka.streams.binder.brokers: <cluster2-brokers>
bindings:
consumer:
binder: kafka1
destination: some-topic
group: testconsumergroup
consumer:
concurrency: 1
valueSerde: JsonSerde …Run Code Online (Sandbox Code Playgroud) spring-boot spring-cloud spring-cloud-stream spring-kafka spring-cloud-stream-binder-kafka
我认为它与以下链接有关,但我不明白。
可以为 kafka 流内部主题(如 *-changelog 主题)提供“retention.ms”、“cleanup.policy”等主题配置,以删除无用日志。
但是,当涉及到 *-repartition 主题等内部主题时,无法提供主题配置值,即使重新分区主题的默认“retention.ms”是“-1”(这意味着无限保留)。如何删除或管理重新分区主题?否则,重新分区主题的大小将太大,并且可能会出现磁盘故障问题。
如何管理重新分区主题?什么是清除数据?在文档中找不到任何相关解释。
apache-kafka apache-kafka-streams spring-cloud-stream-binder-kafka
我的 Scs 应用程序有两个具有此配置的 Kafka 生产者:
spring:
cloud:
function:
definition: myProducer1;myProducer2
stream:
bindings:
myproducer1-out-0:
destination: topic1
producer:
useNativeEncoding: true
myproducer2-out-0:
destination: topic2
producer:
useNativeEncoding: true
kafka:
binder:
brokers: ${kafka.brokers:localhost}
min-partition-count: 3
replication-factor: 3
producerProperties:
enable:
idempotence: false
retries: 10000
acks: all
key:
serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
value:
serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
schema:
registry:
url: ${schema-registry.url:http://localhost:8081}
Run Code Online (Sandbox Code Playgroud)
它在大约 10 秒后开始:
o.s.c.s.m.DirectWithAttributesChannel : Channel 'my-app-1.myproducer2-out-0' has 1 subscriber(s).
o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8084
e.p.i.m.MyAppApplicationKt : Started MyAppApplicationKt in …Run Code Online (Sandbox Code Playgroud) apache-kafka spring-cloud-stream apache-kafka-streams spring-cloud-stream-binder-kafka
I have a spring boot application which has two functionalities Http requests and kafka Messages handling. I want this application to run in mode which is enabled from application.yml i.e if the user wants to enable it only for http requests then kafka should not be connected.
I could achieve this using normal spring boot kafka plugin by disabling auto configure using the following property at @KafkaListener,
autoStartup="${module.put:false}"
现在我们正在尝试转向云流,我发现通过删除云流和活页夹库来禁用它的唯一方法。有没有更好的方法使用自动配置模式的属性来禁用它,或者是否有任何手动配置选项可用?
dynamic-loading apache-kafka spring-cloud-stream spring-cloud-stream-binder-kafka
我有一个使用 Kafka binder 的 Spring Cloud Stream 项目,我正在尝试理解并最终自定义 Cloud Stream 使用的 RetryTemplate。
我没有找到很多关于它是如何工作的文档,但是我所阅读的内容使我得出以下假设:
@StreamListener都会触发 Spring Retry这些假设正确吗?
现在,在我的应用程序中,我有一个模式,其中一些消息可以立即处理,但其他消息必须推迟到以后再次尝试(使用指数退避等)。
我应该抛出异常导致 Spring Cloud Stream 在绑定层重试这些消息,还是自己实现重试并跟踪我自己的重试上下文?
如果我应该依赖 Cloud Stream 的重试设置,我应该如何自定义退避策略等?
spring-retry spring-cloud-stream spring-cloud-stream-binder-kafka
我们有多个应用程序消费者监听同一个 kafka 主题,并且生产者在向主题发送消息时设置消息标头,以便特定实例可以评估标头并处理消息。例如
@StreamListener(target=ITestSink.CHANNEL_NAME,condition="headers['franchiseName'] == 'sydney'")
public void fullfillOrder(@Payload TestObj message) {
log.info("sydney order request received message is {}",message.getName());
}
Run Code Online (Sandbox Code Playgroud)
在 Spring Cloud Stream 3.0.0 中,@StreamListener 已被弃用,我无法在 Function 中找到条件属性的等效项。
有什么建议吗?
java spring-boot spring-cloud-stream apache-kafka-streams spring-cloud-stream-binder-kafka
我正在使用 Spring Boot 2.7.0 和 Spring Cloud Microservices 堆栈,我尝试通过 kafka 发送通知并收到以下错误 -
错误 -
2022-06-12 13:18:51.114 INFO [order-service,472b506d2515980c,472b506d2515980c] 21889 --- [onPool-worker-3] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2] Instantiated an idempotent producer.
2022-06-12 13:18:51.179 INFO [order-service,472b506d2515980c,472b506d2515980c] 21889 --- [onPool-worker-3] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.1.1
2022-06-12 13:18:51.180 INFO [order-service,472b506d2515980c,472b506d2515980c] 21889 --- [onPool-worker-3] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 97671528ba54a138
2022-06-12 13:18:51.181 INFO [order-service,472b506d2515980c,472b506d2515980c] 21889 --- [onPool-worker-3] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1655020131179
2022-06-12 13:18:51.221 INFO [order-service,,] 21889 --- [ad | producer-2] org.apache.kafka.clients.Metadata : [Producer clientId=producer-2] Cluster ID: …Run Code Online (Sandbox Code Playgroud) java apache-kafka spring-kafka spring-cloud-stream-binder-kafka
我有一个基于 Spring 云流的 Kafka Streams 应用程序,我将全局 KTable 绑定到紧凑主题。当我将墓碑记录推送到主题(具有空值的非空键)时,我的 Kafka 流应用程序因反序列化异常而失败。失败是因为我的反序列化器不处理空记录。
从文档中,我认为 GlobalKTable 甚至不会“看到”空值记录。难道不是这样吗?我需要在反序列化器中处理空记录吗?
org.apache.kafka.common.errors.SerializationException: Unable to deserialize
Caused by: java.lang.IllegalArgumentException: argument "src" is null
at com.fasterxml.jackson.databind.ObjectMapper._assertNotNull(ObjectMapper.java:4693)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3511)
at common.domain.serdes.MySerde$MyDeserializer.deserialize(MySerde.java:47)
at common.domain.serdes.MySerde$MyDeserializer.deserialize(MySerde.java:39)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.update(GlobalStateUpdateTask.java:91)
at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:240)
at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:289)Run Code Online (Sandbox Code Playgroud)
java spring-cloud-stream apache-kafka-streams spring-cloud-stream-binder-kafka
spring-cloud-stream-binder-kafka ×10
apache-kafka ×5
java ×4
spring-boot ×3
spring-kafka ×2
spring ×1
spring-cloud ×1
spring-retry ×1