Integration test for Akka and Kafka in Scala passes only intermittently

Mic*_*ant 1 integration-testing scala akka apache-kafka

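I am trying to write an integration test for Kafka in Scala (and I am fairly new to both). By integration test, I mean that my main code contains a ClosedShape RunnableGraph, and I want to feed data into it through a Kafka topic and then check what comes out through another Kafka topic (as opposed to unit-testing the individual Flows within the RunnableGraph).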

Here is a simplified example:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.kafka.{ProducerSettings, ConsumerSettings}
import akka.kafka.scaladsl.{Producer, Consumer}
import akka.kafka.scaladsl.Consumer.Control
import akka.stream.ClosedShape
import akka.stream.scaladsl._
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerConfig}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringSerializer, StringDeserializer}
import GraphDSL.Implicits._

object SimpleKafkaStream {

  def apply(sourceTopic: String, targetTopic: String, kafkaBootstrapServer: String) (implicit actorSystem: ActorSystem) = {

    RunnableGraph.fromGraph (GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      source(sourceTopic, kafkaBootstrapServer) ~> transformMessage(targetTopic) ~> target(kafkaBootstrapServer)
      ClosedShape
    })
  }

  private def transformMessage (targetTopic: String) = Flow[ConsumerRecord[String, String]]
    .map (_.value())
    .map ("hello " + _)
    .map (message => { new ProducerRecord[String, String] (targetTopic, message) })

  private def source (topic: String, bootstrapServer: String) (implicit actorSystem: ActorSystem) : Source[ConsumerRecord[String, String], Control] = {
    val consumerSettings = ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer, Set(topic))
      .withBootstrapServers(bootstrapServer)
      .withGroupId(s"consumer_1_.$topic")
      .withClientId(s"consumer_1_.$topic")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    Consumer.plainSource(consumerSettings)
  }

  private def target (bootstrapServer: String) (implicit actorSystem: ActorSystem) = {
    Producer.plainSink(ProducerSettings(actorSystem, new StringSerializer, new StringSerializer)
      .withBootstrapServers(bootstrapServer))
  }
}

It is then tested with the following:

import java.util.UUID

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, ProducerSettings}
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.stream.ActorMaterializer
import akka.stream.testkit.javadsl.TestSink
import akka.stream.testkit.scaladsl.TestSource
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.scalatest.{Matchers, WordSpec}

class SimpleKafkaStreamTest extends WordSpec with Matchers {

  "A person should be greeted" in new TestScope {
    startStream()
    send("World")
    requestNext() shouldBe "hello World"
  }

  trait TestScope extends E2EConfiguration with Kafka

  trait E2EConfiguration {
    implicit val actorSystem = ActorSystem("e2e-system")
    implicit val actorMaterializer = ActorMaterializer()
    val kafkaBootstrapServer = "192.168.99.100:9092"
    val sourceTopic = "person"
    val targetTopic = "greeting"
  }

  trait Kafka {
    this: E2EConfiguration =>

    private val consumerSettings = ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer, Set(targetTopic))
      .withBootstrapServers(kafkaBootstrapServer)
      .withGroupId(UUID.randomUUID().toString)
      .withClientId(UUID.randomUUID().toString)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

    val kafkaInputSource =
      TestSource.probe[String].map( name => {
        new ProducerRecord[String, String] (sourceTopic, name)
    }).to(Producer.plainSink(ProducerSettings(actorSystem, new StringSerializer, new StringSerializer)
      .withBootstrapServers(bootstrapServers = kafkaBootstrapServer))).run()

    val kafkaOutput = Consumer.plainSource(consumerSettings).runWith(TestSink.probe(actorSystem))
    def requestNext() = kafkaOutput.requestNext.value

    def send(name: String) = kafkaInputSource.sendNext(name)

    def startStream() = {
      SimpleKafkaStream(sourceTopic = sourceTopic, targetTopic = targetTopic, kafkaBootstrapServer = kafkaBootstrapServer).run()
    }
  }
}

So this should write "World" to the topic "person" and get "hello World" back on the topic "greeting"... and occasionally it does. Most of the time, however, I get:

Expected OnNext(_), yet no element signaled during 3 seconds
java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 3 seconds
    at akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:268)
    at akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:259)
    at akka.stream.testkit.TestSubscriber$Probe.requestNext(StreamTestKit.scala:631)
    at kafka.SimpleKafkaStreamTest$Kafka$class.requestNext(SimpleKafkaStreamTest.scala:56)
    at kafka.SimpleKafkaStreamTest$$anonfun$1$$anon$1.requestNext(SimpleKafkaStreamTest.scala:18)
    at kafka.SimpleKafkaStreamTest$$anonfun$1$$anon$1.<init>(SimpleKafkaStreamTest.scala:22)
    at kafka.SimpleKafkaStreamTest$$anonfun$1.apply$mcV$sp(SimpleKafkaStreamTest.scala:18)
    at kafka.SimpleKafkaStreamTest$$anonfun$1.apply(SimpleKafkaStreamTest.scala:18)
    at kafka.SimpleKafkaStreamTest$$anonfun$1.apply(SimpleKafkaStreamTest.scala:18)

Kafka is not picking up the data at all. What am I doing wrong?

Mic*_*ant 5

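OK, I worked this out myself, with no contribution whatsoever from the Web of a Million Lies. For the benefit of anyone else with the same problem, here is what needed fixing in the code above: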

  • First, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG needs to be "latest" rather than "earliest", so that the consumer picks up the latest entry from the queue.

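  • Second, the code above does not give the ActorSystem a chance to shut down properly and notify Kafka that the two consumers (one in the test code and one in the code under test) are now dead. Without this, the queue stays locked until the session timeout (30 seconds by default) has elapsed, and any test run started in the meantime will be unable to read the Kafka queue. The fix is to have the test class extend BeforeAndAfterAll and to include, in an afterAll method, Await.result (actorSystem.terminate(), 20.seconds) (10 seconds was not long enough).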

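  • Third, I found that offset commits sometimes fail repeatedly and are immediately rescheduled, and that this can go on for as long as 24 seconds (though I am sure longer is possible). This makes kafkaOutput.requestNext() (kafkaOutput is actually a TestSubscriber.Probe[String]) unfit for purpose. It is necessary instead to use kafkaOutput.requestNext(2.seconds) (to give the code a chance to process) inside a try block that catches AssertionErrors of the form "Expected OnNext(_), yet no element signaled [etc]", and to retry enough times to comfortably exceed the 24-second period mentioned above.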
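
The retry described in the third point could be sketched roughly as follows. This is only a minimal sketch under stated assumptions, not the original fix: RetryOnTimeout and retry are hypothetical names, the helper deliberately has no Akka dependency, and it assumes the probe's timeout surfaces as a java.lang.AssertionError whose message starts with "Expected OnNext", as in the stack trace in the question.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Hypothetical helper; it is not part of Akka or ScalaTest.
object RetryOnTimeout {

  // Assumption: the timeout message begins like the one in the question's stack trace.
  private def isProbeTimeout(e: AssertionError): Boolean =
    e.getMessage != null && e.getMessage.startsWith("Expected OnNext")

  // Evaluates `attempt` up to `maxAttempts` times. Only a probe-timeout
  // AssertionError triggers another attempt; any other failure, or a timeout
  // on the final attempt, is rethrown to the caller.
  @tailrec
  def retry[T](maxAttempts: Int)(attempt: => T): T = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")
    Try(attempt) match {
      case Success(value) => value
      case Failure(e: AssertionError) if maxAttempts > 1 && isProbeTimeout(e) =>
        retry(maxAttempts - 1)(attempt)
      case Failure(other) => throw other
    }
  }
}
```

In the test from the question, this would wrap the probe call, for example RetryOnTimeout.retry(15)(kafkaOutput.requestNext(2.seconds).value), assuming the Akka version in use offers requestNext with a FiniteDuration argument. Fifteen attempts of 2 seconds each give a 30-second budget, which exceeds the 24 seconds observed above.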