Tho*_*mas 1 integration-testing scala apache-kafka apache-flink embedded-kafka
我想用Flink 和 Kafka 运行集成测试。该过程是从 Kafka 读取数据,使用 Flink 进行一些操作,然后将数据流放入 Kafka 中。
我想从头到尾测试这个过程。现在我使用scalatest-embedded-kafka。
我在这里举了一个例子,我试图尽可能简单:
import java.util.Properties
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.scalatest.{Matchers, WordSpec}
import scala.collection.mutable.ListBuffer
object SimpleFlinkKafkaTest {
class CollectSink extends SinkFunction[String] {
override def invoke(string: String): Unit = {
synchronized {
CollectSink.values += string
}
}
}
object CollectSink {
val values: ListBuffer[String] = ListBuffer.empty[String]
}
val kafkaPort = 9092
val zooKeeperPort = 2181
val props = new Properties()
props.put("bootstrap.servers", "localhost:" + kafkaPort.toString)
props.put("schema.registry.url", "localhost:" + zooKeeperPort.toString)
val inputString = "mystring"
val expectedString = "MYSTRING"
}
class SimpleFlinkKafkaTest extends WordSpec with Matchers with EmbeddedKafka {
"runs with embedded kafka" should {
"work" in {
implicit val config = EmbeddedKafkaConfig(
kafkaPort = SimpleFlinkKafkaTest.kafkaPort,
zooKeeperPort = SimpleFlinkKafkaTest.zooKeeperPort
)
withRunningKafka {
publishStringMessageToKafka("input-topic", SimpleFlinkKafkaTest.inputString)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val kafkaConsumer = new FlinkKafkaConsumer011(
"input-topic",
new SimpleStringSchema,
SimpleFlinkKafkaTest.props
)
implicit val typeInfo = TypeInformation.of(classOf[String])
val inputStream = env.addSource(kafkaConsumer)
val outputStream = inputStream.map(_.toUpperCase)
val kafkaProducer = new FlinkKafkaProducer011(
"output-topic",
new SimpleStringSchema(),
SimpleFlinkKafkaTest.props
)
outputStream.addSink(kafkaProducer)
env.execute()
consumeFirstStringMessageFrom("output-topic") shouldEqual SimpleFlinkKafkaTest.expectedString
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
我遇到了错误,所以我添加了该行implicit val typeInfo = TypeInformation.of(classOf[String]),但我真的不明白为什么我必须这样做。
目前这段代码不起作用,它运行时不会中断,但不会停止,也不会给出任何结果。
如果有人有任何想法吗?测试这种管道是更好的主意。
谢谢 !
编辑:添加env.execute()和更改错误。
小智 5
这是我想出的一个简单的解决方案。
这个想法是:
和工作原型:
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.scalatest.{Matchers, WordSpec}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
class SimpleFlinkKafkaTest extends WordSpec with Matchers with EmbeddedKafka {
"runs with embedded kafka on arbitrary available ports" should {
val env = StreamExecutionEnvironment.getExecutionEnvironment
"work" in {
val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 9092, zooKeeperPort = 2182)
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2182")
properties.setProperty("group.id", "test")
properties.setProperty("auto.offset.reset", "earliest")
val kafkaConsumer = new FlinkKafkaConsumer011[String]("input", new SimpleStringSchema(), properties)
val kafkaSink = new FlinkKafkaProducer011[String]("output", new SimpleStringSchema(), properties)
val stream = env
.addSource(kafkaConsumer)
.map(_.toUpperCase)
.addSink(kafkaSink)
withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
createCustomTopic("input")
createCustomTopic("output")
Future{env.execute()}
publishStringMessageToKafka("input", "Titi")
consumeFirstStringMessageFrom("output") shouldEqual "TITI"
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1852 次 |
| 最近记录: |