xiu*_*shi 0 google-cloud-platform google-cloud-dataflow
我正在尝试优化从PubSubIO中提取消息的管道,并将这些消息发送到第三方API.我有一个有趣的观察结果是,如果我在之后放置一个GroupBy"Degroup"变换PubSubIO.read,那么管道的吞吐量会显着增加.我添加了GroupByjust以防止融合优化,现在我想知道在给定的管道中如何合并转换.
融合后如何找出管道的最佳方法是什么?
您可以通过调用project.locations.jobs.get或通过运行以下命令通过gcloud 访问优化图和融合阶段:
gcloud dataflow jobs describe --full $JOB_ID --format json
Run Code Online (Sandbox Code Playgroud)
从响应的输出,融合阶段将在ComponentTransform数组中的ExecutionStageSummary对象下描述.以下是Cloud Pub/Sub到BigQuery Google提供的模板的示例输出.在这种情况下,我们可以看到图形融合为3个步骤,主要由BigQueryIO接收器中的步骤描绘:Reshuffle
ReshuffleWriteSuccessfulRecordsWriteFailedRecordsReshuffle中WriteSuccessfulRecordsReshuffle中WriteFailedRecords由于作业描述非常详细,因此您可以考虑使用输出管道jq来轻松地在单行命令中提取相关位,如下所示:
gcloud dataflow jobs describe --full $JOB_ID --format json | jq '.pipelineDescription.executionPipelineStage[] | {"stage_id": .id, "stage_name": .name, "fused_steps": .componentTransform }'
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
247 次 |
| 最近记录: |