小编Mic*_*ant的帖子

Scala 中 Akka 的集成测试只能间歇性地通过

我正在尝试在 Scala 中为 Kafka 编写集成测试(并且对这两者都有点陌生);通过集成测试,我的意思是我的主代码中有一个ClosedShape RunnableGraph,我想通过 Kafka 主题将数据输入其中,然后检查通过 Kafka 主题输出的内容(而不是对其中的单个 Flow 进行单元测试RunnableGraph)。

这是一个简化的示例:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.kafka.{ProducerSettings, ConsumerSettings}
import akka.kafka.scaladsl.{Producer, Consumer}
import akka.kafka.scaladsl.Consumer.Control
import akka.stream.ClosedShape
import akka.stream.scaladsl._
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerConfig}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringSerializer, StringDeserializer}
import GraphDSL.Implicits._

object SimpleKafkaStream {

  def apply(sourceTopic: String, targetTopic: String, kafkaBootstrapServer: String) (implicit actorSystem: ActorSystem) = {

    RunnableGraph.fromGraph (GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      source(sourceTopic, kafkaBootstrapServer) ~> transformMessage(targetTopic) ~> target(kafkaBootstrapServer)
      ClosedShape
    })
  }

  private def transformMessage (targetTopic: String) = …
Run Code Online (Sandbox Code Playgroud)

integration-testing scala akka apache-kafka

1
推荐指数
1
解决办法
767
查看次数

标签 统计

akka ×1

apache-kafka ×1

integration-testing ×1

scala ×1