标签: reactive-kafka

在失败时正常重启Reactive-Kafka Consumer Stream

问题 当我重新启动/完成/停止流时,旧的消费者不会死/关闭:

[INFO ] a.a.RepointableActorRef -
  Message [akka.kafka.KafkaConsumerActor$Internal$Stop$] 
  from Actor[akka://ufo-sightings/deadLetters]
  to Actor[akka://ufo-sightings/system/kafka-consumer-1#1896610594]
  was not delivered. [1] dead letters encountered.
Run Code Online (Sandbox Code Playgroud)

描述 我正在构建一个服务,它接收来自Kafka主题的消息,并通过HTTP请求将消息发送到外部服务.

  1. 可以断开与外部服务的连接,我的服务需要重试该请求.

  2. 此外,如果Stream中存在错误,则需要重新启动整个流.

  3. 最后,有时我不需要流和相应的Kafka消费者,我想关闭整个流

所以我有一个流:

Consumer.committableSource(customizedSettings, subscriptions)
  .flatMapConcat(sourceFunction)
  .toMat(Sink.ignore)
  .run
Run Code Online (Sandbox Code Playgroud)

发送Http请求 sourceFunction

我在新文档中遵循了新的Kafka Consumer Restart说明

  RestartSource.withBackoff(
      minBackoff = 20.seconds,
      maxBackoff = 5.minutes,
      randomFactor = 0.2 ) { () =>
          Consumer.committableSource(customizedSettings, subscriptions)
            .watchTermination() {
                case (consumerControl, streamComplete) =>
                  logger.info(s" Started Watching Kafka consumer id = ${consumer.id} termination: is shutdown: ${consumerControl.isShutdown}, is f completed: ${streamComplete.isCompleted}")
                  consumerControl.isShutdown.map(_ => logger.info(s"Shutdown of consumer finally happened …
Run Code Online (Sandbox Code Playgroud)

scala akka apache-kafka akka-stream reactive-kafka

12
推荐指数
1
解决办法
729
查看次数

在 Spring Boot 应用程序中实现 Reactive Kafka Listener

我正在尝试在我的 Spring Boot 应用程序中实现反应式 kafka 消费者,我正在查看这些示例:https : //github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main /java/reactor/kafka/samples/SampleScenarios.java

看起来反应式 kafka 中尚不支持 Spring

我了解 kafka 侦听器如何在 Spring 中的非反应式 kafka API 中工作:最简单的解决方案是为 ConcurrentKafkaListenerContainerFactory 和 ConsumerFactory 配置 bean,然后使用 @KafkaListener 注释和瞧

但是我现在不确定如何在 Spring 中正确使用反应式 kafka。

基本上我需要一个话题的听众。我应该创建某种循环或我自己的调度程序吗?或者也许我错过了一些东西。任何人都可以分享他们的知识和最佳实践吗?

java kafka-consumer-api reactive-kafka spring-kafka reactor-kafka

7
推荐指数
1
解决办法
2036
查看次数

如何确保生成常量Avro架构并避免"为x创建的太多架构对象"异常?

在使用反应式kafkaavro4s生成Avro消息时,我遇到了可重现的错误.一旦到达identityMapCapacityclient(CachedSchemaRegistryClient),序列化就失败了

java.lang.IllegalStateException: Too many schema objects created for <myTopic>-value
Run Code Online (Sandbox Code Playgroud)

这是意料之外的,因为所有消息都应该具有相同的模式 - 它们是相同案例类的序列化.

val avroProducerSettings: ProducerSettings[String, GenericRecord] = 
  ProducerSettings(system, Serdes.String().serializer(), 
  avroSerde.serializer())
 .withBootstrapServers(settings.bootstrapServer)

val avroProdFlow: Flow[ProducerMessage.Message[String, GenericRecord, String],
                    ProducerMessage.Result[String, GenericRecord, String],
                    NotUsed] = Producer.flow(avroProducerSettings)

val avroQueue: SourceQueueWithComplete[Message[String, GenericRecord, String]] = 
  Source.queue(bufferSize, overflowStrategy)
  .via(avroProdFlow)
  .map(logResult)
  .to(Sink.ignore)
  .run()

...
queue.offer(msg)
Run Code Online (Sandbox Code Playgroud)

序列化程序是一个KafkaAvroSerializer实例化的new CachedSchemaRegistryClient(settings.schemaRegistry, 1000)

生成GenericRecord:

def toAvro[A](a: A)(implicit recordFormat: RecordFormat[A]): GenericRecord =
  recordFormat.to(a)

val makeEdgeMessage: (Edge, String) => Message[String, GenericRecord, String] = …
Run Code Online (Sandbox Code Playgroud)

scala avro apache-kafka reactive-kafka confluent-schema-registry

6
推荐指数
1
解决办法
1242
查看次数

如何在 Spring Reactor Kafka 中创建 KafkaReceiver 的多个实例

我有一个反应式 kafka 应用程序,它从一个主题读取数据并写入另一个主题。该主题有多个分区,我想创建与主题中的分区相同数量的消费者(在同一消费者组中)。根据我对此线程的理解,.receive() 将仅创建一个 KafkaReceiver 实例,该实例将从主题中的所有分区中读取数据。所以我需要多个接收器并行地从不同的分区读取数据。

为此,我想出了以下代码:

@Bean
    public ReceiverOptions<String, String> kafkaReceiverOptions(String topic, KafkaProperties kafkaProperties) {
        ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
        return basicReceiverOptions.subscription(Collections.singletonList(topic))
                .addAssignListener(receiverPartitions -> log.debug("onPartitionAssigned {}", receiverPartitions))
                .addRevokeListener(receiverPartitions -> log.debug("onPartitionsRevoked {}", receiverPartitions));
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate(ReceiverOptions<String, String> kafkaReceiverOptions) {
        return new ReactiveKafkaConsumerTemplate<String, String>(kafkaReceiverOptions);
    }

    @Bean
    public ReactiveKafkaProducerTemplate<String, List<Object>> kafkaProducerTemplate(
            KafkaProperties properties) {
        Map<String, Object> props = properties.buildProducerProperties();
        return new ReactiveKafkaProducerTemplate<String, List<Object>>(SenderOptions.create(props));
    }


public void run(String... args) {

        for(int i = 0; i < topicPartitionsCount ; …
Run Code Online (Sandbox Code Playgroud)

apache-kafka project-reactor reactive-kafka spring-kafka spring-webflux

5
推荐指数
1
解决办法
1902
查看次数

Akka Streams Reactive Kafka - 高负荷下的OutOfMemoryError

我正在运行Akka Streams Reactive Kafka应用程序,该应用程序应该在高负载下运行.运行应用程序大约10分钟后,应用程序关闭了OutOfMemoryError.我试图调试堆转储,发现它akka.dispatch.Dispatcher占用了大约5GB的内存.以下是我的配置文件.

Akka版本:2.4.18

Reactive Kafka版本:2.4.18

1 application.conf.:

consumer {
num-consumers = "2"
c1 {
  bootstrap-servers = "localhost:9092"
  bootstrap-servers=${?KAFKA_CONSUMER_ENDPOINT1}
  groupId = "testakkagroup1"
  subscription-topic = "test"
  subscription-topic=${?SUBSCRIPTION_TOPIC1}
  message-type = "UserEventMessage"
  poll-interval = 100ms
  poll-timeout = 50ms
  stop-timeout = 30s
  close-timeout = 20s
  commit-timeout = 15s
  wakeup-timeout = 10s
  use-dispatcher = "akka.kafka.default-dispatcher"
  kafka-clients {
    enable.auto.commit = true
  }
}  
Run Code Online (Sandbox Code Playgroud)

2 . build.sbt:

java -Xmx6g \
-Dcom.sun.management.jmxremote.port=27019 \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Djava.rmi.server.hostname=localhost \
-Dzookeeper.host=$ZK_HOST \ …
Run Code Online (Sandbox Code Playgroud)

scala akka akka-stream akka-dispatcher reactive-kafka

3
推荐指数
1
解决办法
287
查看次数

Reactor Kafka:ReactiveKafkaProducerTemplate

我刚刚开始使用 Reactor Kafka。我想知道何时以及为何使用ReactiveKafkaProducerTemplate而不是官方 Reactor kafka 参考指南中的 KafkaSender?

apache-kafka project-reactor reactive-kafka spring-kafka

3
推荐指数
1
解决办法
2030
查看次数