标签: spring-cloud-stream

Kafka 与 Rabbit MQ 的优缺点

Kafka 和 RabbitMQ 是众所周知的消息代理。我想用 Spring Boot 构建一个微服务,似乎 Spring Cloud 为他们提供了开箱即用的解决方案作为事实上的选择。我知道一些 RabbitMQ 的托盘,它有很多支持。Kafka 属于 Apache,所以应该不错。那么RabbitMQ和Kafka之间的主要目标区别是什么?考虑到这将与 Spring Cloud 一起使用。请分享您的经验和标准。提前致谢。

rabbitmq messagebroker spring-rabbit apache-kafka spring-cloud-stream

1
推荐指数
1
解决办法
5832
查看次数

对于使用通用json消息的Spring云流kafka绑定器是否有一个很好的示例

我是Spring cloud Stream和Kafka的新手,我正在寻找一个从kafka主题中消费json消息的好例子.

谢谢

java spring apache-kafka spring-cloud-stream

1
推荐指数
1
解决办法
4730
查看次数

Spring Cloud Stream - 并发

使用 Spring Cloud Stream 版本 Chelsea.SR2,使用 RabbitMQ 作为消息代理。为了拥有多个消费者,我们使用属性并发(入站消费者的并发)。
如果我们将并发数设置为 50。它从 1 开始,慢慢地增加消费者数量。是否有任何可能的解决方案以更高的数字而不是 1 启动初始消费者计数以提高消费者性能。

spring-cloud-stream

1
推荐指数
1
解决办法
3602
查看次数

Kafka Streams - 使用容错定义自定义关系/非键值状态存储

我正在尝试使用 kafka 实现事件溯源。

我对流处理器应用程序的设想是一个典型的 3 层 Spring 应用程序,其中:

  • “表示”层被(由?)Kafka 流 API 取代。
  • 业务逻辑层由拓扑中的处理器 API 使用。
  • 此外,DB 是一个关系 H2 内存数据库,可通过 Spring Data JPA 存储库访问。存储库还实现了必要的接口,以便将它们注册为 Kafka 状态存储以使用好处(恢复和容错)

但我想知道我应该如何实现自定义状态存储部分?

我一直在寻找和:

  • 有一些接口,例如StateStore& StoreBuilderStoreBuilderwithLoggingEnabled()方法;但是如果我启用它,实际的更新和更改日志记录何时发生?通常示例都是键值存储,即使是自定义的。如果我不想要键值怎么办?kafka 文档中交互式查询部分的示例并没有削减它。

  • 我知道交互式查询。但它们似乎适合查询而不是更新;顾名思义。

在键值存储中,发送到更改日志的记录很简单。但是如果我不使用键值;我何时以及如何通知 kafka 我的状态已更改?

apache-kafka spring-cloud-stream apache-kafka-streams

1
推荐指数
1
解决办法
571
查看次数

Spring Cloud Stream Kafka - 找不到 Serde 类:org.apache.kafka.common.serialization.Serde$StringSerde

我正在尝试使用 Spring Cloud Stream 框架构建一个简单的 Kafka Streams 应用程序。我可以连接到流来推送原始数据进行处理。但是当我尝试按键处理事件计数的流Serde class not found: org.apache.kafka.common.serialization.Serde$StringSerde时,我在运行应用程序时遇到异常。我检查了我的项目包含的库,我可以找到这个Serde类,它没有丢失。我不知道为什么在运行时它没有被加载!

下面是我的源文件。

com.pgp.learn.kafka.analytics.AnalyticsApplication

package com.pgp.learn.kafka.analytics;

import com.pgp.learn.kafka.analytics.model.PageViewEvent;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@SpringBootApplication
@EnableBinding(AnalyticsBinding.class)
public class  AnalyticsApplication {

    public static void main(String[] …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-cloud-stream apache-kafka-streams spring-kafka

1
推荐指数
1
解决办法
1019
查看次数

Spring Cloud Dataflow 自定义应用程序卡在部署状态

我创建了一个自定义 Spring Cloud 流处理器应用程序,并将其部署为 Source|Processor|Sink 流中的处理器步骤。一切似乎都运行良好,但我的自定义应用程序在数据流 UI 中显示“正在部署”。如果这会影响任何事情,我会将它部署为来自 mavenLocal 的 SNAPSHOT。我是否遗漏了什么让 SCDF 知道部署成功?

spring-cloud-stream spring-cloud-dataflow

1
推荐指数
1
解决办法
625
查看次数

如何从 Spring Cloud 流中读取 Kafka 消息密钥?

我正在使用 Spring Cloud 流来消费来自 Kafka 的消息。

是否可以从代码中读取 Kafka 消息密钥?

我有一个 Kafka 主题,它通常有两种类型的消息。要采取的操作因消息键而异。我看到 spring 文档只有以下内容来阅读消息。在这里,我需要指定消息的实际映射(此处为 Greetings 类)。但是,我需要一种方法来读取消息键并确定可反序列化的 Pojo

public class GreetingsListener {

    @StreamListener(GreetingsProcessor.INPUT)
    public void handleGreetings(@Payload Greetings request) {
     
    }
}
Run Code Online (Sandbox Code Playgroud)

spring apache-kafka spring-cloud-stream

1
推荐指数
1
解决办法
1604
查看次数

spring 云流-消费群绑定

我的消费者绑定到匿名消费者组,而不是我指定的消费者组。

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost
          defaultBrokerPort: 9092
          zkNodes: localhost
          defaultZkPort: 2181
        bindings:
          inEvent:
            group: eventin
            destination: event
          outEvent:
            group: eventout
            destination: processevent
Run Code Online (Sandbox Code Playgroud)

我的 Spring Boot 应用程序

@SpringBootApplication
@EnableBinding(EventStream.class)
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
    @StreamListener(value = "inEvent")
    public void getEvent(Event event){
        System.out.println(event.name);
    }
}
Run Code Online (Sandbox Code Playgroud)

我的输入输出通道接口

public interface EventStream {
    @Input("inEvent")
    SubscribableChannel inEvent();
    @Output("outEvent")
    MessageChannel outEvent();
}
Run Code Online (Sandbox Code Playgroud)

我的控制台日志--

:在 3.233 秒内启动 ConsumerApplication(JVM 运行 4.004):[ Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d]发现组协调员 singh:9092 (id: 2147483647 …

spring-boot spring-cloud kafka-consumer-api spring-cloud-stream spring-kafka

1
推荐指数
1
解决办法
2759
查看次数

Spring Cloud Stream 与 Kafka Stream 的 Exactly Once 功能对比

我无法在此处以及 Spring 网站和博客上找到 Spring Cloud Stream 是否能够提供 Kafka Stream API 提供的“Exactly Once”语义。也许没有单个配置/注释,并且在线程“是否可以使用 Spring Cloud Stream 进行一次处理? ”我可以找到一些有用的东西,但专家的答案是非常高的水平。感谢帮助

apache-kafka spring-cloud-stream apache-kafka-streams

1
推荐指数
1
解决办法
2646
查看次数

Spring Cloud Stream 3.0 存在生产者问题

看了spring cloud stream 3.0的文档,理解了新的用java.util.function.[Supplier/Function/Consumer]来代表生产者、消费和生产、消费者,这个应该是正确的。

但我不了解供应商。

该文档指出,对供应商的轮询用于一致地为供应商生成数据,并且不需要任何程序参与。

但是很多时候,我们需要在特定时间生成数据,例如 Web 请求,而我找不到任何文档或示例。

它可能就像注入 Supplier 对象并调用 get() 方法一样简单,但是如何禁用轮询调用呢?

感谢所有提供信息的人。

spring-cloud-stream spring-cloud-stream-binder-kafka

1
推荐指数
1
解决办法
1415
查看次数