我有一个 DAG,它获取 Google 云存储桶中脚本的结果,将其加载到 Google BigQuery 中的表中,然后删除存储桶中的文件。
我希望 DAG 在周末每小时检查一次。现在,我正在使用 GoogleCloudStoragetoBigQueryOperator。如果该文件不存在,则 DAG 失败。有没有办法将 DAG 设置为即使文件不存在也不会失败?也许尝试/捕获?
您可以使用Google 提供程序包中的GCSObjectExistenceSensor来在运行下游任务之前验证文件是否存在。
\ngcs_object_exists = GCSObjectExistenceSensor(\n bucket=BUCKET_1,\n object=PATH_TO_UPLOAD_FILE,\n mode=\'poke\',\n task_id="gcs_object_exists_task",\n)\nRun Code Online (Sandbox Code Playgroud)\n您可以在此处查看官方示例。请记住,该传感器从 扩展而来,BaseSensorOperator因此您可以定义参数,例如poke_interval、timeout和mode来满足您的需求。
\n\n\n\n
\n- soft_fail (bool) \xe2\x80\x93 设置为 true 以在失败时将任务标记为“已跳过”
\n- poke_interval (float) \xe2\x80\x93 作业在每次尝试之间等待的时间(以秒为单位)
\n- timeout (float) \xe2\x80\x93 任务超时并失败之前的时间(以秒为单位)。
\n- mode (str) \xe2\x80\x93 传感器的工作方式。选项有: { 戳 | 重新安排},默认是poke。当设置为“poke”时,传感器在整个执行时间内占用一个工作槽,并在“poke”之间休眠。如果传感器的预期运行时间较短或需要较短的刺探间隔,请使用此模式。请注意,在此模式下,传感器将在sensor\xe2\x80\x99s 运行时期间保留工作插槽和池插槽。当设置为重新调度时,传感器任务会在尚未满足条件时释放工作插槽,并在稍后重新调度。如果预计满足条件之前的时间很长,请使用此模式。Poke 间隔应超过一分钟,以防止调度程序负载过大。
\n- exponential_backoff (bool) \xe2\x80\x93 通过使用指数退避算法允许在 poke 之间渐进地延长等待时间
\n