Chr*_*ian 3 python etl pandas airflow airflow-scheduler
我喜欢气流的想法,但我停留在基础知识上。从昨天开始,我在 vm ubuntu-postgres 解决方案上运行了气流。我可以看到仪表板和示例数据:))我现在想要的是将用于处理原始数据的示例脚本迁移到准备好的数据。
假设你有一个 csv 文件的文件夹。今天我的脚本迭代它,将每个文件传递到一个将被转换为 df.txt 的列表。之后,我准备它们的列名称并进行一些数据清理并将其写入不同的格式。
1:目录中文件的pd.read_csv
2:创建一个df
3:干净的列名
4:干净的值(与 stp 3 并行)
5:将结果写入数据库
我该如何根据气流组织我的文件?脚本应该是什么样子?我是否传递单个方法、单个文件,还是必须为每个部分创建多个文件?我现在缺乏基本概念:(我读到的关于气流的所有内容都比我的简单情况复杂得多。我正在考虑放弃气流以及 Bonobo、Mara、Luigi,但我认为气流是值得的?!
我将使用PythonOperator,将整个代码放入 Python 函数中,创建一个 Airflow 任务,仅此而已。
如果需要拆分这些步骤,也可以将 csv 文件的加载和数据库写入放在一个函数中。所有这些都将放入一个 DAG 中。
因此,您的一个 DAG 将承担三项任务,例如:
loadCSV (PythonOperator)
parseDF (PythonOperator)
pushToDB (PythonOperator)
Run Code Online (Sandbox Code Playgroud)
如果您使用多个任务,则需要使用Airflow 的 XCom。一开始只使用一个任务会更容易。
标签气流下有几个代码示例。当你创造了一些东西后,再问一次。
对于仍然困扰这个问题的人,我们最近为气流实现了一个自定义的 XCom 后端,由Vineyard支持,以支持此类情况。
该提供程序是开源的: https: //github.com/v6d-io/v6d/tree/main/python/vineyard/contrib/airflow
通过 Vineyard XCom 后端,用户可以pandas.DataFrame直接生成和消费 dag,而无需任何“to_csv”+“from_csv”黑客攻击,
import numpy as np
import pandas as pd
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
}
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def taskflow_etl_pandas():
@task()
def extract():
order_data_dict = pd.DataFrame({
'a': np.random.rand(100000),
'b': np.random.rand(100000),
})
return order_data_dict
@task(multiple_outputs=True)
def transform(order_data_dict: dict):
return {"total_order_value": order_data_dict["a"].sum()}
@task()
def load(total_order_value: float):
print(f"Total order value is: {total_order_value:.2f}")
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
taskflow_etl_pandas_dag = taskflow_etl_pandas()
Run Code Online (Sandbox Code Playgroud)
希望对您的情况有所帮助。