Lagom服务消耗来自Kafka的输入

ior*_*vic 6 java scala apache-kafka akka-stream lagom

我试图弄清楚如何使用Lagom来消费来自通过Kafka进行通信的外部系统的数据.

我已经遇到了Lagom文档的这一部分,它描述了Lagom服务如何通过订阅其主题与另一个Lagom服务进行通信.

helloService
  .greetingsTopic()
  .subscribe // <-- you get back a Subscriber instance
  .atLeastOnce(
  Flow.fromFunction(doSomethingWithTheMessage)
)
Run Code Online (Sandbox Code Playgroud)

但是,当您想要订阅包含某个随机外部系统生成的事件的Kafka主题时,适当的配置是什么?

这个功能需要某种适配器吗?为了澄清,我现在有这个:

object Aggregator {
  val TOPIC_NAME = "my-aggregation"
}

trait Aggregator extends Service {
  def aggregate(correlationId: String): ServiceCall[Data, Done]

  def aggregationTopic(): Topic[DataRecorded]

  override final def descriptor: Descriptor = {
    import Service._

    named("aggregator")
      .withCalls(
        pathCall("/api/aggregate/:correlationId", aggregate _)
      )
      .withTopics(
        topic(Aggregator.TOPIC_NAME, aggregationTopic())
          .addProperty(
            KafkaProperties.partitionKeyStrategy,
            PartitionKeyStrategy[DataRecorded](_.sessionData.correlationId)
          )
      )
      .withAutoAcl(true)
  }
}
Run Code Online (Sandbox Code Playgroud)

我可以通过简单的POST请求调用它.但是,我希望通过使用Data来自某些(外部)Kafka主题的消息来调用它.

我想知道是否有这种方式以类似于这个模型的方式配置描述符:

override final def descriptor: Descriptor = {
  ...
  kafkaTopic("my-input-topic")
    .subscribe(serviceCall(aggregate _)
    .withAtMostOnceDelivery
}
Run Code Online (Sandbox Code Playgroud)

我碰到这在谷歌论坛的讨论,但在OPS的问题,我看不出他到底用什么EventMessage期从未来some-topic除了他们路由到由他的服务定义的话题.

编辑#1:进度更新

看一下文档,我决定尝试以下方法.我加入2-多个模块,aggregator-kafka-proxy-apiaggregator-kafka-proxy-impl.

在新的api模块中,我定义了一个没有方法的新服务,但是一个主题代表了我的Kafka主题:

object DataKafkaPublisher {
  val TOPIC_NAME = "data-in"
}

trait DataKafkaPublisher extends Service {
  def dataInTopic: Topic[DataPublished]

  override final def descriptor: Descriptor = {
    import Service._
    import DataKafkaPublisher._

    named("data-kafka-in")
      .withTopics(
        topic(TOPIC_NAME, dataInTopic)
          .addProperty(
            KafkaProperties.partitionKeyStrategy,
            PartitionKeyStrategy[SessionDataPublished](_.data.correlationId)
          )
      )
      .withAutoAcl(true)
  }
}
Run Code Online (Sandbox Code Playgroud)

在impl模块中,我只是做了标准实现

class DataKafkaPublisherImpl(persistentEntityRegistry: PersistentEntityRegistry) extends DataKafkaPublisher {
  override def dataInTopic: Topic[api.DataPublished] =
    TopicProducer.singleStreamWithOffset {
      fromOffset =>
        persistentEntityRegistry.eventStream(KafkaDataEvent.Tag, fromOffset)
          .map(ev => (convertEvent(ev), ev.offset))
    }

  private def convertEvent(evt: EventStreamElement[KafkaDataEvent]): api.DataPublished = {
    evt.event match {
      case DataPublished(data) => api.DataPublished(data)
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

现在,为了实际使用这些事件,在我的aggregator-impl模块中,我添加了一个"订阅者"服务,它接受这些事件,并在实体上调用适当的命令.

class DataKafkaSubscriber(persistentEntityRegistry: PersistentEntityRegistry, kafkaPublisher: DataKafkaPublisher) {

  kafkaPublisher.dataInTopic.subscribe.atLeastOnce(
    Flow[DataPublished].mapAsync(1) { sd =>
      sessionRef(sd.data.correlationId).ask(RecordData(sd.data))
    }
  )

  private def sessionRef(correlationId: String) =
    persistentEntityRegistry.refFor[Entity](correlationId)
}
Run Code Online (Sandbox Code Playgroud)

这实际上允许我在Kafka主题"data-in"上发布消息,然后RecordData在发布给要消费的实体之前将其代理并转换为命令.

但是,对我来说,这似乎有些苛刻.我与Lagom internals联系到Kafka.我无法轻易交换数据来源.例如,如果我愿意,我将如何使用来自RabbitMQ的外部消息?如果我想从另一个Kafka消费(不同于Lagom使用的那个)怎么办?

编辑#2:更多文档

我在Lagom文档上发现了一些文章,特别是这个:

从第三方消费主题

您可能希望您的Lagom服务使用在Lagom中未实现的服务上生成的数据.在这种情况下,如"服务客户端"部分所述,您可以在Lagom项目中创建第三方服务API模块.该模块将包含一个服务描述符,声明您将使用的主题.一旦实现了ThirdPartyService接口和相关类,就应该添加第三方服务api作为对fancy-service-impl的依赖.最后,您可以使用ThirdPartyService中描述的主题,如订阅主题部分中所述.

ior*_*vic 0

Alan Klikic在此处的Lightbend 讨论论坛上提供了答案。

第1部分:

如果您仅在业务服务中使用外部 Kafka 集群,那么您可以仅使用 Lagom Broker API 来实现此目的。所以你需要:

  1. 使用仅具有主题定义的服务描述符创建 API(此 API 尚未实现)
  2. 在您的业务服务中根据您的部署配置 kafka_native (正如我在上一篇文章中提到的)
  3. 在您的业务服务中从 #1 中创建的 API 注入服务并使用 Lagom Broker API 订阅者订阅它

Lagom Broker API 订阅者中的偏移量提交是开箱即用的。

第2部分:

Kafka 和 AMQP 消费者实现需要持久的 akka 流。所以你需要处理断开连接。这些可以通过两种方式完成:

  1. 通过将持久的 akka 流包装在一个 actor 中来控制它。您可以在 actor preStart 上初始化流流,并将流完成传送到将停止它的 actor。如果流完成或失败,actor 将停止。然后使用重新启动策略将 Actor 包装在 Actor 退避中,这将在完成或失败的情况下重新启动 Actor 并重新初始化 Flow
  2. akka 流延迟重启并带有退避阶段

我个人使用#1,还没有尝试#2。

可以在 Lagom 组件特征中初始化 #1 的退避 actor 或 #2 的 Flow(基本上在您现在使用 Lagom Broker API 进行订阅的同一位置)。

配置consumer时一定要设置consumer组,以保证避免重复消费。您可以像 Lagom 一样使用描述符中的服务名称作为消费者组名称。