标签: apache-airflow-xcom

气流 k8s 运营商 xcom - 握手状态 403 禁止

当我在 Airflow 1.10 版中使用KubernetesPodOperator运行docker 镜像

一旦 pod 成功完成任务,airflow 会尝试通过 k8s 流客户端连接到 pod 来获取 xcom 值。

以下是我遇到的错误:

[2018-12-18 05:29:02,209] {{models.py:1760}} ERROR - (0)
Reason: Handshake status 403 Forbidden
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kubernetes/stream/ws_client.py", line 249, in websocket_call
    client = WSClient(configuration, get_websocket_url(url), headers)
  File "/usr/local/lib/python3.6/site-packages/kubernetes/stream/ws_client.py", line 72, in __init__
    self.sock.connect(url, header=header)
  File "/usr/local/lib/python3.6/site-packages/websocket/_core.py", line 223, in connect
    self.handshake_response = handshake(self.sock, *addrs, **options)
  File "/usr/local/lib/python3.6/site-packages/websocket/_handshake.py", line 79, in handshake
    status, resp = _get_resp_headers(sock)
  File "/usr/local/lib/python3.6/site-packages/websocket/_handshake.py", line 152, …
Run Code Online (Sandbox Code Playgroud)

kubernetes airflow kubernetes-python-client apache-airflow-xcom

8
推荐指数
1
解决办法
1685
查看次数

KeyError:Apache Airflow xcom中的'ti'

我们正在尝试运行一个包含2个任务的简单DAG,这些任务将通过xcom进行数据通信.

DAG文件:

from __future__ import print_function
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    'example_xcom',
    schedule_interval="@once",
    default_args=args)

value_1 = [1, 2, 3]


def push(**kwargs):
    # pushes an XCom without a specific target
    kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)


def puller(**kwargs):
    ti = kwargs['ti']

    v1 = ti.xcom_pull(key=None, task_ids='push')
    assert v1 == value_1

    v1 = ti.xcom_pull(key=None, task_ids=['push'])
    assert (v1) == (value_1)


push1 = PythonOperator(
    task_id='push', dag=dag, …
Run Code Online (Sandbox Code Playgroud)

airflow apache-airflow-xcom

7
推荐指数
1
解决办法
2361
查看次数

在气流中创建子标签时访问父 dag 上下文?

我试图在 subdag 创建时访问来自父 dag 的一些 xcom 数据,我正在寻找在互联网上实现这一目标,但我没有找到任何东西。

def test(task_id):
    logging.info(f' execution of task {task_id}')


def load_subdag(parent_dag_id, child_dag_id, args):
    dag_subdag = DAG(
        dag_id='{0}.{1}'.format(parent_dag_id, child_dag_id),
        default_args=args,
        schedule_interval="@daily",
    )
    with dag_subdag:
        r = DummyOperator(task_id='random')

        for i in range(r.xcom_pull(task_ids='take_Ana', key='the_message', dag_id=parent_dag_id)):
            t = PythonOperator(
                task_id='load_subdag_{0}'.format(i),
                default_args=args,
                python_callable=print_context,
                op_kwargs={'task_id': 'load_subdag_{0}'.format(i)},
                dag=dag_subdag,
            )

    return dag_subdag

load_tasks = SubDagOperator(
        task_id='load_tasks',
        subdag=load_subdag(dag.dag_id,
                           'load_tasks', args),
        default_args=args,
    )
Run Code Online (Sandbox Code Playgroud)

我的代码出现此错误

1  | Traceback (most recent call last):
airflow_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 374, in process_file
airflow_1  |     m = imp.load_source(mod_name, filepath) …
Run Code Online (Sandbox Code Playgroud)

airflow apache-airflow-xcom

6
推荐指数
1
解决办法
5230
查看次数

气流 - 如何仅“填充 DagBag”一次

我的 dag 需要大约 50 秒来解析,我只使用外部触发器来启动 dag 运行,没有时间表。我注意到气流想要大量填充 dagbag --> 在每个 trigger_dag 命令和在后台,它不断检查 dags 文件夹并在部署新的 .py 后似乎立即创建 .pyc 文件。

无论如何我可以部署我的集群并填充一次 dags!然后在接下来的 2 周内,在任何 trigger_dag 上立即开始 dagruns(现在需要 50 秒才能在开始之前填充 dagbag)。我不需要在 2 周内更新 dag 定义。

orchestration airflow airflow-scheduler apache-airflow-xcom

6
推荐指数
1
解决办法
4023
查看次数

基于事件触发和运行气流任务将文件放入 S3 存储桶

是否可以仅在发生特定事件(例如将文件放入特定 S3 存储桶的事件)时运行气流任务。类似于 AWS Lambda 事件的内容

有,S3KeySensor但我不知道它是否符合我的要求(仅在事件发生时运行 Task)

这是使问题更清楚的示例:

我有一个传感器对象如下

sensor = S3KeySensor(
    task_id='run_on_every_file_drop',
    bucket_key='file-to-watch-*',
    wildcard_match=True,
    bucket_name='my-sensor-bucket',
    timeout=18*60*60,
    poke_interval=120,
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

使用上述传感器对象,传感器任务的气流行为如下:

  • 如果在气流管理 UI 中my-sensor-bucket切换 DAG 之前已经存在与 S3 存储桶中的通配符匹配的对象名称ON,则运行该任务(由于过去的 s3 对象的存在,我不想运行该任务)
  • 运行一次后,每当有新的 S3 文件对象删除时,传感器任务将不会再次运行(我想每次在存储桶中删除一个新的 S3 文件对象时,都运行 DAG 中的传感器任务和后续任务my-sensor-bucket
  • 如果您配置调度程序,任务将基于调度而不是基于事件运行。所以在这种情况下调度程序似乎不是一个选项

我试图了解气流中的任务是否只能基于调度(如 cron 作业)或传感器(仅基于传感标准一次)运行,还是不能像基于事件的管道(类似于 AWS Lambda 的东西)那样设置

amazon-s3 airflow airflow-scheduler apache-airflow-xcom airflow-operator

6
推荐指数
1
解决办法
3433
查看次数

如何获得手动触发 DAG 的 Airflow 用户?

在 Airflow UI 中,“浏览器 > 日志”下可用的日志事件之一是事件“触发器”以及负责触发此事件的 DAG ID 和所有者/用户。这些信息是否可以通过编程轻松获取?

用例是,我有一个 DAG,它允许一部分用户手动触发执行。根据触发此 DAG 执行的用户,从此 DAG 执行代码的行为会有所不同。

先感谢您。

airflow airflow-scheduler apache-airflow-xcom airflow-operator

6
推荐指数
1
解决办法
995
查看次数

无法从气流 Pod 中提取 xcom - Kubernetes Pod Operator

在运行使用
docker镜像运行 jar 的 DAG 时,给出了xcom_push=True,它会在单个 pod 中创建另一个容器以及docker镜像。

达格:

jar_task = KubernetesPodOperator(
    namespace='test',
    image="path to image",
    image_pull_secrets="secret",
    image_pull_policy="Always",
    node_selectors={"d-type":"na-node-group"},
    cmds=["sh","-c",..~running jar here~..],
    secrets=[secret_file],
    env_vars=environment_vars,
    labels={"k8s-app": "airflow"},
    name="airflow-pod",
    config_file=k8s_config_file,
    resources=pod.Resources(request_cpu=0.2,limit_cpu=0.5,request_memory='512Mi',limit_memory='1536Mi'),
    in_cluster=False,
    task_id="run_jar",
    is_delete_operator_pod=True,
    get_logs=True,
    xcom_push=True,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

下面是JAR执行成功时的错误..

    [2018-11-27 11:37:21,605] {{logging_mixin.py:95}} INFO - [2018-11-27 11:37:21,605] {{pod_launcher.py:166}} INFO - Running command... cat /airflow/xcom/return.json
    [2018-11-27 11:37:21,605] {{logging_mixin.py:95}} INFO - 
    [2018-11-27 11:37:21,647] {{logging_mixin.py:95}} INFO - [2018-11-27 11:37:21,646] {{pod_launcher.py:173}} INFO - cat: can't open '/airflow/xcom/return.json': No such file or directory
    [2018-11-27 …
Run Code Online (Sandbox Code Playgroud)

python kubernetes airflow apache-airflow-xcom

5
推荐指数
2
解决办法
5217
查看次数

Airflow xcom_pull 不提供相同上游任务实例运行的数据,而是提供最新数据

我正在创建一个 Airflow @daily DAG,它有一个get_daily_dataBigQueryGetDataOperator的上游任务,它根据 execution_date 和下游依赖任务(PythonOperator)获取数据,通过 xcom_pull 使用以上基于日期的数据。当我运行气流回填命令时,process_data_from_bq我正在执行 xcom_pull的下游任务,它只获取最近的数据,而不是下游任务期望的同一执行日期的数据。Airfow 文档说如果我们传递如果 xcom_pull 为 task_ids 传递单个字符串,则返回来自该任务的最新 XCom 值 但是它没有说明如何获取 DAG 执行的同一实例的数据。

我经历了一个相同的问题如何在同一个 DAG 运行中从其他任务实例中提取 xcom 值(不是最近的一个)?然而,那里给出的一个解决方案是我已经在做的。但似乎它不是正确的答案。

DAG 定义:

dag = DAG(
    'daily_motor',
    default_args=default_args,
    schedule_interval='@daily'
)

#This task creates data in a BigQuery table based on execution date
extract_daily_data  = BigQueryOperator(
    task_id='daily_data_extract',
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    sql=policy_by_transaction_date_sql('{{ ds }}'), 
    destination_dataset_table='Test.daily_data_tmp',
    dag=dag)


get_daily_data = BigQueryGetDataOperator(
    task_id='get_daily_data',
    dataset_id='Test',
    table_id='daily_data_tmp',
    max_results='10000',
    dag=dag

)


#This is where I need to pull …
Run Code Online (Sandbox Code Playgroud)

python airflow apache-airflow-xcom

5
推荐指数
1
解决办法
1940
查看次数

气流传感器重新安排运行时的持续状态

我正在BaseSensorOperatorAirflow 中工作。我有一个用例,我希望BaseSensorOperator.poke(context)函数将一些信息传递给poke. 我尝试使用 Xcom 如下(通过无意义值的模拟案例):

    def poke(self, context):
        task_instance = context['task_instance']
        old_value = task_instance.xcom_pull(key='passing_this_value')
        if old_value:
            logging.info(f'retrieved from Xcom {old_value}')
        else:
            logging.info('no value was retrieved')
        new_value = datetime.now()
        logging.info(f'sending this value to Xcom {new_value}')
        task_instance.xcom_push(key='passing_this_value', value=new_value)

        if new_value.minute % 10 == 0:
            return True
        else:
            return False
Run Code Online (Sandbox Code Playgroud)

这在创建传感器任务时效果很好,method='poke'method='reschedule'由于在重新调度时为该运行中的任务清除了 Xcom ,因此失败。

有办法解决吗?我可以使用,Variable但这会极大地破坏可变空间。还有什么建议吗?

airflow apache-airflow-xcom

5
推荐指数
1
解决办法
513
查看次数

从一个 Airflow DAG 返回值到另一个

我的 DAG(我们称之为 DAG_A)使用trigger_dagrun操作符启动另一个 DAG (DAG_B) 。DAG_B 的任务使用 XCOM,我想在完成后从 DAG_B 运行的任务之一(正是我开始的任务)中获取 XCOM 值。

使用 XCOM 不是硬性要求 - 基本上 Airflow 本身提供的任何(合理的)机制都可以工作。如果需要,我可以更改 DAG_B。

找不到此类案例的任何示例,因此感谢您的帮助。

计划 B 是让 DAG_B 将 XCOM 值与一些运行 ID 一起保存到一些持久性存储(如 DB 或文件)中,DAG_A 将从那里获取它。但如果有一些内置机制可用,我想避免这种复杂化。

python airflow apache-airflow-xcom

5
推荐指数
1
解决办法
481
查看次数