标签: apache-airflow-xcom

Using python string substitution and xcom_pull in Airflow HttpSensor operator

I have a use-case where Im inside a for loop and need to populate fields in an HttpSensor task dynamically

I tried using this syntax:

Method 1 FAILED:

s = 'sensor_task_sd_{0}'.format(d)
            sensor_task_sd = HttpSensor(
                task_id=s,
                http_conn_id='ss_api',
                endpoint="/rest/v1/pipeline/{{ti.xcom_pull(key='curr_ss_pipe_id', task_ids={})}}/status?rev=0".format(t),
                request_params={'X-Requested-By': 'abc_123'},
                response_check=lambda response: True if "FINISHED" in response.text else False,
                poke_interval=10,
                soft_fail=True,
                timeout=600,
                dag=dag_subdag,
                )
Run Code Online (Sandbox Code Playgroud)

but it fails because in this line:

endpoint="/rest/v1/pipeline/{{ti.xcom_pull(key='curr_ss_pipe_id', task_ids={})}}/status?rev=0".format(t)
Run Code Online (Sandbox Code Playgroud)

I cannot make the python string substitution with .format(t) work.

Instead if I hard code some value …

airflow apache-airflow-xcom

4
推荐指数
1
解决办法
2267
查看次数

如何从气流传感器中提取 xcom 值?

主要问题:我正在尝试创建 BigQuery 表(如果不存在)。

方法:使用 BigQueryTableSensor 检查表是否存在,并根据返回值,使用 BigQueryCreateEmptyTableOperator 创建或不创建新表。

问题:我无法使用 xcom 获取 BigQueryTableSensor 传感器的返回值。众所周知,poke 方法需要返回一个布尔值。

这就是我创建任务的方式:

check_if_table_exists = BigQueryTableSensor(
        task_id='check_if_table_exists',
        project_id='my_project',
        dataset_id='my_dataset',
        table_id='my_table',
        bigquery_conn_id='bigquery_default',
        timeout=120,
        do_xcom_push=True,
    )

# Output: INFO - Success criteria met. Exiting.

get_results = BashOperator(
        task_id='get_results',
        bash_command="echo {{ ti.xcom_pull(task_ids='check_if_table_exists') }}"
    )

# Output: INFO - Running command: echo None
Run Code Online (Sandbox Code Playgroud)

查看 Airflow 界面,我检查了 BigQueryTableSensor 没有推送任何内容:(

在此处输入图片说明

题:

  • 有没有办法获得我的传感器的返回值?

  • 有没有更好的方法来解决我的主要问题?也许使用 BigQueryOperator 和像“CREATE TABLE IF NOT EXISTS”这样的 sql 查询。

python airflow apache-airflow-xcom

4
推荐指数
2
解决办法
427
查看次数

如何传递动态参数 Airflow 运算符?

我正在使用 Airflow 在 Google Cloud Composer 上运行 Spark 作业。我需要

  • 创建集群(用户提供的 YAML 参数)
  • spark 作业列表(作业参数也由每个作业 YAML 提供)

使用 Airflow API - 我可以读取 YAML 文件,并使用 xcom 跨任务推送变量。

但是,考虑 DataprocClusterCreateOperator()

  • cluster_name
  • project_id
  • zone

和其他一些参数被标记为模板化。

如果我想将其他参数作为模板传递(目前不是这样)怎么办?-样image_versionnum_workersworker_machine_type等?

有什么解决方法吗?

google-cloud-platform airflow google-cloud-composer apache-airflow-xcom

3
推荐指数
1
解决办法
2762
查看次数

将 OracleOperator 的输出发送到 Airflow 中的另一个任务

我需要在另一个任务中使用 oracleOperator 的输出以进一步执行。我遇到的麻烦是,当我将数据拉入另一个任务并打印它时,它给出的结果为 None。没有抛出错误,但没有传递数据。此外,任务 UI 中的xcom选项卡的键和值显示为空白。

我的代码如下:

from airflow import DAG
from airflow.operators.oracle_operator import OracleOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'xyz',
    'start_date': days_ago(2),
}

dag = DAG('example_xcom', schedule_interval="@once", default_args=args, tags=['example'])


def puller(**kwargs):
    ti = kwargs['ti']
    # get value_1
    pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
    print("VALUE IN PULLER ")
    print(pulled_value_1)

pull = PythonOperator(
    task_id='pullee',
    dag=dag,
    python_callable=puller,
    provide_context=True,
)
push = OracleOperator(
    task_id='data',
    sql='SELECT * FROM CUSTOMERS', 
    oracle_conn_id='1',
    provide_context=True,
    dag=dag,
)


push>>pull
Run Code Online (Sandbox Code Playgroud)

airflow apache-airflow-xcom

3
推荐指数
1
解决办法
970
查看次数

Airflow:如何获取一个任务的返回输出来设置下游任务运行的依赖关系?

我们有一个 kubernetes pod 操作符,它将输出一个 Python 字典,该字典将定义哪些更进一步的下游 kubernetes pod 操作符与它们的依赖项和环境变量一起运行,以传递给每个操作符。

我如何让这个 python 字典对象回到执行器的上下文(或者它是工作器的上下文?),以便气流可以产生下游 kubernetes 操作符?

我看过 BranchOperator 和 TriggerDagRunOperator 和 XCOM push/pull 和 Variable.get 和 Variable.set,但似乎没有什么工作。

airflow apache-airflow-xcom

2
推荐指数
1
解决办法
1199
查看次数

Airflow 中的全局变量

我正在尝试使用 Airflow 实现基本的 ETL 作业,但有一点:

我有3个功能。我想为每个变量定义全局变量,例如:

function a():
   return a_result

function b():
     use a
     return b_result
function c():
     use a and b
Run Code Online (Sandbox Code Playgroud)

然后在python_callable.

像往常一样定义global a_result是行不通的。任何解决方案?

global-variables airflow airflow-scheduler apache-airflow-xcom

2
推荐指数
1
解决办法
6153
查看次数

我不能通过 BashOperator xcom_push 参数

我是 Airflow 的 xcom 功能的新手。我用 PythonOperator 进行了尝试,它工作正常(即,我可以将值从上下文中推入和拉出),但是当我在 BashOperator 上尝试时,它不起作用。但是,我只能通过在任务创建期间添加 xcom_push=True 属性来提取最终的 stdout 语句。这是一回事。2)但我也希望像我们在 PythonOp 中那样根据它们的键(到 BashOp 和从 BashOp)推送和拉取值。这真的很有帮助,因为我需要将大量变量从一个脚本传递到其他。

airflow apache-airflow-xcom

1
推荐指数
1
解决办法
5739
查看次数

气流从数据库中获取数据并打印出来

我的任务是创建一个 POC,人们可以在其中从数据库中获取数据、处理它并将其发送到 S3。我仍在学习 Airflow,有些事情我还没有完全理解。我希望你能帮助我。所以我现在正在尝试从 mysql db 中获取数据并将其打印出来。问题是我总是得到“无”返回。我正在尝试使用 xcom 但它不起作用,我读到我不应该为此使用 xcom。这是我现在所拥有的:

def print_query():
query = ti.xcom_pull(task_ids='mysql_query')
print(query)

default_args = {
    'owner': 'me',
    'start_date': dt.datetime(2019, 8, 15),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}

dag = DAG('s3_dag_test', default_args=default_args)

python = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=print_query,
    dag=dag)

query = MySqlOperator(
    task_id='mysql_query',
    sql='SELECT * FROM sakila.actor',
    mysql_conn_id='mysql_db',
    dag=dag)

query >> python
Run Code Online (Sandbox Code Playgroud)

第一个任务运行良好(mysql_query),但是第二个任务失败,因为我没有。怎么了?

另外,我得到的实际错误是这样的:

ERROR - print_query() got an unexpected keyword argument 'dag'
Run Code Online (Sandbox Code Playgroud)

有任何想法吗?

airflow apache-airflow-xcom

0
推荐指数
1
解决办法
1703
查看次数

Airflow Operator 从外部 Rest API 中提取数据

我正在尝试从外部 API 中提取数据并将其转储到 S3 上。我正在考虑编写 Airflow Operator rest-to-s3.py,它会从外部 Rest API 中提取数据。

我的担忧是:

  1. 这将是一个长时间运行的任务,我如何跟踪失败?
  2. 有没有比编写运算符更好的选择?
  3. 是否建议执行可能运行几个小时的任务并等待它?

我对 Airflow 相当陌生,所以它会有所帮助。

python airflow airflow-scheduler apache-airflow-xcom airflow-operator

0
推荐指数
1
解决办法
1658
查看次数