我刚刚开始学习Airflow,但是掌握Xcom的概念是相当困难的。因此我写了这样的达格:
from airflow import DAG
from airflow.utils.edgemodifier import Label
from datetime import datetime
from datetime import timedelta
from airflow.operators.bash import BashOperator
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.contrib.hooks.ssh_hook import SSHHook
#For more default argument for a task (or creating templates), please check this website
#https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/index.html#airflow.models.BaseOperator
default_args = {
'owner': '...',
'email': ['...'],
'email_on_retry': False,
'email_on_failure': True,
'retries': 3,
'retry_delay': timedelta(minutes=5),
'start_date': datetime(2021, 6, 10, 23, 0, 0, 0),
}
hook = SSHHook(
remote_host='...',
username='...',
password='...## Heading ##',
port=22,
)
with DAG( …Run Code Online (Sandbox Code Playgroud) 我正在尝试寻找一种方法来管理 Airflow 中创建的外部连接的连接池。
Airflow 版本:2.1.0
Python 版本:3.9.5
Airflow DB:SQLite
创建的外部连接:MySQL 和 Snowflake
我知道airflow.cfg 文件中有属性
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
Run Code Online (Sandbox Code Playgroud)
但这些属性用于管理气流内部数据库,在我的例子中是 SQLite。
我有一些在 MySQL 和 Snowflake 中读取或写入数据的任务。
snowflake_insert = SnowflakeOperator(
task_id='insert_snowflake',
dag=dag,
snowflake_conn_id=SNOWFLAKE_CONN_ID,
sql="Some Insert query",
warehouse=SNOWFLAKE_WAREHOUSE,
database=SNOWFLAKE_DATABASE,
schema=SNOWFLAKE_SCHEMA,
role=SNOWFLAKE_ROLE
)
Run Code Online (Sandbox Code Playgroud)
和
insert_mysql_task = MySqlOperator(task_id='insert_record', mysql_conn_id='mysql_default', sql="some insert query", dag=dag)
Run Code Online (Sandbox Code Playgroud)
从 MySQL 读取数据
def get_records():
mysql_hook = MySqlHook(mysql_conn_id="mysql_default")
records = mysql_hook.get_records(sql=r"""Some select query""")
print(records)
Run Code Online (Sandbox Code Playgroud)
我观察到的是,Snowflake 的每个任务(同一个 dag 中有多个任务)都会创建一个新会话,但尚未验证 MySQL 的情况是否相同。
有没有办法维护外部连接的连接池(在我的例子中是 Snowflake 和 MySQL)或任何其他方式来在同一会话中的同一 DAG 中运行所有查询?
谢谢
python mysql snowflake-cloud-data-platform airflow-scheduler airflow-2.x
我有一个任务,通过它我可以写入数据库,哪些任务已成功处理。我正在尝试从我的 python 操作员那里获取信息TaskInstance.current_status()。但是,我似乎找不到成功获取 TaskInstance 的方法。如何获取任务实例并传递给TaskInstance()?我试过了task_id,但似乎不能是字符串
正如您在代码示例中看到的,我试图通过 Airflow DAG 任务将 session_parameters 传递到我的 Snowflake 连接,但该参数没有被拾取,有什么解决方案吗?
task = SnowflakeOperator(
task_id='Task',
sql="CREATE OR REPLACE TABLE MY_DB.MY_SCHEMA.MY_TABLE (test VARCHAR)",
session_parameters={
"QUERY_TAG": "my_tag"
},
snowflake_conn_id="snowflake_connection",
warehouse="MY_WH",
dag=dag,
)
Run Code Online (Sandbox Code Playgroud) def get_latest_file_movement(**kwargs):
get_last_modified = lambda obj: int(obj['LastModified'].strftime('%s'))
s3 = boto3.client('s3')
objs = s3.list_objects_v2(Bucket='my-bucket',Prefix='prefix')['Contents']
last_added = [obj['Key'] for obj in sorted(objs, key=get_last_modified, reverse=True)][0]
return last_added
Run Code Online (Sandbox Code Playgroud)
上面的代码为我提供了最新的文件,但我只想要以“csv”结尾的文件
我正在使用 Apache 气流,但我对预定日期和开始日期有疑问。
我希望 dag 每天在 8:00 AM UTC 运行。所以,我所做的是:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020, 12, 7, 10, 0,0),
'email': ['example@emaiil.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(hours=5)
}
#never run
dag = DAG(dag_id='id', default_args=default_args, schedule_interval='0 8 * * *',catchup=True)
Run Code Online (Sandbox Code Playgroud)
我上传 dag 的那天是 2020-12-07,我想在 2020-12-08 的 08:00:00 运行它
我将 start_date 设置在 2020-12-07 的 10:00:00 以避免在 2020-12-07 的 08:00:00 运行它,并且只在第二天触发它,但它不起作用。
然后我所做的是修改开始日期:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020, 12, 7, 7, 59,0),
'email': ['example@emaiil.com'], …Run Code Online (Sandbox Code Playgroud)