Kafka 和 RabbitMQ 是众所周知的消息代理。我想用 Spring Boot 构建一个微服务,似乎 Spring Cloud 为他们提供了开箱即用的解决方案作为事实上的选择。我知道一些 RabbitMQ 的托盘,它有很多支持。Kafka 属于 Apache,所以应该不错。那么RabbitMQ和Kafka之间的主要目标区别是什么?考虑到这将与 Spring Cloud 一起使用。请分享您的经验和标准。提前致谢。
rabbitmq messagebroker spring-rabbit apache-kafka spring-cloud-stream
我是Spring cloud Stream和Kafka的新手,我正在寻找一个从kafka主题中消费json消息的好例子.
谢谢
使用 Spring Cloud Stream 版本 Chelsea.SR2,使用 RabbitMQ 作为消息代理。为了拥有多个消费者,我们使用属性并发(入站消费者的并发)。
如果我们将并发数设置为 50。它从 1 开始,慢慢地增加消费者数量。是否有任何可能的解决方案以更高的数字而不是 1 启动初始消费者计数以提高消费者性能。
我正在尝试使用 kafka 实现事件溯源。
我对流处理器应用程序的设想是一个典型的 3 层 Spring 应用程序,其中:
但我想知道我应该如何实现自定义状态存储部分?
我一直在寻找和:
有一些接口,例如StateStore& StoreBuilder。StoreBuilder有withLoggingEnabled()方法;但是如果我启用它,实际的更新和更改日志记录何时发生?通常示例都是键值存储,即使是自定义的。如果我不想要键值怎么办?kafka 文档中交互式查询部分的示例并没有削减它。
我知道交互式查询。但它们似乎适合查询而不是更新;顾名思义。
在键值存储中,发送到更改日志的记录很简单。但是如果我不使用键值;我何时以及如何通知 kafka 我的状态已更改?
我正在尝试使用 Spring Cloud Stream 框架构建一个简单的 Kafka Streams 应用程序。我可以连接到流来推送原始数据进行处理。但是当我尝试按键处理事件计数的流Serde class not found: org.apache.kafka.common.serialization.Serde$StringSerde时,我在运行应用程序时遇到异常。我检查了我的项目包含的库,我可以找到这个Serde类,它没有丢失。我不知道为什么在运行时它没有被加载!
下面是我的源文件。
com.pgp.learn.kafka.analytics.AnalyticsApplication
package com.pgp.learn.kafka.analytics;
import com.pgp.learn.kafka.analytics.model.PageViewEvent;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@SpringBootApplication
@EnableBinding(AnalyticsBinding.class)
public class AnalyticsApplication {
public static void main(String[] …Run Code Online (Sandbox Code Playgroud) java apache-kafka spring-cloud-stream apache-kafka-streams spring-kafka
我创建了一个自定义 Spring Cloud 流处理器应用程序,并将其部署为 Source|Processor|Sink 流中的处理器步骤。一切似乎都运行良好,但我的自定义应用程序在数据流 UI 中显示“正在部署”。如果这会影响任何事情,我会将它部署为来自 mavenLocal 的 SNAPSHOT。我是否遗漏了什么让 SCDF 知道部署成功?
我正在使用 Spring Cloud 流来消费来自 Kafka 的消息。
是否可以从代码中读取 Kafka 消息密钥?
我有一个 Kafka 主题,它通常有两种类型的消息。要采取的操作因消息键而异。我看到 spring 文档只有以下内容来阅读消息。在这里,我需要指定消息的实际映射(此处为 Greetings 类)。但是,我需要一种方法来读取消息键并确定可反序列化的 Pojo
public class GreetingsListener {
@StreamListener(GreetingsProcessor.INPUT)
public void handleGreetings(@Payload Greetings request) {
}
}
Run Code Online (Sandbox Code Playgroud) 我的消费者绑定到匿名消费者组,而不是我指定的消费者组。
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost
defaultBrokerPort: 9092
zkNodes: localhost
defaultZkPort: 2181
bindings:
inEvent:
group: eventin
destination: event
outEvent:
group: eventout
destination: processevent
Run Code Online (Sandbox Code Playgroud)
我的 Spring Boot 应用程序
@SpringBootApplication
@EnableBinding(EventStream.class)
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@StreamListener(value = "inEvent")
public void getEvent(Event event){
System.out.println(event.name);
}
}
Run Code Online (Sandbox Code Playgroud)
我的输入输出通道接口
public interface EventStream {
@Input("inEvent")
SubscribableChannel inEvent();
@Output("outEvent")
MessageChannel outEvent();
}
Run Code Online (Sandbox Code Playgroud)
我的控制台日志--
:在 3.233 秒内启动 ConsumerApplication(JVM 运行 4.004):[ Consumer clientId=consumer-3, groupId=anonymous.0d0c87d6-ef39-4bfe-b475-4491c40caf6d]发现组协调员 singh:9092 (id: 2147483647 …
spring-boot spring-cloud kafka-consumer-api spring-cloud-stream spring-kafka
我无法在此处以及 Spring 网站和博客上找到 Spring Cloud Stream 是否能够提供 Kafka Stream API 提供的“Exactly Once”语义。也许没有单个配置/注释,并且在线程“是否可以使用 Spring Cloud Stream 进行一次处理? ”我可以找到一些有用的东西,但专家的答案是非常高的水平。感谢帮助
看了spring cloud stream 3.0的文档,理解了新的用java.util.function.[Supplier/Function/Consumer]来代表生产者、消费和生产、消费者,这个应该是正确的。
但我不了解供应商。
该文档指出,对供应商的轮询用于一致地为供应商生成数据,并且不需要任何程序参与。
但是很多时候,我们需要在特定时间生成数据,例如 Web 请求,而我找不到任何文档或示例。
它可能就像注入 Supplier 对象并调用 get() 方法一样简单,但是如何禁用轮询调用呢?
感谢所有提供信息的人。
apache-kafka ×6
java ×2
spring ×2
spring-kafka ×2
rabbitmq ×1
spring-boot ×1
spring-cloud ×1