小编Ela*_*lad的帖子

从 BashOperator 到 SSHOperator 的 Airflow XCOM 通信

我刚刚开始学习Airflow,但是掌握Xcom的概念是相当困难的。因此我写了这样的达格:

from airflow import DAG
from airflow.utils.edgemodifier import Label

from datetime import datetime
from datetime import timedelta

from airflow.operators.bash import BashOperator
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.contrib.hooks.ssh_hook import SSHHook

#For more default argument for a task (or creating templates), please check this website
#https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/index.html#airflow.models.BaseOperator

default_args = {
    'owner': '...',
    'email': ['...'],
    'email_on_retry': False,
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2021, 6, 10, 23, 0, 0, 0),
    
}

hook = SSHHook(
    remote_host='...',
    username='...',
    password='...## Heading ##',
    port=22,
)

with DAG( …
Run Code Online (Sandbox Code Playgroud)

python airflow airflow-2.x

5
推荐指数
1
解决办法
9354
查看次数

Airflow 中外部连接的连接池

我正在尝试寻找一种方法来管理 Airflow 中创建的外部连接的连接池。
Airflow 版本:2.1.0
Python 版本:3.9.5
Airflow DB:SQLite
创建的外部连接:MySQL 和 Snowflake

我知道airflow.cfg 文件中有属性

sql_alchemy_pool_enabled = True  
sql_alchemy_pool_size = 5
Run Code Online (Sandbox Code Playgroud)

但这些属性用于管理气流内部数据库,在我的例子中是 SQLite。

我有一些在 MySQL 和 Snowflake 中读取或写入数据的任务。

snowflake_insert = SnowflakeOperator(
          task_id='insert_snowflake',
          dag=dag,
          snowflake_conn_id=SNOWFLAKE_CONN_ID,
          sql="Some Insert query",
          warehouse=SNOWFLAKE_WAREHOUSE,
          database=SNOWFLAKE_DATABASE,
          schema=SNOWFLAKE_SCHEMA,
          role=SNOWFLAKE_ROLE
     )
Run Code Online (Sandbox Code Playgroud)

insert_mysql_task = MySqlOperator(task_id='insert_record', mysql_conn_id='mysql_default', sql="some insert query", dag=dag)
Run Code Online (Sandbox Code Playgroud)

从 MySQL 读取数据

def get_records():
    mysql_hook = MySqlHook(mysql_conn_id="mysql_default")
    records = mysql_hook.get_records(sql=r"""Some select query""")
    print(records)
Run Code Online (Sandbox Code Playgroud)

我观察到的是,Snowflake 的每个任务(同一个 dag 中有多个任务)都会创建一个新会话,但尚未验证 MySQL 的情况是否相同。

有没有办法维护外部连接的连接池(在我的例子中是 Snowflake 和 MySQL)或任何其他方式来在同一会话中的同一 DAG 中运行所有查询?

谢谢

python mysql snowflake-cloud-data-platform airflow-scheduler airflow-2.x

5
推荐指数
1
解决办法
3091
查看次数

Airflow,获取当前 dag 运行中任务的当前状态

我有一个任务,通过它我可以写入数据库,哪些任务已成功处理。我正在尝试从我的 python 操作员那里获取信息TaskInstance.current_status()。但是,我似乎找不到成功获取 TaskInstance 的方法。如何获取任务实例并传递给TaskInstance()?我试过了task_id,但似乎不能是字符串

airflow

4
推荐指数
1
解决办法
2万
查看次数

气流通过 Snowflake session_parameters

正如您在代码示例中看到的,我试图通过 Airflow DAG 任务将 session_parameters 传递到我的 Snowflake 连接,但该参数没有被拾取,有什么解决方案吗?

task = SnowflakeOperator(
    task_id='Task',
    sql="CREATE OR REPLACE TABLE MY_DB.MY_SCHEMA.MY_TABLE (test VARCHAR)",
    session_parameters={
        "QUERY_TAG": "my_tag"
    },
    snowflake_conn_id="snowflake_connection",
    warehouse="MY_WH",
    dag=dag,
)
Run Code Online (Sandbox Code Playgroud)

python airflow snowflake-cloud-data-platform

4
推荐指数
1
解决办法
1538
查看次数

使用 boto3 列出 S3 中带有后缀的对象

def get_latest_file_movement(**kwargs):
    get_last_modified = lambda obj: int(obj['LastModified'].strftime('%s'))
    s3 = boto3.client('s3')
    objs = s3.list_objects_v2(Bucket='my-bucket',Prefix='prefix')['Contents']
    last_added = [obj['Key'] for obj in sorted(objs, key=get_last_modified, reverse=True)][0]
    return last_added
Run Code Online (Sandbox Code Playgroud)

上面的代码为我提供了最新的文件,但我只想要以“csv”结尾的文件

python boto3

4
推荐指数
1
解决办法
1万
查看次数

Apache 气流中的开始日期和预定日期问题

我正在使用 Apache 气流,但我对预定日期和开始日期有疑问。

我希望 dag 每天在 8:00 AM UTC 运行。所以,我所做的是:

default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2020, 12, 7, 10, 0,0),
        'email': ['example@emaiil.com'],
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(hours=5)
    }
#never run
dag = DAG(dag_id='id', default_args=default_args, schedule_interval='0 8 * * *',catchup=True)
Run Code Online (Sandbox Code Playgroud)

我上传 dag 的那天是 2020-12-07,我想在 2020-12-08 的 08:00:00 运行它

我将 start_date 设置在 2020-12-07 的 10:00:00 以避免在 2020-12-07 的 08:00:00 运行它,并且只在第二天触发它,但它不起作用。

然后我所做的是修改开始日期:

default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2020, 12, 7, 7, 59,0),
        'email': ['example@emaiil.com'], …
Run Code Online (Sandbox Code Playgroud)

airflow airflow-scheduler

3
推荐指数
1
解决办法
678
查看次数