标签: spring-cloud-stream

如何测试使用 Avro 和 Confluence Schema Registry 的 Spring Cloud Stream Kafka Streams 应用程序?

我无法弄清楚如何测试使用 Avro 作为消息格式和(融合)架构注册表的 Spring Cloud Stream Kafka Streams 应用程序。

配置可能是这样的:

spring:
  application:
    name: shipping-service
  cloud:
    stream:
      schema-registry-client:
        endpoint: http://localhost:8081
      kafka:
        streams:
          binder:
            configuration:
              application:
                id: shipping-service
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
              schema:
                registry:
                  url: ${spring.cloud.stream.schema-registry-client.endpoint}
              value:
                subject:
                  name:
                    strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
          bindings:
            input:
              consumer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            order:
              consumer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
            output:
              producer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
      bindings:
        input:
          destination: customer
        order:
          destination: order
        output:
          destination: order

server:
  port: 8086

logging:
  level:
    org.springframework.kafka.config: debug
Run Code Online (Sandbox Code Playgroud)

笔记:

  • 它使用本机序列化/反序列化。
  • 测试框架:Junit 5

我想关于 Kafka Broker,我应该使用 EmbeddedKafkaBroker bean,但如您所见,它还依赖于应该以某种方式模拟的模式注册表。如何?

apache-kafka spring-cloud spring-cloud-stream apache-kafka-streams spring-kafka

1
推荐指数
1
解决办法
2098
查看次数

无法解码 Spring Cloud Data Flow 流中 key: file_name 的 json 类型

我使用 Spring Cloud Data Flow 设置一个读取 CSV 文件的流,使用自定义处理器对其进行转换并记录它:

stream create --name testsourcecsv --definition "file --mode=lines --directory=D:/toto/ --file.filename-pattern=adresses-28.csv --maxMessages=1000 | csvToMap --spring.cloud.stream.bindings.output.content-type=application/json | log --spring.cloud.stream.bindings.input.content-type=application/json" --deploy
Run Code Online (Sandbox Code Playgroud)

文件和 csvToMap 应用程序工作正常,但在日志应用程序中,对于每条记录,我都看到这种异常:

2019-12-03 11:32:46.500 ERROR 1328 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$5  : Could not decode json type: adresses-28.csv for key: file_name

com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'adresses': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"adresses-28.csv"; line: 1, column: 10]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804) ~[jackson-core-2.9.9.jar!/:2.9.9]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:703) ~[jackson-core-2.9.9.jar!/:2.9.9]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3532) ~[jackson-core-2.9.9.jar!/:2.9.9]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2627) ~[jackson-core-2.9.9.jar!/:2.9.9]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:832) ~[jackson-core-2.9.9.jar!/:2.9.9]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:729) ~[jackson-core-2.9.9.jar!/:2.9.9] …
Run Code Online (Sandbox Code Playgroud)

spring-cloud spring-cloud-stream spring-cloud-dataflow spring-cloud-stream-binder-kafka

1
推荐指数
1
解决办法
3095
查看次数

是否可以禁用 spring-cloud-stream 对特定方法的功能绑定?

我有一个基于 Spring Boot 的库(使用 spring-data-mongo)来创建一个PersistentEntitiesbean。PersistentEntities碰巧实现了该Supplier<T>接口,因此 Spring Cloud Stream 功能绑定器正在创建与其的绑定。更具体地说,BeanFactoryAwareFunctionRegistry.discoverDefaultDefinitionIfNecessary发现它是类型为 的 bean Supplier

我们使用 Spring Cloud Streams Kafka 绑定器,因此 Spring 尝试将每个对象发布到它创建的 Kafka 主题。这会导致 JSON 序列化程序中出现无限递归问题:

2019-12-04 15:36:54.323错误1 --- [调度-1] osihLoggingHandler:org.springframework.messaging.MessagingException:无法调用方法;嵌套异常是 org.springframework.messaging.converter.MessageConversionException:无法编写 JSON:无限递归(StackOverflowError)(通过引用链:org.springframework.data.mongodb.core.mapping.BasicMongoPersistentEntity["idProperty"] -> org. springframework.data.mongodb.core.mapping.CachingMongoPersistentProperty["owner"] -> org.springframework.data.mongodb.core.mapping.BasicMongoPersistentEntity["idProperty"] -> org.springframework.data.mongodb.core.mapping。 CachingMongoPersistentProperty["owner"] -> org.springframework.data.mongodb.core.mapping.BasicMongoPersistentEntity["idProperty"] -> org.springframework.data.mongodb.core.mapping.CachingMongoPercientProperty["owner"] -> org. springframework.data.mongodb.core.mapping.BasicMongoPersistentEntity["idProperty"] -> org.springframework.data.mongodb.core.mapping.CachingMongoPersistentProperty["owner"] -> org.springframework.data.mongodb.core.mapping。 BasicMongoPersistentEntity["idProperty"] -> org.springframework.data.mongodb.core.mapping.CachingMongoPersistentProperty["owner"] -> org.springframework.data.mongodb.core.mapping.BasicMongoPersistentEntity["idProperty"] -> org.springframework.data.mongodb.core.mapping springframework.data.mongodb.core.mapping.CachingMongoPersistentProperty[“所有者”] ...

有没有办法将我的 bean 从函数绑定中排除?使用这个库的项目没有使用 Spring Cloud Function,但我更愿意保留这种可能性。

作为参考,我的 bean 定义为:

@Bean
public PersistentEntities …
Run Code Online (Sandbox Code Playgroud)

java spring-cloud-stream spring-cloud-function

1
推荐指数
1
解决办法
5581
查看次数

禁用 Spring Cloud Stream Rabbit 进行测试

我用:

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        <version>3.0.1.RELEASE</version>
    </dependency>
Run Code Online (Sandbox Code Playgroud)

我需要禁用 Rabbit 来测试应用程序。我试过这个:

spring:
    cloud:
       config:
           enabled: false
           discovery:
               enabled: false
Run Code Online (Sandbox Code Playgroud)

它不起作用。

我需要做什么才能阻止 Rabbit 组件启动?

spring-rabbit spring-cloud-stream

1
推荐指数
1
解决办法
3229
查看次数

将 StateRestoreListener 与 Spring Cloud Kafka Streams 绑定器结合使用

我将结合使用 StateRestoreListener 和 Spring Cloud Kafka Streams 绑定器。我需要监视应用程序的容错状态存储的恢复进度。汇合https://docs.confluence.io/current/streams/monitoring.html#streams-monitoring-runtime-status中有示例。

为了观察所有状态存储的恢复,您需要为应用程序提供 org.apache.kafka.streams.processor.StateRestoreListener 接口的实例。您可以通过调用 KafkaStreams#setGlobalStateRestoreListener 方法来设置 org.apache.kafka.streams.processor.StateRestoreListener。

第一个问题是从应用程序获取 Kafka Streams。我用以下方法解决了这个问题

StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
Run Code Online (Sandbox Code Playgroud)

第二个问题是将 StateRestoreListener 设置为 KafkaStreams,因为我收到错误

java.lang.IllegalStateException:只能在 CREATED 状态下设置 GlobalStateRestoreListener。当前状态是:正在运行

是否可以在 Spring Cloud Kafka Streams 绑定器中使用 StateRestoreListener?谢谢

java spring apache-kafka spring-cloud-stream apache-kafka-streams

1
推荐指数
1
解决办法
1338
查看次数

实现死信队列 (DLQ) 时,会引发通道错误

通道错误;协议方法:#method(reply-code=406,reply-text=PRECONDITION_FAILED - 虚拟主机“ecp-audit”中队列“ecpauditchannel.ecpqueue”的不等价参数“x-dead-letter-exchange”:收到值“DLX”类型为“longstr”,但当前没有,class-id=50,method-id=10)

以下是属性文件中设置的配置。

spring.rabbitmq.virtual-host=ecp-audit
spring.cloud.stream.bindings.ecpinputchannel.destination=ecpchannel
spring.cloud.stream.bindings.error.destination=ecpError
spring.cloud.stream.rabbit.bindings.ecpauditinputchannel.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
Run Code Online (Sandbox Code Playgroud)

提前致谢

spring-rabbit spring-cloud-stream

1
推荐指数
1
解决办法
2745
查看次数

Spring Cloud Stream - 在运行时路由到多个动态目的地

我有一个用例,我需要生成在运行时确定的多个 Kafka 主题/目的地。我尝试通过使用从类型的函数 bean返回并为每个参数设置标头(如此处所述)来将函数与多个输入和输出参数组合起来。我想出了以下实现:Flux<Message<T>>Functionspring.cloud.stream.sendto.destinationMessage

@Bean
public Function<Person, Flux<Message<Person>>> route() {
    return person -> Flux.fromIterable(Stream.of(person.getEvents())
            .map(e -> MessageBuilder.withPayload(person)
                    .setHeader("spring.cloud.stream.sendto.destination", e).build())
            .collect(Collectors.toList()));
}
Run Code Online (Sandbox Code Playgroud)

我的配置中也有这个:

spring.cloud.stream.dynamic-destinations=
Run Code Online (Sandbox Code Playgroud)

这是我的Person

@AllArgsConstructor
@NoArgsConstructor
@Data
public class Person {
    private String[] events;
    private String name;
}
Run Code Online (Sandbox Code Playgroud)

events包含 Kafka 主题名称列表。

然而,这不起作用。我缺少什么?

spring spring-boot spring-cloud-stream

1
推荐指数
1
解决办法
2211
查看次数

Spring Cloud Stream与Kafka Streams Binder:如何为流处理器设置`trusted.packages`(这与消费者和生产者不同)

我有一个简单的流处理器(不是消费者/生产者),如下所示(Kotlin)

\n
@Bean\nfun processFoo():Function<KStream<FooName, FooAddress>, KStream<FooName, FooAddressPlus>> {\n    return Function { input-> input.map { key, value ->\n        println("\\nPAYLOAD KEY: ${key.name}\\n");\n        println("\\nPAYLOAD value: ${value.address}\\n");\n        val output = FooAddressPlus()\n        output.address = value.address\n        output.name = value.name\n        output.plus = "$value.name-$value.address"\n        KeyValue(key, output)\n    }}\n}\n
Run Code Online (Sandbox Code Playgroud)\n

这些类FooNameFooAddress和 与FooAddressPlus处理器位于同一包中。\nHere\xe2\x80\x99s 我的配置文件:

\n
spring.cloud.stream.kafka.binder:\n  brokers: localhost:9093\n\nspring.cloud.stream.function.definition: processFoo\n\nspring.cloud.stream.kafka.streams.binder.functions.processFoo.applicationId: foo-processor\nspring.cloud.stream.bindings.processFoo-in-0:\n  destination: foo.processor\nspring.cloud.stream.bindings.processFoo-out-0:\n  destination: foo.processor.out\n\nspring.cloud.stream.kafka.streams.binder:\n  deserializationExceptionHandler: logAndContinue\n  configuration:\n    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde\n    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde\n    commit.interval.ms: 1000\n
Run Code Online (Sandbox Code Playgroud)\n

运行处理器时出现此错误:

\n
The class '<here_comes_package>.FooAddress' is not in the …
Run Code Online (Sandbox Code Playgroud)

kotlin apache-kafka spring-cloud-stream

1
推荐指数
1
解决办法
1669
查看次数

当控制转到catch块时如何停止发送到kafka主题功能性kafka spring

您能否建议一下,当控件到达 catch 块时,如何停止发送到我的第三个 kafka 主题,当前消息被发送到错误主题以及正常处理时应发送到的主题。一段代码如下:

@Component
public class Abc {
private final StreamBridge streamBridge;
public Abc (StreamBridge streamBridge)
this.streamBridge = streamBridge;
@Bean
public Function<KStream<String, KafkaClass>, KStream<String,KafkaClass>> hiProcess() {
return input -> input.map((key,value) -> {
try{
KafkaClass stream = processFunction();
}
catch(Exception e) {
Message<KakfaClass> mess = MessageBuilder.withPayload(value).build();
streamBridge.send("errProcess-out-0". mess);
}
return new KeyValue<>(key, stream);
})
}
}

    
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-cloud spring-cloud-stream apache-kafka-streams spring-kafka

1
推荐指数
1
解决办法
1926
查看次数

Spring Cloud Kafka Streams 与 Spring Cloud Stream 之间的区别?

Spring Cloud Kafka Streams、Spring Cloud Stream、Spring Cloud Function、Spring AMQP 和 Spring for Apache Kafka 有什么区别?

apache-kafka spring-cloud-stream

1
推荐指数
1
解决办法
3728
查看次数

Spring Cloud Stream with Avro 无法正确转换字符串消息

我遇到一个问题,源发送 GenericMessage [payload=xxxxx, ...],而接收器接收消息为 10,120,120,120,120,120。

这个问题是在我设置 Avro 消息转换器后发生的。如果我删除 Avro 消息转换器并使用 StreamListener 来处理消息转换,它将正常工作。

源应用程序.properties

spring.cloud.stream.bindings.toGreeting.destination=greeting
spring.cloud.stream.bindings.toGreeting.contentType=application/*+avro
spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=true
Run Code Online (Sandbox Code Playgroud)

水槽应用

server.port=8990 
spring.cloud.stream.bindings.greeting.destination=greeting
Run Code Online (Sandbox Code Playgroud)

消息转换器

@Configuration
@EnableSchemaRegistryClient
public class MessageConverterConfig {
    @Bean
    public MessageConverter topic1MessageConverter() throws IOException {
        return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
    }
}
Run Code Online (Sandbox Code Playgroud)

应用类

@SpringBootApplication
@EnableSchemaRegistryClient
public class SourceApplication {
    public static void main(String[] args) {
        SpringApplication.run(SourceApplication.class, args);
    }
}

@EnableSchemaRegistryServer
@EnableSchemaRegistryClient
@SpringBootApplication
public class SinkApplication {
    public static void main(String[] args) {
        SpringApplication.run(SinkApplication.class, args);
    }
}
Run Code Online (Sandbox Code Playgroud)

我缺少配置吗?谢谢。

spring-cloud-stream

0
推荐指数
1
解决办法
2634
查看次数

Spring Cloud Stream + Kafka启用.auto.commit

我有一个项目,我已升级到最新版本的 Spring Boot 和 Spring Cloud,并注意到一些意外的行为。

spring-boot:2.5.2 spring-cloud 2020.0.3

另外值得注意的是,spring-kaka由于问题https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1079 ,我已降级

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.6.7</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)

我已打开日志记录:

<logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="info"/>
Run Code Online (Sandbox Code Playgroud)

我的 spring yaml 如下:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: latest
      **enable-auto-commit: true**
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: myapp.serde.MyCustomDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: myapp.serde.MyCustomSerializer
    properties:
      security:
        protocol: PLAINTEXT
  cloud:
    stream:
      default:
        producer:
          useNativeEncoding: true
        consumer:
          useNativeEncoding: true
      bindings:
        myInboundRoute:
          destination: some-destination.1
          group: a-custom-group
        myOutboundRoute:
          destination: some-destination.2
Run Code Online (Sandbox Code Playgroud)

当我启动应用程序时,我看到以下输出:

[2021-07-09 21:41:42,310] [main] INFO  org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = …
Run Code Online (Sandbox Code Playgroud)

spring-cloud-stream spring-kafka spring-cloud-stream-binder-kafka

0
推荐指数
1
解决办法
4558
查看次数

如何在Spring Cloud Stream SQS Binder中将多个消费者配置到单个目标队列

我的应用程序正在接收两个具有不同负载的事件。所以我为每个事件编写了两个不同的消费者。

消费者1:

@Bean
public Consumer<TestEvent1> testEvent1() {
  // my consumer logic
}
Run Code Online (Sandbox Code Playgroud)

消费者2:

@Bean
public Consumer<TestEvent2> testEvent2() {
  // my consumer logic
}
Run Code Online (Sandbox Code Playgroud)

以下是Spring Cloud Stream配置:

spring:
  cloud:
    stream:
      sqs:
        bindings:
          testEvent1-in-0:
            consumer:
              snsFanout: true
              messageDeletionPolicy: ON_SUCCESS
              waitTimeout: 20
              maxNumberOfMessages: 10
              visibilityTimeout: 30
          testEvent2-in-0:
            consumer:
              snsFanout: true
              messageDeletionPolicy: ON_SUCCESS
              waitTimeout: 20
              maxNumberOfMessages: 10
              visibilityTimeout: 30
      bindings:
        testEvent1-in-0:
          destination: events-queue
        testEvent2-in-0:
          destination: events-queue
      default-binder: sqs
Run Code Online (Sandbox Code Playgroud)

但通过上述配置,两个消费者都无法工作。如果我删除任何一个消费者,则另一个正在工作。

我如何能够配置云流,以便两个消费者都将根据生成的事件有效负载进行工作。

java spring spring-cloud-stream

0
推荐指数
1
解决办法
2457
查看次数