dat*_*ser 5 java streaming google-cloud-platform google-cloud-dataflow
我正在尝试通过将 DataflowPipelineOptions 中的 setUpdate 标志设置为 true 来更新 DataFlow 作业。我有一个函数可以检查具有名称的现有作业,如果该作业不存在,我将 setUpdate 标志设置为 false,否则设置为 true。这意味着对于第一次部署作业,setUpdate 标志设置为 false,并且所有后续部署都会将标志设置为 true。
options.setUpdate(jobExists(options));
Run Code Online (Sandbox Code Playgroud)
管道代码如下:
pipeline
.apply("Read", pubsubDownload)
.apply("Window", Window.into(FixedWindows.of(WINDOW_DURATION)))
.apply("Extract", ParDo.of(new Extract()))
.apply("Count", ApproximateUnique.perKey(0.06))
.apply("View As Map", View.asMap()); //<-- ****Fails here
Run Code Online (Sandbox Code Playgroud)
注意:我只是想测试更新后的标志,因此没有更改部署中代码的任何部分。只是想测试该标志是否能够用新工作取代旧工作。
然而这不起作用。我第一次就能成功部署该作业。当我尝试再次重新部署作业时,出现以下错误:
工作流程失败。原因: (9a8ccc4f2e36c2d6):新作业与 <JOB_ID> 不兼容。原始作业尚未中止。, (9a8ccc4f2e36c371):阶段 View As Map/StreamingViewAsMap/Combine.Globally/Combine.PerKey/GroupByKey 已以与更新不兼容的方式更改。
View.asMap函数不可更新吗?根据文档,SideInput是可更新的。正如错误所述,如果View不可更新并且SideInput需要View,则文档包含错误信息。
请注意,这无需View.asMap管道步骤即可工作。
归档时间: |
|
查看次数: |
1446 次 |
最近记录: |