I have a use-case where Im inside a for loop and need to populate fields in an HttpSensor task dynamically
I tried using this syntax:
Method 1 FAILED:
s = 'sensor_task_sd_{0}'.format(d)
sensor_task_sd = HttpSensor(
task_id=s,
http_conn_id='ss_api',
endpoint="/rest/v1/pipeline/{{ti.xcom_pull(key='curr_ss_pipe_id', task_ids={})}}/status?rev=0".format(t),
request_params={'X-Requested-By': 'abc_123'},
response_check=lambda response: True if "FINISHED" in response.text else False,
poke_interval=10,
soft_fail=True,
timeout=600,
dag=dag_subdag,
)
Run Code Online (Sandbox Code Playgroud)
but it fails because in this line:
endpoint="/rest/v1/pipeline/{{ti.xcom_pull(key='curr_ss_pipe_id', task_ids={})}}/status?rev=0".format(t)
Run Code Online (Sandbox Code Playgroud)
I cannot make the python string substitution with .format(t) work.
Instead if I hard code some value …
主要问题:我正在尝试创建 BigQuery 表(如果不存在)。
方法:使用 BigQueryTableSensor 检查表是否存在,并根据返回值,使用 BigQueryCreateEmptyTableOperator 创建或不创建新表。
问题:我无法使用 xcom 获取 BigQueryTableSensor 传感器的返回值。众所周知,poke 方法需要返回一个布尔值。
这就是我创建任务的方式:
check_if_table_exists = BigQueryTableSensor(
task_id='check_if_table_exists',
project_id='my_project',
dataset_id='my_dataset',
table_id='my_table',
bigquery_conn_id='bigquery_default',
timeout=120,
do_xcom_push=True,
)
# Output: INFO - Success criteria met. Exiting.
get_results = BashOperator(
task_id='get_results',
bash_command="echo {{ ti.xcom_pull(task_ids='check_if_table_exists') }}"
)
# Output: INFO - Running command: echo None
Run Code Online (Sandbox Code Playgroud)
查看 Airflow 界面,我检查了 BigQueryTableSensor 没有推送任何内容:(
题:
有没有办法获得我的传感器的返回值?
有没有更好的方法来解决我的主要问题?也许使用 BigQueryOperator 和像“CREATE TABLE IF NOT EXISTS”这样的 sql 查询。
我正在使用 Airflow 在 Google Cloud Composer 上运行 Spark 作业。我需要
使用 Airflow API - 我可以读取 YAML 文件,并使用 xcom 跨任务推送变量。
但是,考虑 DataprocClusterCreateOperator()
cluster_nameproject_idzone和其他一些参数被标记为模板化。
如果我想将其他参数作为模板传递(目前不是这样)怎么办?-样image_version,
num_workers,worker_machine_type等?
有什么解决方法吗?
google-cloud-platform airflow google-cloud-composer apache-airflow-xcom
我需要在另一个任务中使用 oracleOperator 的输出以进一步执行。我遇到的麻烦是,当我将数据拉入另一个任务并打印它时,它给出的结果为 None。没有抛出错误,但没有传递数据。此外,任务 UI 中的xcom选项卡的键和值显示为空白。
我的代码如下:
from airflow import DAG
from airflow.operators.oracle_operator import OracleOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
args = {
'owner': 'xyz',
'start_date': days_ago(2),
}
dag = DAG('example_xcom', schedule_interval="@once", default_args=args, tags=['example'])
def puller(**kwargs):
ti = kwargs['ti']
# get value_1
pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
print("VALUE IN PULLER ")
print(pulled_value_1)
pull = PythonOperator(
task_id='pullee',
dag=dag,
python_callable=puller,
provide_context=True,
)
push = OracleOperator(
task_id='data',
sql='SELECT * FROM CUSTOMERS',
oracle_conn_id='1',
provide_context=True,
dag=dag,
)
push>>pull
Run Code Online (Sandbox Code Playgroud) 我们有一个 kubernetes pod 操作符,它将输出一个 Python 字典,该字典将定义哪些更进一步的下游 kubernetes pod 操作符与它们的依赖项和环境变量一起运行,以传递给每个操作符。
我如何让这个 python 字典对象回到执行器的上下文(或者它是工作器的上下文?),以便气流可以产生下游 kubernetes 操作符?
我看过 BranchOperator 和 TriggerDagRunOperator 和 XCOM push/pull 和 Variable.get 和 Variable.set,但似乎没有什么工作。
我正在尝试使用 Airflow 实现基本的 ETL 作业,但有一点:
我有3个功能。我想为每个变量定义全局变量,例如:
function a():
return a_result
function b():
use a
return b_result
function c():
use a and b
Run Code Online (Sandbox Code Playgroud)
然后在python_callable.
像往常一样定义global a_result是行不通的。任何解决方案?
global-variables airflow airflow-scheduler apache-airflow-xcom
我是 Airflow 的 xcom 功能的新手。我用 PythonOperator 进行了尝试,它工作正常(即,我可以将值从上下文中推入和拉出),但是当我在 BashOperator 上尝试时,它不起作用。但是,我只能通过在任务创建期间添加 xcom_push=True 属性来提取最终的 stdout 语句。这是一回事。2)但我也希望像我们在 PythonOp 中那样根据它们的键(到 BashOp 和从 BashOp)推送和拉取值。这真的很有帮助,因为我需要将大量变量从一个脚本传递到其他。
我的任务是创建一个 POC,人们可以在其中从数据库中获取数据、处理它并将其发送到 S3。我仍在学习 Airflow,有些事情我还没有完全理解。我希望你能帮助我。所以我现在正在尝试从 mysql db 中获取数据并将其打印出来。问题是我总是得到“无”返回。我正在尝试使用 xcom 但它不起作用,我读到我不应该为此使用 xcom。这是我现在所拥有的:
def print_query():
query = ti.xcom_pull(task_ids='mysql_query')
print(query)
default_args = {
'owner': 'me',
'start_date': dt.datetime(2019, 8, 15),
'retries': 1,
'retry_delay': dt.timedelta(minutes=5),
}
dag = DAG('s3_dag_test', default_args=default_args)
python = PythonOperator(
task_id='print',
provide_context=True,
python_callable=print_query,
dag=dag)
query = MySqlOperator(
task_id='mysql_query',
sql='SELECT * FROM sakila.actor',
mysql_conn_id='mysql_db',
dag=dag)
query >> python
Run Code Online (Sandbox Code Playgroud)
第一个任务运行良好(mysql_query),但是第二个任务失败,因为我没有。怎么了?
另外,我得到的实际错误是这样的:
ERROR - print_query() got an unexpected keyword argument 'dag'
Run Code Online (Sandbox Code Playgroud)
有任何想法吗?
我正在尝试从外部 API 中提取数据并将其转储到 S3 上。我正在考虑编写 Airflow Operator rest-to-s3.py,它会从外部 Rest API 中提取数据。
我的担忧是:
我对 Airflow 相当陌生,所以它会有所帮助。
python airflow airflow-scheduler apache-airflow-xcom airflow-operator