Cho*_*eat 8 java unit-testing scala apache-kafka
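I need to unit test a Kafka application without using third-party libraries.
My current problem is that I want to clear all topics between tests, but I don't know how.
This is my temporary workaround: after each test, commit every message that was produced, and put all test consumers in the same consumer group.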
override protected def afterEach(): Unit = {
  val cleanerConsumer = newConsumer(Seq.empty)
  val topics = cleanerConsumer.listTopics()
  println("pulisco")
  cleanerConsumer.subscribe(topics.keySet())
  cleanerConsumer.poll(100)
  cleanerConsumer.commitSync()
  cleanerConsumer.close()
}
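But this doesn't work, and I don't know why.
For example, when I create a new consumer inside a test, messages contains the messages produced in the previous test.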
val consumerProbe = newConsumer(SMSGatewayTopic)
val messages = consumerProbe.poll(1000)
How can I solve this?
You can also embed Kafka/ZooKeeper instances in your test sources, which gives you more control over such isolated services.
trait Kafka { self: ZooKeeper =>
  Kafka.start()
}
object Kafka {
  import org.apache.hadoop.fs.FileUtil
  import kafka.server.KafkaServer
  @volatile private var started = false
  lazy val logDir = java.nio.file.Files.createTempDirectory("kafka-log").toFile
  lazy val kafkaServer: KafkaServer = {
    val config = com.typesafe.config.ConfigFactory.
      load(this.getClass.getClassLoader)
    val (host, port) = {
      val (h, p) = config.getString("kafka.servers").span(_ != ':')
      h -> p.drop(1).toInt
    }
    val serverConf = new kafka.server.KafkaConfig({
      val props = new java.util.Properties()
      props.put("port", port.toString)
      props.put("broker.id", port.toString)
      props.put("log.dir", logDir.getAbsolutePath)
      props.put(
        "zookeeper.connect",
        s"localhost:${config getInt "test.zookeeper.port"}"
      )
      props
    })
    new KafkaServer(serverConf)
  }
  def start(): Unit = if (!started) {
    try {
      kafkaServer.startup()
      started = true
    } catch {
      case err: Throwable =>
        println(s"fails to start Kafka: ${err.getMessage}")
        throw err
    }
  }
  def stop(): Unit = try {
    if (started) kafkaServer.shutdown()
  } finally {
    FileUtil.fullyDelete(logDir)
  }
}
trait ZooKeeper {
  ZooKeeper.start()
}
object ZooKeeper {
  import java.nio.file.Files
  import java.net.InetSocketAddress
  import org.apache.hadoop.fs.FileUtil
  import org.apache.zookeeper.server.ZooKeeperServer
  import org.apache.zookeeper.server.ServerCnxnFactory
  @volatile private var started = false
  lazy val logDir = Files.createTempDirectory("zk-log").toFile
  lazy val snapshotDir = Files.createTempDirectory("zk-snapshots").toFile
  lazy val (zkServer, zkFactory) = {
    val srv = new ZooKeeperServer(
      snapshotDir, logDir, 500
    )
    val config = com.typesafe.config.ConfigFactory.
      load(this.getClass.getClassLoader)
    val port = config.getInt("test.zookeeper.port")
    srv -> ServerCnxnFactory.createFactory(
      new InetSocketAddress("localhost", port), 1024
    )
  }
  def start(): Unit = if (!zkServer.isRunning) {
    try {
      zkFactory.startup(zkServer)
      started = true
      while (!zkServer.isRunning) {
        Thread.sleep(500)
      }
    } catch {
      case err: Throwable =>
        println(s"fails to start ZooKeeper: ${err.getMessage}")
        throw err
    }
  }
  def stop(): Unit = try {
    if (started) zkFactory.shutdown()
  } finally {
    try { FileUtil.fullyDelete(logDir) } catch { case _: Throwable => () }
    FileUtil.fullyDelete(snapshotDir)
  }
}
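A test class can then mix in both traits to ensure the services are available. Scala initializes traits from left to right, so writing extends ZooKeeper with Kafka (rather than extends Kafka with ZooKeeper) starts ZooKeeper before the broker that needs it.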
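As a rough illustration, a test class using these traits might look like the sketch below. It assumes that ScalaTest 3.1 or later is on the test classpath and that the Kafka and ZooKeeper traits and objects shown above are in scope; the class name EmbeddedServicesSpec is hypothetical. ZooKeeper is mixed in before Kafka because trait bodies run in left-to-right order.

```scala
import org.scalatest.flatspec.AnyFlatSpec

// Hypothetical spec name. Mixing in ZooKeeper before Kafka means
// ZooKeeper.start() runs first, then Kafka.start(), when the suite is constructed.
class EmbeddedServicesSpec extends AnyFlatSpec with ZooKeeper with Kafka {

  "the embedded ZooKeeper" should "be running once the suite is constructed" in {
    // zkServer is the public lazy val defined in the ZooKeeper object above.
    assert(ZooKeeper.zkServer.isRunning)
  }
}
```

Because start() is guarded in both objects, several suites in the same JVM can mix in the traits and share a single embedded broker.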
If the test JVM is not forked, Tests.Cleanup in the SBT testOptions in Test setting can be used to stop the embedded services after the tests have run.
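A minimal build.sbt sketch of such a cleanup hook follows. It assumes the Kafka and ZooKeeper objects live in the default package (the code above shows no package declaration); adjust the names passed to the hypothetical stop helper otherwise. The objects are looked up reflectively through the test class loader because the build definition cannot reference test classes directly. Newer sbt versions spell these keys Test / fork and Test / testOptions.

```scala
// build.sbt (sketch; object names are assumed to be in the default package)
fork in Test := false

testOptions in Test += Tests.Cleanup { (loader: ClassLoader) =>
  // Hypothetical helper: find a Scala singleton object via the test
  // class loader and invoke its no-argument stop() method.
  def stop(objectName: String): Unit = {
    val cls = loader.loadClass(objectName + "$")
    val module = cls.getField("MODULE$").get(null)
    cls.getMethod("stop").invoke(module)
  }
  stop("Kafka")     // shut the broker down first
  stop("ZooKeeper") // then the ZooKeeper instance it depends on
}
```

Stopping the broker before ZooKeeper mirrors the start order in reverse, and both stop() methods also delete their temporary directories.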