标签: spring-cloud-stream

如何在spring cloud stream和kafka中发送和接收相同的主题

我有一个spring-cloud-streamkafka绑定的应用程序.我想从同一个可执行文件(jar)中发送和接收来自同一主题的消息.我有我的频道定义,如下所示: - public interface ChannelDefinition { @Input("forum") public SubscriableChannel readMessage(); @Output("forum") public MessageChannel postMessage(); }

@StreamListener用来接收消息.我得到各种意想不到的错误.有时,我收到

  1. 没有为每个其他消息找到unknown.message.channel的调度程序
  2. 如果我将命令行kafka订阅者附加到上述论坛主题,它将收到所有其他消息.
  3. 我的应用程序接收所有其他消息,这是来自命令行订户的独占消息集.我确保我的应用程序在特定的组名下订阅.

是否有上述用例的工作示例?

spring spring-integration spring-boot spring-cloud-stream spring-cloud-dataflow

6
推荐指数
1
解决办法
2358
查看次数

Spring Cloud Stream Kafka消费者模式

对于具有多个分区的主题-

1)一个SpringBoot实例是否使用多个线程来处理(用StreamListener注释的方法)每个分区中的每个消息?

2)是否可以为每个分区配置多个线程,还是我必须将其从侦听器线程手动移交给工作池?

spring-cloud-stream

6
推荐指数
1
解决办法
982
查看次数

如何使用Spring Cloud Stream Kafka和每服务数据库实现微服务事件驱动架构

我正在尝试实现事件驱动的体系结构来处理分布式事务.每个服务都有自己的数据库,并使用Kafka发送消息以通知其他微服务有关操作.

一个例子:

 Order service -------> | Kafka |------->Payment Service
       |                                       |
Orders MariaDB DB                   Payment MariaDB Database
Run Code Online (Sandbox Code Playgroud)

订单收到订单请求.它必须将新订单存储在其数据库中并发布消息,以便付款服务意识到它必须为该项目收费:

私人订单业务订单业务;

@PostMapping
public Order createOrder(@RequestBody Order order){
    logger.debug("createOrder()");
    //a.- Save the order in the DB
    orderBusiness.createOrder(order);
    //b. Publish in the topic so that Payment Service charges for the item.
    try{
        orderSource.output().send(MessageBuilder.withPayload(order).build());
    }catch(Exception e){
        logger.error("{}", e);
    }
    return order;
}
Run Code Online (Sandbox Code Playgroud)

这些是我的疑惑:

  1. 步骤a.-(保存在订单DB中)和b.-(发布消息)应该在事务中以原子方式执行.我怎样才能做到这一点?
  2. 这与前一个相关:我发送消息:orderSource.output().send(MessageBuilder.withPayload(order).build()); 无论Kafka代理是否已关闭,此操作都是异步的,并且总是返回true.我如何知道该消息已经到达Kafka经纪人?

apache-kafka microservices spring-cloud spring-cloud-stream spring-kafka

6
推荐指数
1
解决办法
2838
查看次数

如何出于开发目的禁用Spring Cloud Stream绑定?

我需要禁用事件的发布和订阅才能进行开发,但是我无法为此找到一些配置属性/其他解决方案。我怎样才能做到这一点?

可能的解决方案:使用@EnableBinding某些属性集创建自动配置,并在禁用的情况下,用生成的无操作存根替换所有绑定接口。但是也许存在更简单的解决方案?

spring-boot spring-cloud-stream

6
推荐指数
1
解决办法
2753
查看次数

如何在默认情况下从 Kafka Spring Cloud Stream 消费并同时消费由 Confluent API 生成的 Kafka 消息?

我正在构建一个微服务组件,它将默认使用由其他 (SCS) 组件生成的 Spring Cloud Stream (SCS) Kafka 消息。

但是我还需要从使用融合 API 的其他组件中使用 Kafka 消息。

我有一个示例存储库,显示了我正在尝试做什么。

https://github.com/donalthurley/KafkaConsumeScsAndConfluent

这是下面带有 SCS 输入绑定和融合输入绑定的应用程序配置。

spring:
  application:
    name: kafka
  kafka:
    consumer:
      properties.schema.registry.url: http://192.168.99.100:8081
  cloud:
    stream:
      kafka:
        binder:
          brokers: PLAINTEXT://192.168.99.100:9092
#          configuration:
#            specific:
#              avro:
#                reader: true
#            key:
#              deserializer: org.apache.kafka.common.serialization.StringDeserializer
#            value:
#              deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer

      bindings:
        inputConfluent:
          contentType: application/*+avro
          destination: confluent-destination
          group: input-confluent-group
        inputScs:
          contentType: application/*+avro
          destination: scs-destination
          group: input-scs-group
Run Code Online (Sandbox Code Playgroud)

通过上述配置,我使用 SCS 默认配置创建了两个使用者。例如,类 org.apache.kafka.common.serialization.ByteArrayDeserializer 是两个输入绑定的值反序列化器。

如果我删除上述配置中的注释,我会从我的 Confluent 客户端发送配置的两个消费者。例如,类 io.confluent.kafka.serializers.KafkaAvroDeserializer 是两个输入绑定的值反序列化器。

我理解因为配置在 Kafka binder …

java spring apache-kafka spring-cloud-stream

5
推荐指数
1
解决办法
1386
查看次数

spring-cloud-dataflow:流部署在 OSX 上失败

我无法http | log按照入门流指南部署和执行基本流 :

wget https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/2.1.0.RELEASE/spring-cloud-dataflow-server/docker-compose.yml;
export DATAFLOW_VERSION=2.1.0.RELEASE;
export SKIPPER_VERSION=2.0.2.RELEASE;
docker-compose up;
open http://localhost:9393/dashboard/#/streams/create;
echo "Then, create a stream via text input: `http | log`"
echo "Then, deploy the stream. The deployment fails with exit code 137."
Run Code Online (Sandbox Code Playgroud)

我得到绿色的“成功部署的流定义”。状态显示为“正在部署”,但从未完全部署。ERROR日志、UI 或网络请求中没有消息。

这是docker-compose up我部署流后的控制台输出。

skipper              | 2019-05-24 22:31:53.363  INFO 1 --- [io-7577-exec-10] o.s.s.support.LifecycleObjectSupport     : started UPGRADE UPGRADE_DEPLOY_TARGET_APPS_SUCCEED UPGRADE_DEPLOY_TARGET_APPS UPGRADE_START UPGRADE_DELETE_SOURCE_APPS UPGRADE_CHECK_TARGET_APPS UPGRADE_WAIT_TARGET_APPS UPGRADE_CANCEL UPGRADE_DEPLOY_TARGET_APPS_FAILED UPGRADE_CHECK_CHOICE UPGRADE_EXIT INSTALL INSTALL_INSTALL INSTALL_EXIT ERROR DELETE DELETE_DELETE DELETE_EXIT ROLLBACK ROLLBACK_START …
Run Code Online (Sandbox Code Playgroud)

spring spring-cloud-stream spring-cloud-dataflow

5
推荐指数
1
解决办法
416
查看次数

发件箱模式 - 我们如何防止消息中继过程生成重复的消息?

实现发件箱模式的通常方法是将消息有效负载存储在发件箱表中,并有一个单独的进程(消息中继)查询待处理的消息,并将它们发布到消息代理中,在我的例子中是 Kafka。

发件箱表的状态可能如下所示。

 OUTBOX TABLE
 ---------------------------------
|ID | STATE     | TOPIC | PAYLOAD |
 ---------------------------------
| 1 | PROCESSED | user            |
| 2 | PENDING   | user            |
| 3 | PENDING   | billing         |
----------------------------------
Run Code Online (Sandbox Code Playgroud)

My Message Relay 是一个 Spring Boot/Cloud Stream 应用程序,它定期 ( @Scheduled) 查找 PENDING 记录,将它们发布到 Kafka 并将记录更新为 PROCESSED 状态。

第一个问题是:如果我启动 Message Relay 的多个实例,所有实例都会查询 Outbox 表,并且可能在某些时候不同的实例将获得相同的 PENDING 注册表以发布到 Kafka,从而生成重复的消息。我怎样才能防止这种情况?

另一种情况:假设只有一个消息中继。它获取一个 PENDING 记录,将其发布到主题,但在将记录更新为 PROCESSED 之前崩溃。当它再次启动时,它会找到相同的 PENDING 记录并再次发布它。有没有办法避免这种重复,或者唯一的方法是设计一个幂等系统。

apache-kafka spring-cloud spring-cloud-stream

5
推荐指数
1
解决办法
1340
查看次数

使用 objectMapper 将 JSON 日期格式反序列化为 ZonedDateTime

背景

  1. 我有以下 JSON(来自 Kafka 的消息)
{
      "markdownPercentage": 20,
      "currency": "SEK",
      "startDate": "2019-07-25"
}
Run Code Online (Sandbox Code Playgroud)
  1. 我有以下(生成 JSON 模式)POJO(我无法更改 POJO,因为它是公司的共享资源)
{
      "markdownPercentage": 20,
      "currency": "SEK",
      "startDate": "2019-07-25"
}
Run Code Online (Sandbox Code Playgroud)
  1. 我们的应用程序是一个 Spring Boot 应用程序,它使用 Spring Cloud Stream 从 Kafka 读取 JSON 消息 (1) 并使用 POJO (2),然后对其进行处理。

问题

当应用程序尝试将消息反序列化到对象时,它会引发以下异常

public class Markdown {
    @JsonProperty("markdownPercentage")
    @NotNull
    private Integer markdownPercentage = 0;
    @JsonProperty("currency")
    @NotNull
    private String currency = "";
    @JsonFormat(
        shape = Shape.STRING,
        pattern = "yyyy-MM-dd"
    )
    @JsonProperty("startDate")
    @NotNull
    private ZonedDateTime startDate;

    // Constructors, Getters, Setters etc.

} …
Run Code Online (Sandbox Code Playgroud)

java json jackson spring-boot spring-cloud-stream

5
推荐指数
1
解决办法
1万
查看次数

如何在 Spring Cloud Stream 项目中将传入的标头映射为 String 而不是 byte[]?

我有一个使用 Spring Integration DSL 流和 Kafka 绑定器的简单 Spring Cloud Stream 项目。一切正常,但来自 Kafka 的消息头值以byte[].

这意味着我的 SI@Header参数需要是 类型byte[]。哪个有效,但最好将它们作为字符串(我关心的所有入站标头都是字符串值)。

我已经将 Kafka 客户端配置为使用 StringSerializer/StringDeserializer。我假设我还需要以某种方式告诉 Spring Kafka 哪些标头映射为字符串以及使用什么字符编码。

我显然在这里遗漏了一些东西。有小费吗?

spring spring-integration spring-cloud-stream spring-kafka spring-integration-dsl

5
推荐指数
1
解决办法
1851
查看次数

Spring Cloud @StreamListener 条件已弃用,替代方案是什么

我们有多个应用程序消费者监听同一个 kafka 主题,并且生产者在向主题发送消息时设置消息标头,以便特定实例可以评估标头并处理消息。例如

@StreamListener(target=ITestSink.CHANNEL_NAME,condition="headers['franchiseName'] == 'sydney'")
public void fullfillOrder(@Payload TestObj message) {
    log.info("sydney order request received message is {}",message.getName());
}
Run Code Online (Sandbox Code Playgroud)

在 Spring Cloud Stream 3.0.0 中,@StreamListener 已被弃用,我无法在 Function 中找到条件属性的等效项。

有什么建议吗?

java spring-boot spring-cloud-stream apache-kafka-streams spring-cloud-stream-binder-kafka

5
推荐指数
1
解决办法
5483
查看次数