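I'm trying to write integration tests for Kafka in Scala (and I'm fairly new to both). By integration tests, I mean that my main code contains a ClosedShape RunnableGraph, and I'd like to feed data into it via a Kafka topic and then check what comes out via a Kafka topic (as opposed to unit testing the individual Flows within the RunnableGraph).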
Here is a simplified example:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.kafka.{ProducerSettings, ConsumerSettings}
import akka.kafka.scaladsl.{Producer, Consumer}
import akka.kafka.scaladsl.Consumer.Control
import akka.stream.ClosedShape
import akka.stream.scaladsl._
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerConfig}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringSerializer, StringDeserializer}
import GraphDSL.Implicits._
object SimpleKafkaStream {
  def apply(sourceTopic: String, targetTopic: String, kafkaBootstrapServer: String)(implicit actorSystem: ActorSystem) = {
    RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      source(sourceTopic, kafkaBootstrapServer) ~> transformMessage(targetTopic) ~> target(kafkaBootstrapServer)
      ClosedShape
    })
  }
  private def transformMessage (targetTopic: String) = …