Chr*_*kos 8 java integration-testing google-cloud-pubsub google-cloud-dataflow apache-beam
我们正在为Apache Beam管道构建集成测试,并且遇到了一些问题.有关背景信息,请参见
有关我们管道的详情:
PubsubIO我们的数据源(无界PCollection)CombineFn和非常简单的窗口/触发策略JdbcIO,用org.neo4j.jdbc.Driver写的Neo4j目前的测试方法:
OurPipeline.main(TestPipeline.convertToArgs(options)PubsubIO将从这是一个简单的集成测试,它将验证我们的整个管道是否按预期运行.
我们目前面临的问题是,当我们运行我们的管道时,它会阻塞.我们正在使用DirectRunner和pipeline.run()(不 pipeline.run().waitUntilFinish()),但测试似乎在运行管道后挂起.因为这是一个无限制的PCollection(在流模式下运行),管道不会终止,因此不会到达它之后的任何代码.
所以,我有几个问题:
1)有没有办法运行管道然后稍后手动停止?
2)有没有办法异步运行管道?理想情况下,它会启动管道(然后将继续轮询Pub/Sub以获取数据),然后转到负责发布到Pub/Sub的代码.
3)这种集成测试方法是否合理,或者是否有更好的方法可能更直接?这里的任何信息/指导将不胜感激.
如果我能提供任何额外的代码/背景,请告诉我 - 谢谢!
Tho*_*roh 10
您可以使用DirectRunner by传递设置isBlockOnRun管道选项来异步运行管道false.只要你保留对返回的PipelineResult可用的引用,调用cancel()该结果就应该停止管道.
对于第三个问题,您的设置似乎合理.但是,如果您希望对管道进行较小规模的测试(需要较少的组件),则可以将所有处理逻辑封装在自定义中PTransform.这PTransform应该采用已经从输入源完全解析的输入,并产生尚未为输出接收器解析的输出.
当这样做时,可以使用任一Create(其通常将不行使触发)或TestStream(其可以,这取决于你如何构建TestStream)与DirectRunner,以产生输入数据的有限数量,应用该处理PTransform到该PCollection,并使用PAssert上输出PCollection以验证管道是否生成了您期望的输出.
有关测试的更多信息,Beam网站在编程指南中有关于这些测试样式的信息,以及关于测试管道的博客文章TestStream.
| 归档时间: |
|
| 查看次数: |
961 次 |
| 最近记录: |