How to test a Kafka consumer

him*_*ian 1 scala apache-kafka kafka-consumer-api

I have a Kafka consumer (written in Scala) that pulls the latest records from Kafka. The consumer looks like this:

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer

val consumerProperties = new Properties()
consumerProperties.put("bootstrap.servers", "localhost:9092")
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProperties.put("group.id", "something")
consumerProperties.put("auto.offset.reset", "latest")

val consumer = new KafkaConsumer[String, String](consumerProperties)
consumer.subscribe(java.util.Collections.singletonList("topic"))

Now I want to write an integration test for it. Are there any methods or best practices for testing a Kafka consumer?

pra*_*upd 5

  1. You need to start ZooKeeper and Kafka programmatically for the integration test.

    1.1 Start ZooKeeper (ZooKeeperServer)

    import java.net.InetSocketAddress
    import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer}
    import scala.reflect.io.Directory

    def startZooKeeper(zooKeeperPort: Int, zkLogsDir: Directory): ServerCnxnFactory = {
        val tickTime = 2000
    
        val zkServer = new ZooKeeperServer(zkLogsDir.toFile.jfile, zkLogsDir.toFile.jfile, tickTime)
    
        val factory = ServerCnxnFactory.createFactory
        factory.configure(new InetSocketAddress("0.0.0.0", zooKeeperPort), 1024)
        factory.startup(zkServer)
    
        factory
    }
    

    1.2 Start Kafka (KafkaServer)

    import java.util.Properties
    import kafka.server.{KafkaConfig, KafkaServer}
    import scala.reflect.io.Directory

    case class StreamConfig(streamTcpPort: Int = 9092,
                            streamStateTcpPort: Int = 2181,
                            stream: String,
                            numOfPartition: Int = 1,
                            nodes: Map[String, String] = Map.empty)
    
    def startKafkaBroker(config: StreamConfig,
                       kafkaLogDir: Directory): KafkaServer = {
    
      val syncServiceAddress = s"localhost:${config.streamStateTcpPort}"
    
      val properties: Properties = new Properties
      properties.setProperty("zookeeper.connect", syncServiceAddress)
      properties.setProperty("broker.id", "0")
      properties.setProperty("host.name", "localhost")
      properties.setProperty("advertised.host.name", "localhost")
      properties.setProperty("port", config.streamTcpPort.toString)
      properties.setProperty("auto.create.topics.enable", "true")
      properties.setProperty("log.dir", kafkaLogDir.toAbsolute.path)
      properties.setProperty("log.flush.interval.messages", 1.toString)
      properties.setProperty("log.cleaner.dedupe.buffer.size", "1048577")
    
      config.nodes.foreach {
        case (key, value) => properties.setProperty(key, value)
      }
    
      val broker = new KafkaServer(new KafkaConfig(properties))
      broker.startup()

      println(s"Kafka broker started at ${properties.get("host.name")}:${properties.get("port")}, logs at ${kafkaLogDir.toFile}")
      broker
    }
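    Wiring the two helpers together in a test fixture might look like the following. This is a minimal sketch, not part of the original answer: it assumes the `startZooKeeper`/`startKafkaBroker` definitions above are in scope, that ports 2181/9092 are free, and the temp-directory names are illustrative.

    ```scala
    import scala.reflect.io.Directory

    // Throwaway log directories for ZooKeeper and the broker.
    val zkLogsDir   = Directory.makeTemp("zk-logs")
    val kafkaLogDir = Directory.makeTemp("kafka-logs")

    // Start ZooKeeper first, then the broker that registers with it.
    val zkFactory = startZooKeeper(2181, zkLogsDir)
    val broker    = startKafkaBroker(StreamConfig(stream = "test-topic"), kafkaLogDir)

    // ... run the test ...

    // Tear down in reverse order and remove the temp directories.
    broker.shutdown()
    zkFactory.shutdown()
    kafkaLogDir.deleteRecursively()
    zkLogsDir.deleteRecursively()
    ```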

  2. Emit some events using a KafkaProducer.

  3. Then consume them with your consumer to test and verify that it works.
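Step 2 can be sketched with a plain KafkaProducer. This is a minimal sketch, assuming the broker from step 1.2 is listening on localhost:9092; the topic name and payload mirror the test below and are otherwise illustrative.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val producerProperties = new Properties()
producerProperties.put("bootstrap.servers", "localhost:9092")
producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](producerProperties)

// Send one test event and block on the returned Future until the broker
// acknowledges it, so the consumer under test is guaranteed to see it.
val record = new ProducerRecord[String, String](
  "test-topic", "myKey", """{"MyEvent" : { "myKey" : "myValue"}}""")
producer.send(record).get()
producer.close()
```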

You can use scalatest-eventstream, which has a startBroker method that starts ZooKeeper and Kafka for you.

It also has destroyBroker, which cleans up Kafka after your tests.

For example:

class MyStreamConsumerSpecs extends FunSpec with BeforeAndAfterAll with Matchers {
  implicit val config =
    StreamConfig(streamTcpPort = 9092, streamStateTcpPort = 2181, stream = "test-topic", numOfPartition = 1)

  val kafkaStream = new KafkaEmbeddedStream

  override protected def beforeAll(): Unit = {
    kafkaStream.startBroker
  }

  override protected def afterAll(): Unit = {
    kafkaStream.destroyBroker
  }

  describe("Kafka Embedded stream") {
    it("does consume some events") {

      //uses application.properties
      //emitter.broker.endpoint=localhost:9092
      //emitter.event.key.serializer=org.apache.kafka.common.serialization.StringSerializer
      //emitter.event.value.serializer=org.apache.kafka.common.serialization.StringSerializer
      kafkaStream.appendEvent("test-topic", """{"MyEvent" : { "myKey" : "myValue"}}""")

      val consumerProperties = new Properties()
      consumerProperties.put("bootstrap.servers", "localhost:9092")
      consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      consumerProperties.put("group.id", "something")
      consumerProperties.put("auto.offset.reset", "earliest")

      val myConsumer = new KafkaConsumer[String, String](consumerProperties)
      myConsumer.subscribe(java.util.Collections.singletonList("test-topic"))

      val events = myConsumer.poll(2000)

      events.count() shouldBe 1
      events.iterator().next().value() shouldBe """{"MyEvent" : { "myKey" : "myValue"}}"""
      println("=================" + events.count())
    }
  }
}