Problem: when I restart/complete/stop the stream, the old consumer does not die/shut down:
[INFO ] a.a.RepointableActorRef -
Message [akka.kafka.KafkaConsumerActor$Internal$Stop$]
from Actor[akka://ufo-sightings/deadLetters]
to Actor[akka://ufo-sightings/system/kafka-consumer-1#1896610594]
was not delivered. [1] dead letters encountered.
Description: I am building a service that receives messages from a Kafka topic and sends them to an external service via HTTP requests.
The connection to the external service can drop, and my service needs to retry the request.
Also, if there is an error in the stream, the whole stream needs to be restarted.
Finally, sometimes I no longer need the stream and its corresponding Kafka consumer, and I want to shut the whole stream down.
So I have a stream:
Consumer.committableSource(customizedSettings, subscriptions)
  .flatMapConcat(sourceFunction)
  .toMat(Sink.ignore)(Keep.both) // keep the Consumer.Control so the stream can be shut down
  .run()
with `sourceFunction` sending the HTTP request.
I followed the new Kafka consumer restart instructions in the new documentation:
RestartSource.withBackoff(
  minBackoff = 20.seconds,
  maxBackoff = 5.minutes,
  randomFactor = 0.2
) { () =>
  Consumer.committableSource(customizedSettings, subscriptions)
    .watchTermination() {
      case (consumerControl, streamComplete) =>
        logger.info(s"Started watching Kafka consumer id = ${consumer.id} termination: is shutdown: ${consumerControl.isShutdown}, is completed: ${streamComplete.isCompleted}")
        consumerControl.isShutdown.map(_ => logger.info(s"Shutdown of consumer finally happened …
I am trying to implement a reactive Kafka consumer in my Spring Boot application, and I am looking at these samples: https://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main/java/reactor/kafka/samples/SampleScenarios.java
It looks like Spring support for reactive Kafka is not available yet.
I understand how Kafka listeners work with the non-reactive Kafka API in Spring: the simplest solution is to configure beans for ConcurrentKafkaListenerContainerFactory and ConsumerFactory, then use the @KafkaListener annotation, and voilà.
But I am not sure how to use reactive Kafka properly in Spring right now.
Basically I need a listener for a topic. Should I create some kind of loop or my own scheduler? Or maybe I am missing something. Can anyone share their knowledge and best practices?
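For what it's worth, a bare reactor-kafka "listener" without any Spring wrapper is just a subscription to the Flux returned by `receive()`. A minimal sketch (the topic name, group id, and bootstrap server below are placeholders, not from the question):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

public class ReactiveListenerSketch {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");                // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ReceiverOptions<String, String> options =
                ReceiverOptions.<String, String>create(props)
                        .subscription(Collections.singletonList("demo-topic")); // placeholder

        // receive() returns an unbounded Flux of records; subscribing to it IS the
        // listener loop, so no hand-rolled polling loop or scheduler is needed.
        Flux<ReceiverRecord<String, String>> inbound = KafkaReceiver.create(options).receive();
        inbound.subscribe(record -> {
            System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            record.receiverOffset().acknowledge(); // mark the offset for commit
        });
    }
}
```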
While producing Avro messages with reactive kafka and avro4s, I hit a reproducible error. Once the identityMapCapacity of the client (CachedSchemaRegistryClient) is reached, serialization fails with:
java.lang.IllegalStateException: Too many schema objects created for <myTopic>-value
This is unexpected, since all messages should have the same schema - they are serializations of the same case class.
val avroProducerSettings: ProducerSettings[String, GenericRecord] =
  ProducerSettings(system, Serdes.String().serializer(), avroSerde.serializer())
    .withBootstrapServers(settings.bootstrapServer)

val avroProdFlow: Flow[ProducerMessage.Message[String, GenericRecord, String],
                       ProducerMessage.Result[String, GenericRecord, String],
                       NotUsed] =
  Producer.flow(avroProducerSettings)

val avroQueue: SourceQueueWithComplete[Message[String, GenericRecord, String]] =
  Source.queue(bufferSize, overflowStrategy)
    .via(avroProdFlow)
    .map(logResult)
    .to(Sink.ignore)
    .run()
...
queue.offer(msg)
The serializer is a KafkaAvroSerializer, instantiated with new CachedSchemaRegistryClient(settings.schemaRegistry, 1000). Generating the GenericRecord:
def toAvro[A](a: A)(implicit recordFormat: RecordFormat[A]): GenericRecord =
  recordFormat.to(a)
val makeEdgeMessage: (Edge, String) => Message[String, GenericRecord, String] = …
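A likely explanation (an assumption, not stated in the question): older CachedSchemaRegistryClient versions keep their schema-to-id cache in an identity-based map, so schemas that are equal but not the same object each occupy a slot, and a code path that materializes a fresh Schema per message fills identityMapCapacity. A hedged sketch of the difference, with `SCHEMA_JSON` as a placeholder schema string:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class SchemaReuseSketch {
    // Placeholder: any valid Avro record schema would do here.
    static final String SCHEMA_JSON =
            "{\"type\":\"record\",\"name\":\"Edge\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}";

    public static void main(String[] args) {
        // Anti-pattern: a distinct Schema object per message. Each one becomes a new key
        // in an identity-based cache, eventually raising "Too many schema objects created".
        for (int i = 0; i < 3; i++) {
            Schema perMessage = new Schema.Parser().parse(SCHEMA_JSON);
            GenericRecord rec = new GenericData.Record(perMessage);
            rec.put("id", "edge-" + i);
        }

        // Fix: parse once and reuse the same Schema instance for every record
        // (with avro4s, reuse one RecordFormat/schema rather than rebuilding it per call).
        Schema shared = new Schema.Parser().parse(SCHEMA_JSON);
        GenericRecord rec = new GenericData.Record(shared);
        rec.put("id", "edge-0");
        System.out.println(rec);
    }
}
```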
I have a reactive Kafka application that reads from one topic and writes to another. The topic has multiple partitions, and I want to create as many consumers (in the same consumer group) as there are partitions in the topic. From my understanding of this thread, .receive() will create only one KafkaReceiver instance, which reads from all partitions of the topic. So I need multiple receivers reading from different partitions in parallel.
For this, I came up with the following code:
@Bean
public ReceiverOptions<String, String> kafkaReceiverOptions(String topic, KafkaProperties kafkaProperties) {
    ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
    return basicReceiverOptions.subscription(Collections.singletonList(topic))
            .addAssignListener(receiverPartitions -> log.debug("onPartitionAssigned {}", receiverPartitions))
            .addRevokeListener(receiverPartitions -> log.debug("onPartitionsRevoked {}", receiverPartitions));
}

@Bean
public ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate(ReceiverOptions<String, String> kafkaReceiverOptions) {
    return new ReactiveKafkaConsumerTemplate<String, String>(kafkaReceiverOptions);
}

@Bean
public ReactiveKafkaProducerTemplate<String, List<Object>> kafkaProducerTemplate(KafkaProperties properties) {
    Map<String, Object> props = properties.buildProducerProperties();
    return new ReactiveKafkaProducerTemplate<String, List<Object>>(SenderOptions.create(props));
}

public void run(String... args) {
    for (int i = 0; i < topicPartitionsCount; …
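The run loop above is cut off; one common shape for it merges one `receive()` Flux per desired consumer. This is a sketch reusing the question's own names (`topicPartitionsCount`, the `kafkaReceiverOptions` bean), not the asker's actual code:

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

public class PartitionedConsumersSketch {
    // topicPartitionsCount and the options come from the question's own beans/config.
    static Flux<ReceiverRecord<String, String>> merged(
            ReceiverOptions<String, String> kafkaReceiverOptions, int topicPartitionsCount) {
        List<Flux<ReceiverRecord<String, String>>> receivers = new ArrayList<>();
        for (int i = 0; i < topicPartitionsCount; i++) {
            // Each create(...) builds an independent underlying consumer; since they all
            // share a group id, Kafka's rebalancing spreads the partitions across them.
            receivers.add(KafkaReceiver.create(kafkaReceiverOptions).receive());
        }
        return Flux.merge(receivers);
    }
}
```

Subscribing to the merged Flux (e.g. `merged(opts, n).doOnNext(r -> r.receiverOffset().acknowledge()).subscribe()`) then drives all receivers in parallel.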
I am running an Akka Streams Reactive Kafka application that is supposed to handle high load. After running the application for about 10 minutes, it shut down with an OutOfMemoryError. I tried to debug the heap dump and found that akka.dispatch.Dispatcher was occupying about 5GB of memory. Below are my configuration files.
Akka version: 2.4.18
Reactive Kafka version: 2.4.18
1. application.conf:
consumer {
  num-consumers = "2"
  c1 {
    bootstrap-servers = "localhost:9092"
    bootstrap-servers = ${?KAFKA_CONSUMER_ENDPOINT1}
    groupId = "testakkagroup1"
    subscription-topic = "test"
    subscription-topic = ${?SUBSCRIPTION_TOPIC1}
    message-type = "UserEventMessage"
    poll-interval = 100ms
    poll-timeout = 50ms
    stop-timeout = 30s
    close-timeout = 20s
    commit-timeout = 15s
    wakeup-timeout = 10s
    use-dispatcher = "akka.kafka.default-dispatcher"
    kafka-clients {
      enable.auto.commit = true
    }
  }
}
2. build.sbt:
java -Xmx6g \
-Dcom.sun.management.jmxremote.port=27019 \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Djava.rmi.server.hostname=localhost \
-Dzookeeper.host=$ZK_HOST \ …
I have just started with Reactor Kafka. I would like to know when and why to use ReactiveKafkaProducerTemplate instead of the KafkaSender from the official Reactor Kafka reference guide?
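For comparison, here is a sketch of both APIs sending the same record (broker address and topic are placeholders). ReactiveKafkaProducerTemplate ships with spring-kafka and appears to be a thin Spring-friendly wrapper over the same KafkaSender, so the choice is mostly about whether you are already in a Spring context:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;

import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

public class SenderVsTemplateSketch {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        SenderOptions<String, String> senderOptions = SenderOptions.create(props);

        // Plain reactor-kafka: explicit SenderRecord carrying correlation metadata.
        KafkaSender<String, String> sender = KafkaSender.create(senderOptions);
        sender.send(Mono.just(SenderRecord.create(
                        new ProducerRecord<>("demo-topic", "key", "value"), 1)))
              .subscribe();

        // Spring wrapper around the same options: shorter send() overloads.
        ReactiveKafkaProducerTemplate<String, String> template =
                new ReactiveKafkaProducerTemplate<>(senderOptions);
        template.send("demo-topic", "key", "value").subscribe();
    }
}
```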