这是我的应用程序属性的一部分:
spring.cloud.stream.rabbit.bindings.studentInput.consumer.exchange-type=direct spring.cloud.stream.rabbit.bindings.studentInput.consumer.delayed-exchange=true
但似乎在 RabbitMQ 管理页面中,它在我的队列功能的 Args 中没有x-delayed-type: direct 。我引用了这个Spring Cloud Stream文档:https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/
我究竟做错了什么?先谢谢了
我有一个 Spring Cloud Stream Kafka 消费者服务,其中确认是手动完成的。提供固定的消费群体。
spring.cloud.stream.bindings.input.group=sampleconsumergroup
resetoffsets 和 startOffset 属性设置如下 spring.cloud.stream.kafka.bindings.input.consumer.resetOffsets=true spring.cloud.stream.kafka.bindings.input.consumer.startOffset=latest
消费者服务监听主题模式。
场景:消费者服务关闭,在此期间有一些消息发送到其主题。当消费者服务恢复时,这些消息不会被消费。只有重新上线后发送的消息才会被消耗。
这符合预期吗?我正处于卡夫卡学习阶段,非常感谢您的解释。
我有一个简单的 Spring 集成流程,它绑定到处理器以进行输入和输出。
我已经配置了 Kafka 绑定器来映射输入和输出主题。效果很好。
假设我想将 3 个 Kafka 主题绑定到输入,从而导致 3 个已配置的消费者从三个单独的 Kafka 主题中提取数据,然后由我的 SI 流进行处理。
是否可以将多个 Kafka 主题映射到我的处理器的输入?如果是这样,该配置会是什么样子?
spring-boot spring-cloud-stream spring-cloud-stream-binder-kafka
我需要一些帮助来了解如何使用 Spring boot、Kafka、Resilence4J 提出解决方案,以实现来自 Kafka Consumer 的微服务调用。假设如果微服务关闭,那么我需要使用断路器模式通知我的 Kafka 消费者停止获取消息/事件,直到微服务启动并运行。
Apache Pulsar 没有适用于 Spring Cloud Stream 的官方库。我找到了Spring for Apache Pulsar,但它仅与 Spring Boot 有关。在它的 GitHub 存储库中,有一个 [spring-pulsar-spring-cloud-stream-binder],但在 mvnrepository 中找不到。那么我该如何使用它呢?
有没有在 Spring Cloud Stream 中集成 Apache Pulsar 的好方法?
我试图用来spring.cloud.stream.kafka.binder.headers传输基于上一个问题设置的自定义标头。
我已经在文档中阅读了...
spring.cloud.stream.kafka.binder.headers
The list of custom headers that will be transported by the binder.
Default: empty.
Run Code Online (Sandbox Code Playgroud)
似乎建议您设置一个列表(用逗号分隔?)会导致自定义标头在中传输Message<>,但一旦kafka写操作完成,标头就会丢失。
我的注释创建了标头,作为对MessagingGateway的调用的一部分:
@MessagingGateway(name = "redemptionGateway", defaultRequestChannel = Channels.GATEWAY_OUTPUT, defaultHeaders = @GatewayHeader(name = "orderId", expression = "#gatewayMethod.name"))
public interface RedemptionGateway {
...
}
Run Code Online (Sandbox Code Playgroud)
我观察到头是在第一次preSend调试中正确创建的:
2016-08-15 15:09:04 http-nio-8080-exec-2 DEBUG DirectChannel:430 - preSend on channel 'gatewayOutput', message: GenericMessage [payload=x.TrivialRedemption@2d052d2a[orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f], headers={orderId=create, id=5dccea6f-266e-82b9-54c6-57ec441a26ac, timestamp=1471288144882}] - {applicationSystemCode=x, clientIP=0:0:0:0:0:0:0:1, clusterId=Cluster-Id-NA, containerId=Container-Id-NA, correlationId=UNDEFINED, domainName=defaultDomain, hostName=Host-NA, messageId=10.113.21.144-eb8404d0-de93-4f94-80cb-e5b638e8aeef, userId=anonymous, webAnalyticsCorrelationId=|}
Run Code Online (Sandbox Code Playgroud)
但是在下一次preSend发送时,标头丢失了:
2016-08-15 15:09:05 kafka-binder- DEBUG …Run Code Online (Sandbox Code Playgroud) 我正在测试spring-cloud-starter-stream-kafka。出现以下错误。
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:69)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:63)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:43)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$AutoAcknowledgingChannelForwardingMessageListener.doOnMessage(KafkaMessageDrivenChannelAdapter.java:171)
at org.springframework.integration.kafka.listener.AbstractDecodingMessageListener.onMessage(AbstractDecodingMessageListener.java:50)
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4$1.doWithRetry(KafkaMessageChannelBinder.java:607)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:263)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:154)
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.onMessage(KafkaMessageChannelBinder.java:604)
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:221)
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:209)
at reactor.core.processor.util.RingBufferSubscriberUtils.route(RingBufferSubscriberUtils.java:67)
at reactor.core.processor.RingBufferProcessor$BatchSignalProcessor.run(RingBufferProcessor.java:789)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at …Run Code Online (Sandbox Code Playgroud) 使用spring cloud stream,遇到以下问题:
bean类[org.springframework.boot.web.support.ErrorPageFilter]的注释指定bean名称'errorPageFilter'与同名和类的现有非兼容bean定义冲突[org.springframework.boot.context.web.ErrorPageFilter ]
我的春云蒸汽依赖性是由判断
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-dependencies</artifactId>
<version>Brooklyn.SR1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Run Code Online (Sandbox Code Playgroud) 是的,我已经阅读了所有找到的文档,并尝试了所有其他配置方法,但是仅此简单的示例(该行应该记录一行)不起作用
(这是一个带有spring-cloud-stream-binder-kafka-streams的Spring-Boot-2应用程序)
Kafka正在存储一个字符串值(空键)
我的aplilication.yaml
spring:
cloud:
stream:
bindings:
input:
destination: 'myStreamTopic'
output:
producer.keySerde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
kafka:
streams:
binder:
configuration:
default.key.serde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
default.value.serde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
brokers:
- 'ommited:9092'
- 'ommited:9092'
- 'ommited:9092'
application-id: hack1
Run Code Online (Sandbox Code Playgroud)
只需将以下简单代码作为POC:
@SpringBootApplication
@Slf4j
public class HackatonApplication {
public static void main(String[] args) {
SpringApplication.run(HackatonApplication.class, args);
}
@EnableBinding(KafkaStreamsProcessor.class)
public static class LineProcessor {
@StreamListener(Sink.INPUT)
public void process(KStream<?, String> line) {
log.info("Received: {}", line);
}
}
Run Code Online (Sandbox Code Playgroud)
我无法使其运行!
org.springframework.context.ApplicationContextException:无法启动bean'outputBindingLifecycle';嵌套异常是java.lang.IllegalArgumentException:尝试调用公共抽象org.apache.kafka.streams.kstream.KStream org.apache.kafka.streams.kstream.KStream.map(org.apache.kafka.streams.kstream.KeyValueMapper ),但尚未设置任何代表。在org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:184)〜[spring-context-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]在org.springframework.context.support处。 DefaultLifecycleProcessor.access $ 200(DefaultLifecycleProcessor.java:52)〜[spring-context-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]在
抱歉,如果这很琐碎,但我已经花了几个小时进行搜索,谷歌搜索并试图找到有据可查的解决方案。
是否可以使用spring.cloud.function具有多个独立函数/绑定的函数式( ) 样式的反应式 SCS 应用程序?我发现的所有示例总是只注册一个具有默认绑定的功能性 bean input, output。我想注册多个,每个都有自己的绑定。
传统上可以使用spring-cloud-stream-reactive它来完成,但现在不赞成使用功能支持。
spring-boot ×4
apache-kafka ×2
java ×2
rabbitmq ×1
reactive ×1
resilience4j ×1
spring ×1