Dee*_*mal 5 python kubernetes airflow apache-airflow-xcom
I am running a DAG that uses a Docker image to run a jar. With xcom_push=True set, it creates another container in the same pod, alongside the container for the Docker image.
DAG:
jar_task = KubernetesPodOperator(
    namespace='test',
    image="path to image",
    image_pull_secrets="secret",
    image_pull_policy="Always",
    node_selectors={"d-type": "na-node-group"},
    cmds=["sh", "-c", ..~running jar here~..],
    secrets=[secret_file],
    env_vars=environment_vars,
    labels={"k8s-app": "airflow"},
    name="airflow-pod",
    config_file=k8s_config_file,
    resources=pod.Resources(
        request_cpu=0.2,
        limit_cpu=0.5,
        request_memory='512Mi',
        limit_memory='1536Mi',
    ),
    in_cluster=False,
    task_id="run_jar",
    is_delete_operator_pod=True,
    get_logs=True,
    xcom_push=True,
    dag=dag,
)
Below is the error I get even though the JAR itself executes successfully:
[2018-11-27 11:37:21,605] {{logging_mixin.py:95}} INFO - [2018-11-27 11:37:21,605] {{pod_launcher.py:166}} INFO - Running command... cat /airflow/xcom/return.json
[2018-11-27 11:37:21,605] {{logging_mixin.py:95}} INFO -
[2018-11-27 11:37:21,647] {{logging_mixin.py:95}} INFO - [2018-11-27 11:37:21,646] {{pod_launcher.py:173}} INFO - cat: can't open '/airflow/xcom/return.json': No such file or directory
[2018-11-27 11:37:21,647] {{logging_mixin.py:95}} INFO -
[2018-11-27 11:37:21,647] {{logging_mixin.py:95}} INFO - [2018-11-27 11:37:21,647] {{pod_launcher.py:166}} INFO - Running command... kill -s SIGINT 1
[2018-11-27 11:37:21,647] {{logging_mixin.py:95}} INFO -
[2018-11-27 11:37:21,702] {{models.py:1760}} ERROR - Pod Launching failed: Failed to extract xcom from pod: airflow-pod-hippogriff-a4628b12
Traceback (most recent call last):
File "/usr/local/airflow/operators/kubernetes_pod_operator.py", line 126, in execute
get_logs=self.get_logs)
File "/usr/local/airflow/operators/pod_launcher.py", line 90, in run_pod
return self._monitor_pod(pod, get_logs)
File "/usr/local/airflow/operators/pod_launcher.py", line 110, in _monitor_pod
result = self._extract_xcom(pod)
File "/usr/local/airflow/operators/pod_launcher.py", line 161, in _extract_xcom
raise AirflowException('Failed to extract xcom from pod: {}'.format(pod.name))
airflow.exceptions.AirflowException: Failed to extract xcom from pod: airflow-pod-hippogriff-a4628b12
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/airflow/operators/kubernetes_pod_operator.py", line 138, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Failed to extract xcom from pod: airflow-pod-hippogriff-a4628b12
[2018-11-27 11:37:21,704] {{models.py:1789}} INFO - All retries failed; marking task as FAILED
小智 8
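If xcom_push is True, KubernetesPodOperator creates a second, sidecar container (airflow-xcom-sidecar) in the pod alongside the base container (the container that does the actual work). The sidecar reads the data in /airflow/xcom/return.json and returns it as the XCom value. So in your base container you need to write the data you want to return to the file /airflow/xcom/return.json.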
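As a minimal sketch of that last step, assuming the base container can run Python once the main job has finished (the helper name write_xcom_result and the example payload are hypothetical, not part of Airflow), the result could be written like this:

```python
import json
import os


def write_xcom_result(value, path="/airflow/xcom/return.json"):
    """Serialize `value` as JSON to the file the XCom sidecar reads.

    `write_xcom_result` is a hypothetical helper, not an Airflow API.
    The default path is the one quoted in the error log above.
    """
    directory = os.path.dirname(path)
    if directory:
        # Create the directory in case it is not there yet.
        os.makedirs(directory, exist_ok=True)
    with open(path, "w") as f:
        json.dump(value, f)
    return path


# Example call as the last step of the container's work
# (the payload is an illustrative assumption):
# write_xcom_result({"status": "ok"})
```

The same effect can be had without Python by having the container's shell command write a JSON document to that path after the jar exits. The file must contain valid JSON, since that is what the operator tries to load.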
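I want to point out an error I ran into with XCom and KubernetesPodOperator, even though its cause is different from the OP's, in case someone else stumbles on this question: it is the only one about KPO and XCom.
I am using Cloud Composer on Google Cloud Platform (GCP), which runs a slightly older version than the latest Airflow release. The official GitHub repository I was referring to says to use do_xcom_push, whereas older Airflow versions use the argument xcom_push instead!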
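One way to avoid hard-coding the name is to inspect the operator class at DAG-definition time. The sketch below is only an illustration: xcom_flag_name is a hypothetical helper, and it assumes the flag appears as a named parameter of __init__ somewhere in the class hierarchy, which may not hold for every Airflow release.

```python
import inspect


def xcom_flag_name(operator_cls):
    """Return 'do_xcom_push' or 'xcom_push', whichever the class accepts.

    Hypothetical helper: walks the class hierarchy from the most derived
    class upward and looks for either name among the parameters of each
    class's own __init__. Returns None if neither is found.
    """
    for cls in inspect.getmro(operator_cls):
        init = cls.__dict__.get("__init__")
        if init is None:
            continue  # this class does not define its own __init__
        try:
            params = inspect.signature(init).parameters
        except (TypeError, ValueError):
            continue  # signature not introspectable
        for name in ("do_xcom_push", "xcom_push"):
            if name in params:
                return name
    return None
```

With a real operator, the result could be used to build the keyword argument, for example kwargs = {xcom_flag_name(KubernetesPodOperator): True}, after checking that the result is not None.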