标签: spring-kafka

Kafka Kstream 和 Spring @KafkaListener 有什么不同?

我对卡夫卡有点陌生,正在阅读文档。Kafka 办公网站有一个关于 KStream 的示例。应用程序绑定到某个主题,消息一到达就会立即进行处理。结果将发布回主题或数据库。

Spring Kafka 注释 @KafkaListener 具有相同的功能。例如,我尝试了KafaListner 应用程序。同样在这里,我们收听一个主题并在发布内容时对其进行处理。

所以我很想知道 1. 这两个有什么不同?2. 在什么场景下选择哪一个?

apache-kafka spring-kafka

1
推荐指数
1
解决办法
3126
查看次数

Spring Kafka 监听 Http 请求

我正在使用 Spring 和 Kafka,我发出如下所示的 HTTP POST 请求,并通过 Kafka 主题将一些信息发送到另一个服务。

@RequestMapping(method = RequestMethod.POST, value = "/portfolio")
public void getPortfolio(
       Authentication auth,
    @RequestBody User user
) {
    //Data Transfer Object
    UserDTO dto = user.toDTO();
    dto.setId(((AuthenticatedUser) auth.getPrincipal()).getId());

    //Sending message to Kafka topic
    sender.sendPortfolioRequest(dto);
}
Run Code Online (Sandbox Code Playgroud)

然后我想监听不同主题的响应并在 HTTP 响应中返回数据,但我被困在这里。我可以使用下面的侦听器方法来侦听响应,但不知道如何将两者放在一起。

@KafkaListener(
    topics = Topics.PORTFOLIO_RESULT,
    containerFactory = "portfolioKafkaListenerContainerFactory"
)
public void portfolioListener(UserPortfolioDTO portfolio) {
    System.out.println("Recieved Portfolio: " + portfolio.toString());
}
Run Code Online (Sandbox Code Playgroud)

PS 我是使用 HTTP 请求的新手,不知道这是否是我想要实现的目标的正确方法,或者我是否应该使用 POST 创建新资源并重定向到该资源或其他资源。

java spring apache-kafka spring-kafka

1
推荐指数
1
解决办法
2086
查看次数

使用 Kafka 实现 STOMP 协议

我正在使用 websocket 协议和 STOMP 作为消息协议为网络平台开发聊天模块。

这是我第一次使用任何消息代理,Kafka 是在公司(我正在工作的)网络平台上使用的消息代理,我猜是用于其他模块。之前我刚开始时使用的是 RabbitMQ,现在我必须切换到 Kafka。我在RabbitMQ的网站上看到有一整篇关于如何使用STOMP的文章,但是Kafka的官方网站上没有这样的东西。

但我探索了其他几个来源、许多教程,但找不到任何与 Kafka 一起使用 STOMP 协议相关的内容,这让我问这是否可能?

这是我的 websocket 配置类:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer{

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app");
        registry.enableSimpleBroker("/topic");

//Here's the line I wrote to use Kafka as a MB, but doesn't work        
registry.enableStompBrokerRelay("/topic").setRelayHost("localhost").setRelayPort(9092);
Run Code Online (Sandbox Code Playgroud)

启动 Kafka,然后运行我的 Java Spring 应用程序后,我从 java.io.IOException 中收到“连接由对等方重置”,如果一切正常,则不应抛出该异常。

我使用 Kakfa 的 2.2.0 版本、Zookeeper 的 3.4.14 版本,并使用 STS 3 作为我的 IDE。

任何帮助,将不胜感激。

java stomp websocket apache-kafka spring-kafka

1
推荐指数
1
解决办法
6957
查看次数

如何测试使用 Avro 和 Confluence Schema Registry 的 Spring Cloud Stream Kafka Streams 应用程序?

我无法弄清楚如何测试使用 Avro 作为消息格式和(融合)架构注册表的 Spring Cloud Stream Kafka Streams 应用程序。

配置可能是这样的:

spring:
  application:
    name: shipping-service
  cloud:
    stream:
      schema-registry-client:
        endpoint: http://localhost:8081
      kafka:
        streams:
          binder:
            configuration:
              application:
                id: shipping-service
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
              schema:
                registry:
                  url: ${spring.cloud.stream.schema-registry-client.endpoint}
              value:
                subject:
                  name:
                    strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
          bindings:
            input:
              consumer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            order:
              consumer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
            output:
              producer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
      bindings:
        input:
          destination: customer
        order:
          destination: order
        output:
          destination: order

server:
  port: 8086

logging:
  level:
    org.springframework.kafka.config: debug
Run Code Online (Sandbox Code Playgroud)

笔记:

  • 它使用本机序列化/反序列化。
  • 测试框架:Junit 5

我想关于 Kafka Broker,我应该使用 EmbeddedKafkaBroker bean,但如您所见,它还依赖于应该以某种方式模拟的模式注册表。如何?

apache-kafka spring-cloud spring-cloud-stream apache-kafka-streams spring-kafka

1
推荐指数
1
解决办法
2098
查看次数

Micrometer KafkaConsumerMetrics 在本地运行时存在,但在部署时不存在

当我在本地运行时,我可以看到kafka.consumer.正在收集这些信息。当我部署服务时,我发现这些指标不存在。

我使用kafka版本1.11.0,java 11和Spring Boot 2.2。

我如何确定缺少什么?

apache-kafka spring-boot spring-kafka micrometer spring-micrometer

1
推荐指数
1
解决办法
526
查看次数

是否有读取 Kafka 主题的 Spring Schedule 示例?

我们试图在指定的窗口时间从 Kafka 读取数据(所以我们有 Kafka 消费者),这意味着避免在其他时间读取数据。但是,我们不确定如何在时间段到期后关闭消费者。我想知道是否有任何示例可以说明如何做到这一点?非常感谢您帮助我们。

apache-kafka spring-scheduled kafka-consumer-api spring-kafka

1
推荐指数
1
解决办法
2509
查看次数

仅当某些条件为真时才使用来自 Kafka 的消息

我们有特定的主题,只有在条件consumeEnabled=true 时才需要消费消息。所以,它应该像这样工作:

  1. 如果应用程序正在启动并且consumeEnabled=true,则将分区分配给消费者并使用来自主题的消息。
  2. 如果应用程序正在启动并且consumeEnabled=false,则不要将分区分配给消费者,也不要使用来自主题的消息。
  3. 如果应用程序已经以consumeEnabled=false 运行,但在运行时属性变为consumeEnabled=true,则在运行时将分区分配给消费者并使用来自主题的消息。

应用程序正在使用消息,但随后 consumerEnabled 变为 false 无需考虑的情况。

请用 Spring Kafka 和/或 Kafka Java 客户端定义实现决策的最佳方式

java apache-kafka spring-kafka

1
推荐指数
1
解决办法
982
查看次数

春季卡夫卡面临org.apache.kafka.common.errors.InvalidPidMappingException

我正在使用 spring boot 2.1.9 和 spring Kafka 2.2.9。

我的 Spring Boot 应用程序在过去 2 周内一直处于空闲状态,它正在运行,但仍然没有人访问 API。但是当我今天开始使用时,对于第一个请求,它给了我以下错误。

2019-12-18 14:43:03.908 ERROR [xxxx-account-service,681c8de32a0d1127,681c8de32a0d1127,false] 10 --- [nio-8080-exec-5] o.s.t.i.TransactionInterceptor           : Application exception overridden by commit exception

org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
        at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:279)
        at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartitionToTransaction(TransactionManager.java:264)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:862)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:470)
        at brave.kafka.clients.TracingProducer.send(TracingProducer.java:106)
        at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:407)
        at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:217)
        at com.xxxx.service.messaging.impl.MessageProducerImpl.sendStringMessage(MessageProducerImpl.java:61)
        at com.xxxx.service.messaging.impl.MessageProducerImpl.sendIntegerMessage(MessageProducerImpl.java:72)
        at com.xxxx.service.messaging.impl.MessageProducerImpl$$FastClassBySpringCGLIB$$5b6c8aa3.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:750)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
        at …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-boot spring-kafka

1
推荐指数
2
解决办法
2108
查看次数

多个KafkaListener类可以监听同一个主题吗?

我有一个包含多个事件(不同类型)的卡夫卡主题,我想在单个应用程序中的不同处理程序类中处理这些事件。所以我的问题是 - 我可以创建两个使用相同主题的类(spring 组件),但每个类处理不同的事件(来自同一主题)吗?

@Component
@KafkaListener(topics = "topicA")
public class SomeClass {

    @KafkaHandler
    public void handleEventA(EventA eventA) {
    }
}

@Component
@KafkaListener(topics = "topicA")
public class AnotherClass {

    @KafkaHandler
    public void handleEventB(EventB eventB) {
    }

    @KafkaHandler
    public void handleEventC(EventC eventC) {
    }
}
Run Code Online (Sandbox Code Playgroud)

spring-kafka

1
推荐指数
1
解决办法
5125
查看次数

如何使用EmbeddedKafkaRule/EmbeddedKafka设置Spring Kafka测试来修复TopicExistsException间歇性错误?

我在测试 Kafka 消费者和生产者时遇到了问题。集成测试间歇性失败,并显示TopicExistsException.

这就是我当前的测试类 -UserEventListenerTest对于其中一位消费者来说是这样的:

@SpringBootTest(properties = ["application.kafka.user-event-topic=user-event-topic-UserEventListenerTest",
    "application.kafka.bootstrap=localhost:2345"])
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class UserEventListenerTest {
    private val logger: Logger = LoggerFactory.getLogger(javaClass)

    @Value("\${application.kafka.user-event-topic}")
    private lateinit var userEventTopic: String

    @Autowired
    private lateinit var kafkaConfigProperties: KafkaConfigProperties

    private lateinit var embeddedKafka: EmbeddedKafkaRule
    private lateinit var sender: KafkaSender<String, UserEvent>
    private lateinit var receiver: KafkaReceiver<String, UserEvent>

    @BeforeAll
    fun setup() {
        embeddedKafka = EmbeddedKafkaRule(1, false, userEventTopic)
        embeddedKafka.kafkaPorts(kafkaConfigProperties.bootstrap.substringAfterLast(":").toInt())
        embeddedKafka.before()

        val producerProps: HashMap<String, Any> = hashMapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaConfigProperties.bootstrap,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer",
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to "com.project.userservice.config.MockAvroSerializer"
        )
        val …
Run Code Online (Sandbox Code Playgroud)

kotlin apache-kafka spring-boot spring-kafka spring-kafka-test

1
推荐指数
1
解决办法
9909
查看次数