Tho*_*mas 7 google-cloud-dataflow
我正在编写一个从Kafka 0.8读取的自定义DataFlow无界数据源.我想使用DirectPipelineRunner在本地运行它.但是,我得到以下stackstrace:
Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(KafkaDataflowSource)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:700)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:252)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:662)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:374)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:87)
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:174)
Run Code Online (Sandbox Code Playgroud)
这是有道理的,因为我没有在任何时候为我的自定义源注册评估者.
阅读https://github.com/GoogleCloudPlatform/DataflowJavaSDK,似乎只有有限来源的评估者才能注册.为自定义无界源定义和注册求值程序的推荐方法是什么?
DirectPipelineRunner
目前仅在有界输入上运行。我们正在积极努力消除此限制,并预计很快就会发布。
同时,出于测试目的,您可以使用轻松将 any 转换UnboundedSource
为 a ,如下例所示:BoundedSource
withMaxNumRecords
UnboundedSource<String> unboundedSource = ...; // make a Kafka source
PCollection<String> boundedKafkaCollection =
p.apply(Read.from(unboundedSource).withMaxNumRecords(10));
Run Code Online (Sandbox Code Playgroud)
有关更多详细信息,请参阅GitHub 上的此问题。
另外,还有一些致力于贡献 Kafka 连接器的努力。您可能想通过我们的 GitHub 存储库与我们和其他贡献者讨论此事。
归档时间: |
|
查看次数: |
668 次 |
最近记录: |