我对卡夫卡有点陌生,正在阅读文档。Kafka 办公网站有一个关于 KStream 的示例。应用程序绑定到某个主题,消息一到达就会立即进行处理。结果将发布回主题或数据库。
Spring Kafka 注释 @KafkaListener 具有相同的功能。例如,我尝试了KafaListner 应用程序。同样在这里,我们收听一个主题并在发布内容时对其进行处理。
所以我很想知道 1. 这两个有什么不同?2. 在什么场景下选择哪一个?
我正在使用 Spring 和 Kafka,我发出如下所示的 HTTP POST 请求,并通过 Kafka 主题将一些信息发送到另一个服务。
@RequestMapping(method = RequestMethod.POST, value = "/portfolio")
public void getPortfolio(
Authentication auth,
@RequestBody User user
) {
//Data Transfer Object
UserDTO dto = user.toDTO();
dto.setId(((AuthenticatedUser) auth.getPrincipal()).getId());
//Sending message to Kafka topic
sender.sendPortfolioRequest(dto);
}
Run Code Online (Sandbox Code Playgroud)
然后我想监听不同主题的响应并在 HTTP 响应中返回数据,但我被困在这里。我可以使用下面的侦听器方法来侦听响应,但不知道如何将两者放在一起。
@KafkaListener(
topics = Topics.PORTFOLIO_RESULT,
containerFactory = "portfolioKafkaListenerContainerFactory"
)
public void portfolioListener(UserPortfolioDTO portfolio) {
System.out.println("Recieved Portfolio: " + portfolio.toString());
}
Run Code Online (Sandbox Code Playgroud)
PS 我是使用 HTTP 请求的新手,不知道这是否是我想要实现的目标的正确方法,或者我是否应该使用 POST 创建新资源并重定向到该资源或其他资源。
我正在使用 websocket 协议和 STOMP 作为消息协议为网络平台开发聊天模块。
这是我第一次使用任何消息代理,Kafka 是在公司(我正在工作的)网络平台上使用的消息代理,我猜是用于其他模块。之前我刚开始时使用的是 RabbitMQ,现在我必须切换到 Kafka。我在RabbitMQ的网站上看到有一整篇关于如何使用STOMP的文章,但是Kafka的官方网站上没有这样的东西。
但我探索了其他几个来源、许多教程,但找不到任何与 Kafka 一起使用 STOMP 协议相关的内容,这让我问这是否可能?
这是我的 websocket 配置类:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer{
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
registry.enableSimpleBroker("/topic");
//Here's the line I wrote to use Kafka as a MB, but doesn't work
registry.enableStompBrokerRelay("/topic").setRelayHost("localhost").setRelayPort(9092);
Run Code Online (Sandbox Code Playgroud)
启动 Kafka,然后运行我的 Java Spring 应用程序后,我从 java.io.IOException 中收到“连接由对等方重置”,如果一切正常,则不应抛出该异常。
我使用 Kakfa 的 2.2.0 版本、Zookeeper 的 3.4.14 版本,并使用 STS 3 作为我的 IDE。
任何帮助,将不胜感激。
我无法弄清楚如何测试使用 Avro 作为消息格式和(融合)架构注册表的 Spring Cloud Stream Kafka Streams 应用程序。
配置可能是这样的:
spring:
application:
name: shipping-service
cloud:
stream:
schema-registry-client:
endpoint: http://localhost:8081
kafka:
streams:
binder:
configuration:
application:
id: shipping-service
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
schema:
registry:
url: ${spring.cloud.stream.schema-registry-client.endpoint}
value:
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
bindings:
input:
consumer:
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
order:
consumer:
valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
output:
producer:
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
bindings:
input:
destination: customer
order:
destination: order
output:
destination: order
server:
port: 8086
logging:
level:
org.springframework.kafka.config: debug
Run Code Online (Sandbox Code Playgroud)
笔记:
我想关于 Kafka Broker,我应该使用 EmbeddedKafkaBroker bean,但如您所见,它还依赖于应该以某种方式模拟的模式注册表。如何?
apache-kafka spring-cloud spring-cloud-stream apache-kafka-streams spring-kafka
当我在本地运行时,我可以看到kafka.consumer.正在收集这些信息。当我部署服务时,我发现这些指标不存在。
我使用kafka版本1.11.0,java 11和Spring Boot 2.2。
我如何确定缺少什么?
apache-kafka spring-boot spring-kafka micrometer spring-micrometer
我们试图在指定的窗口时间从 Kafka 读取数据(所以我们有 Kafka 消费者),这意味着避免在其他时间读取数据。但是,我们不确定如何在时间段到期后关闭消费者。我想知道是否有任何示例可以说明如何做到这一点?非常感谢您帮助我们。
apache-kafka spring-scheduled kafka-consumer-api spring-kafka
我们有特定的主题,只有在条件consumeEnabled=true 时才需要消费消息。所以,它应该像这样工作:
应用程序正在使用消息,但随后 consumerEnabled 变为 false 无需考虑的情况。
请用 Spring Kafka 和/或 Kafka Java 客户端定义实现决策的最佳方式
我正在使用 spring boot 2.1.9 和 spring Kafka 2.2.9。
我的 Spring Boot 应用程序在过去 2 周内一直处于空闲状态,它正在运行,但仍然没有人访问 API。但是当我今天开始使用时,对于第一个请求,它给了我以下错误。
2019-12-18 14:43:03.908 ERROR [xxxx-account-service,681c8de32a0d1127,681c8de32a0d1127,false] 10 --- [nio-8080-exec-5] o.s.t.i.TransactionInterceptor : Application exception overridden by commit exception
org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:279)
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartitionToTransaction(TransactionManager.java:264)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:862)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:470)
at brave.kafka.clients.TracingProducer.send(TracingProducer.java:106)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:407)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:217)
at com.xxxx.service.messaging.impl.MessageProducerImpl.sendStringMessage(MessageProducerImpl.java:61)
at com.xxxx.service.messaging.impl.MessageProducerImpl.sendIntegerMessage(MessageProducerImpl.java:72)
at com.xxxx.service.messaging.impl.MessageProducerImpl$$FastClassBySpringCGLIB$$5b6c8aa3.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
at …Run Code Online (Sandbox Code Playgroud) 我有一个包含多个事件(不同类型)的卡夫卡主题,我想在单个应用程序中的不同处理程序类中处理这些事件。所以我的问题是 - 我可以创建两个使用相同主题的类(spring 组件),但每个类处理不同的事件(来自同一主题)吗?
@Component
@KafkaListener(topics = "topicA")
public class SomeClass {
@KafkaHandler
public void handleEventA(EventA eventA) {
}
}
@Component
@KafkaListener(topics = "topicA")
public class AnotherClass {
@KafkaHandler
public void handleEventB(EventB eventB) {
}
@KafkaHandler
public void handleEventC(EventC eventC) {
}
}
Run Code Online (Sandbox Code Playgroud) 我在测试 Kafka 消费者和生产者时遇到了问题。集成测试间歇性失败,并显示TopicExistsException.
这就是我当前的测试类 -UserEventListenerTest对于其中一位消费者来说是这样的:
@SpringBootTest(properties = ["application.kafka.user-event-topic=user-event-topic-UserEventListenerTest",
"application.kafka.bootstrap=localhost:2345"])
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class UserEventListenerTest {
private val logger: Logger = LoggerFactory.getLogger(javaClass)
@Value("\${application.kafka.user-event-topic}")
private lateinit var userEventTopic: String
@Autowired
private lateinit var kafkaConfigProperties: KafkaConfigProperties
private lateinit var embeddedKafka: EmbeddedKafkaRule
private lateinit var sender: KafkaSender<String, UserEvent>
private lateinit var receiver: KafkaReceiver<String, UserEvent>
@BeforeAll
fun setup() {
embeddedKafka = EmbeddedKafkaRule(1, false, userEventTopic)
embeddedKafka.kafkaPorts(kafkaConfigProperties.bootstrap.substringAfterLast(":").toInt())
embeddedKafka.before()
val producerProps: HashMap<String, Any> = hashMapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaConfigProperties.bootstrap,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to "com.project.userservice.config.MockAvroSerializer"
)
val …Run Code Online (Sandbox Code Playgroud) kotlin apache-kafka spring-boot spring-kafka spring-kafka-test
spring-kafka ×10
apache-kafka ×9
java ×3
spring-boot ×3
kotlin ×1
micrometer ×1
spring ×1
spring-cloud ×1
stomp ×1
websocket ×1