标签: spring-cloud-stream

如何使用 Spring Cloud Stream 在 RabbitMQ 的队列中添加功能“x-delayed-type: direct”?

这是我的应用程序属性的一部分:

spring.cloud.stream.rabbit.bindings.studentInput.consumer.exchange-type=direct spring.cloud.stream.rabbit.bindings.studentInput.consumer.delayed-exchange=true

但似乎在 RabbitMQ 管理页面中,它在我的队列功能的 Args 中没有x-delayed-type: direct 。我引用了这个Spring Cloud Stream文档:https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/

我究竟做错了什么?先谢谢了

java spring rabbitmq spring-cloud-stream

3
推荐指数
1
解决办法
8575
查看次数

spring-cloud-stream Kafka消费者不会消费服务关闭时发送的消息吗?

我有一个 Spring Cloud Stream Kafka 消费者服务,其中确认是手动完成的。提供固定的消费群体。

spring.cloud.stream.bindings.input.group=sampleconsumergroup

resetoffsets 和 startOffset 属性设置如下 spring.cloud.stream.kafka.bindings.input.consumer.resetOffsets=true spring.cloud.stream.kafka.bindings.input.consumer.startOffset=latest

消费者服务监听主题模式。

场景:消费者服务关闭,在此期间有一些消息发送到其主题。当消费者服务恢复时,这些消息不会被消费。只有重新上线后发送的消息才会被消耗。

这符合预期吗?我正处于卡夫卡学习阶段,非常感谢您的解释。

java apache-kafka spring-boot spring-cloud-stream

3
推荐指数
1
解决办法
3846
查看次数

是否可以在 Spring Cloud Stream 中配置多个绑定到同一个处理器?

我有一个简单的 Spring 集成流程,它绑定到处理器以进行输入和输出。

我已经配置了 Kafka 绑定器来映射输入和输出主题。效果很好。

假设我想将 3 个 Kafka 主题绑定到输入,从而导致 3 个已配置的消费者从三个单独的 Kafka 主题中提取数据,然后由我的 SI 流进行处理。

是否可以将多个 Kafka 主题映射到我的处理器的输入?如果是这样,该配置会是什么样子?

spring-boot spring-cloud-stream spring-cloud-stream-binder-kafka

3
推荐指数
1
解决办法
2189
查看次数

带有断路器的 Kafka Consumer,使用 Resilience4j 重试模式

我需要一些帮助来了解如何使用 Spring boot、Kafka、Resilence4J 提出解决方案,以实现来自 Kafka Consumer 的微服务调用。假设如果微服务关闭,那么我需要使用断路器模式通知我的 Kafka 消费者停止获取消息/事件,直到微服务启动并运行。

apache-kafka spring-boot spring-cloud-stream resilience4j

3
推荐指数
1
解决办法
4999
查看次数

如何将 Apache Pulsar 与 Spring Cloud Stream 结合使用?

Apache Pulsar 没有适用于 Spring Cloud Stream 的官方库。我找到了Spring for Apache Pulsar,但它仅与 Spring Boot 有关。在它的 GitHub 存储库中,有一个 [spring-pulsar-spring-cloud-stream-binder],但在 mvnrepository 中找不到。那么我该如何使用它呢?

有没有在 Spring Cloud Stream 中集成 Apache Pulsar 的好方法?

spring-cloud-stream apache-pulsar spring-pulsar

3
推荐指数
1
解决办法
325
查看次数

spring.cloud.stream.kafka.binder.headers无法正常工作

我试图用来spring.cloud.stream.kafka.binder.headers传输基于上一个问题设置的自定义标头。

我已经在文档中阅读了...

spring.cloud.stream.kafka.binder.headers
The list of custom headers that will be transported by the binder.

Default: empty.
Run Code Online (Sandbox Code Playgroud)

似乎建议您设置一个列表(用逗号分隔?)会导致自定义标头在中传输Message<>,但一旦kafka写操作完成,标头就会丢失。

我的注释创建了标头,作为对MessagingGateway的调用的一部分:

@MessagingGateway(name = "redemptionGateway", defaultRequestChannel = Channels.GATEWAY_OUTPUT, defaultHeaders = @GatewayHeader(name = "orderId", expression = "#gatewayMethod.name"))
public interface RedemptionGateway {
    ...
}
Run Code Online (Sandbox Code Playgroud)

我观察到头是在第一次preSend调试中正确创建的:

2016-08-15 15:09:04 http-nio-8080-exec-2 DEBUG DirectChannel:430 - preSend on channel 'gatewayOutput', message: GenericMessage [payload=x.TrivialRedemption@2d052d2a[orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f], headers={orderId=create, id=5dccea6f-266e-82b9-54c6-57ec441a26ac, timestamp=1471288144882}] - {applicationSystemCode=x, clientIP=0:0:0:0:0:0:0:1, clusterId=Cluster-Id-NA, containerId=Container-Id-NA, correlationId=UNDEFINED, domainName=defaultDomain, hostName=Host-NA, messageId=10.113.21.144-eb8404d0-de93-4f94-80cb-e5b638e8aeef, userId=anonymous, webAnalyticsCorrelationId=|}
Run Code Online (Sandbox Code Playgroud)

但是在下一次preSend发送时,标头丢失了:

2016-08-15 15:09:05 kafka-binder- DEBUG …
Run Code Online (Sandbox Code Playgroud)

spring-cloud-stream

2
推荐指数
1
解决办法
1770
查看次数

春季云流应用,Dispatcher没有频道“ unknown.channel.name”的订阅者

我正在测试spring-cloud-starter-stream-kafka。出现以下错误。

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:69)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:63)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:43)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$AutoAcknowledgingChannelForwardingMessageListener.doOnMessage(KafkaMessageDrivenChannelAdapter.java:171)
at org.springframework.integration.kafka.listener.AbstractDecodingMessageListener.onMessage(AbstractDecodingMessageListener.java:50)
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4$1.doWithRetry(KafkaMessageChannelBinder.java:607)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:263)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:154)
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.onMessage(KafkaMessageChannelBinder.java:604)
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:221)
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:209)
at reactor.core.processor.util.RingBufferSubscriberUtils.route(RingBufferSubscriberUtils.java:67)
at reactor.core.processor.RingBufferProcessor$BatchSignalProcessor.run(RingBufferProcessor.java:789)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at …
Run Code Online (Sandbox Code Playgroud)

spring-cloud-stream

2
推荐指数
1
解决办法
4040
查看次数

bean类的注释指定bean名称'errorPageFilter'

使用spring cloud stream,遇到以下问题:

bean类[org.springframework.boot.web.support.ErrorPageFilter]的注释指定bean名称'errorPageFilter'与同名和类的现有非兼容bean定义冲突[org.springframework.boot.context.web.ErrorPageFilter ]

我的春云蒸汽依赖性是由判断

 <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-dependencies</artifactId>
    <version>Brooklyn.SR1</version>
    <type>pom</type>
    <scope>import</scope>
 </dependency>
Run Code Online (Sandbox Code Playgroud)

spring-cloud-stream

2
推荐指数
1
解决办法
2089
查看次数

无法仅运行此简单流,...需要SERDE配置吗?

是的,我已经阅读了所有找到的文档,并尝试了所有其他配置方法,但是仅此简单的示例(该行应该记录一行)不起作用

(这是一个带有spring-cloud-stream-binder-kafka-streams的Spring-Boot-2应用程序)

Kafka正在存储一个字符串值(空键)

我的aplilication.yaml

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: 'myStreamTopic'
    output:
          producer.keySerde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
      kafka:
        streams:
          binder:
            configuration:
              default.key.serde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
              default.value.serde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
            brokers:
              - 'ommited:9092'
              - 'ommited:9092'
              - 'ommited:9092'
            application-id: hack1
Run Code Online (Sandbox Code Playgroud)

只需将以下简单代码作为POC:

@SpringBootApplication
@Slf4j
public class HackatonApplication {

    public static void main(String[] args) {
SpringApplication.run(HackatonApplication.class, args);
}

  @EnableBinding(KafkaStreamsProcessor.class)
  public static class LineProcessor {

    @StreamListener(Sink.INPUT)
    public void process(KStream<?, String> line) {
      log.info("Received: {}", line);
    }

  }
Run Code Online (Sandbox Code Playgroud)

我无法使其运行!

org.springframework.context.ApplicationContextException:无法启动bean'outputBindingLifecycle';嵌套异常是java.lang.IllegalArgumentException:尝试调用公共抽象org.apache.kafka.streams.kstream.KStream org.apache.kafka.streams.kstream.KStream.map(org.apache.kafka.streams.kstream.KeyValueMapper ),但尚未设置任何代表。在org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:184)〜[spring-context-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]在org.springframework.context.support处。 DefaultLifecycleProcessor.access $ 200(DefaultLifecycleProcessor.java:52)〜[spring-context-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]在

抱歉,如果这很琐碎,但我已经花了几个小时进行搜索,谷歌搜索并试图找到有据可查的解决方案。

spring-boot spring-cloud-stream apache-kafka-streams

2
推荐指数
1
解决办法
1260
查看次数

Spring Cloud Stream 多个函数定义

是否可以使用spring.cloud.function具有多个独立函数/绑定的函数式( ) 样式的反应式 SCS 应用程序?我发现的所有示例总是只注册一个具有默认绑定的功能性 bean input, output。我想注册多个,每个都有自己的绑定。

传统上可以使用spring-cloud-stream-reactive它来完成,但现在不赞成使用功能支持。

spring-cloud-stream reactive spring-cloud-function

2
推荐指数
1
解决办法
3771
查看次数